diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index 22905065..ba7ebb0d 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -18,9 +18,18 @@ package controller import ( "context" + "errors" + "fmt" + "github.com/go-logr/logr" + "github.com/nats-io/nats.go/jetstream" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "time" - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" ctrl "sigs.k8s.io/controller-runtime" ) @@ -36,14 +45,264 @@ type ConsumerReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := klog.FromContext(ctx) - log.Info("reconcile", "namespace", req.Namespace, "name", req.Name) + if ok := r.ValidNamespace(req.Namespace); !ok { + log.Info("Controller restricted to namespace, skipping reconciliation.") + return ctrl.Result{}, nil + } + + // Fetch consumer resource + consumer := &api.Consumer{} + if err := r.Get(ctx, req.NamespacedName, consumer); err != nil { + if apierrors.IsNotFound(err) { + log.Info("Consumer resource not found. Ignoring since object must be deleted.") + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("get consumer resource '%s': %w", req.NamespacedName.String(), err) + } + + log = log.WithValues( + "streamName", consumer.Spec.StreamName, + "consumerName", consumer.Spec.DurableName, + ) + + // Update ready status to unknown when no status is set + if consumer.Status.Conditions == nil || len(consumer.Status.Conditions) == 0 { + log.Info("Setting initial ready condition to unknown.") + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation") + err := r.Status().Update(ctx, consumer) + if err != nil { + return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err) + } + return ctrl.Result{Requeue: true}, nil + } + + // Add finalizer + if !controllerutil.ContainsFinalizer(consumer, consumerFinalizer) { + log.Info("Adding consumer finalizer.") + if ok := controllerutil.AddFinalizer(consumer, consumerFinalizer); !ok { + return ctrl.Result{}, errors.New("failed to add finalizer to consumer resource") + } + + if err := r.Update(ctx, consumer); err != nil { + return ctrl.Result{}, fmt.Errorf("update consumer resource to add finalizer: %w", err) + } + return ctrl.Result{}, nil + } + + // Check Deletion + markedForDeletion := consumer.GetDeletionTimestamp() != nil + if markedForDeletion { + if controllerutil.ContainsFinalizer(consumer, consumerFinalizer) { + err := r.deleteConsumer(ctx, log, consumer) + if err != nil { + return ctrl.Result{}, fmt.Errorf("delete consumer: %w", err) + } + } else { + log.Info("Consumer marked for deletion and already finalized. Ignoring.") + } + + return ctrl.Result{}, nil + } + + // Create or update stream + if err := r.createOrUpdate(ctx, log, consumer); err != nil { + return ctrl.Result{}, fmt.Errorf("create or update: %s", err) + } return ctrl.Result{}, nil } +func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error { + + // Set status to not false + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.") + if err := r.Status().Update(ctx, consumer); err != nil { + return fmt.Errorf("update ready condition: %w", err) + } + + if !consumer.Spec.PreventDelete && !r.ReadOnly() { + err := r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error { + return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName) + }) + switch { + case errors.Is(err, jetstream.ErrConsumerNotFound): + log.Info("Consumer does not exist. Unable to delete.") + case errors.Is(err, jetstream.ErrStreamNotFound): + log.Info("Stream of consumer does not exist. Unable to delete.") + case err != nil: + return fmt.Errorf("delete jetstream consumer: %w", err) + default: + log.Info("Consumer deleted.") + } + } else { + log.Info("Skipping consumer deletion.", + "consumerName", consumer.Spec.DurableName, + "preventDelete", consumer.Spec.PreventDelete, + "read-only", r.ReadOnly(), + ) + } + + log.Info("Removing consumer finalizer.") + if ok := controllerutil.RemoveFinalizer(consumer, consumerFinalizer); !ok { + return errors.New("failed to remove consumer finalizer") + } + if err := r.Update(ctx, consumer); err != nil { + return fmt.Errorf("remove finalizer: %w", err) + } + + return nil +} + +func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error { + + // Create or Update the stream based on the spec + if consumer.Spec.PreventUpdate || r.ReadOnly() { + log.Info("Skipping consumer creation or update.", + "preventDelete", consumer.Spec.PreventDelete, + "read-only", r.ReadOnly(), + ) + return nil + } + + // Map spec to consumer target config + targetConfig, err := consumerSpecToConfig(&consumer.Spec) + if err != nil { + return fmt.Errorf("map consumer spec to target config: %w", err) + } + + err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error { + log.Info("Consumer created or updated.") + _, err := js.CreateOrUpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) + return err + }) + if err != nil { + err = fmt.Errorf("create or update consumer: %w", err) + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Errored", err.Error()) + if err := r.Status().Update(ctx, consumer); err != nil { + log.Error(err, "Failed to update ready condition to Errored.") + } + return err + } + + // update the observed generation and ready status + consumer.Status.ObservedGeneration = consumer.Generation + consumer.Status.Conditions = updateReadyCondition( + consumer.Status.Conditions, + v1.ConditionTrue, + "Reconciling", + "Consumer successfully created or updated.", + ) + err = r.Status().Update(ctx, consumer) + if err != nil { + return fmt.Errorf("update ready condition: %w", err) + } + + return nil +} + +func consumerConnOpts(spec api.ConsumerSpec) *connectionOptions { + return &connectionOptions{ + Account: spec.Account, + Creds: spec.Creds, + Nkey: spec.Nkey, + Servers: spec.Servers, + TLS: spec.TLS, + } +} + +func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) { + + config := &jetstream.ConsumerConfig{ + Durable: spec.DurableName, + Description: spec.Description, + OptStartSeq: uint64(spec.OptStartSeq), + MaxDeliver: spec.MaxDeliver, + FilterSubject: spec.FilterSubject, + RateLimit: uint64(spec.RateLimitBps), + SampleFrequency: spec.SampleFreq, + MaxWaiting: spec.MaxWaiting, + MaxAckPending: spec.MaxAckPending, + HeadersOnly: spec.HeadersOnly, + MaxRequestBatch: spec.MaxRequestBatch, + MaxRequestMaxBytes: spec.MaxRequestMaxBytes, + Replicas: spec.Replicas, + MemoryStorage: spec.MemStorage, + FilterSubjects: spec.FilterSubjects, + Metadata: spec.Metadata, + + // Explicitly set not (yet) mapped fields + Name: "", + InactiveThreshold: 0, + } + + // DeliverPolicy + if spec.DeliverPolicy != "" { + err := config.DeliverPolicy.UnmarshalJSON(asJsonString(spec.DeliverPolicy)) + if err != nil { + return nil, fmt.Errorf("invalid delivery policy: %w", err) + } + } + + // OptStartTime RFC3339 + if spec.OptStartTime != "" { + t, err := time.Parse(time.RFC3339, spec.OptStartTime) + if err != nil { + return nil, fmt.Errorf("invalid opt start time: %w", err) + } + config.OptStartTime = &t + } + + // AckPolicy + if spec.AckPolicy != "" { + err := config.AckPolicy.UnmarshalJSON(asJsonString(spec.AckPolicy)) + if err != nil { + return nil, fmt.Errorf("invalid ack policy: %w", err) + } + } + + // AckWait + if spec.AckWait != "" { + d, err := time.ParseDuration(spec.AckWait) + if err != nil { + return nil, fmt.Errorf("invalid ack wait duration: %w", err) + } + config.AckWait = d + } + + //BackOff + for _, bo := range spec.BackOff { + d, err := time.ParseDuration(bo) + if err != nil { + return nil, fmt.Errorf("invalid backoff: %w", err) + } + + config.BackOff = append(config.BackOff, d) + } + + // ReplayPolicy + if spec.ReplayPolicy != "" { + err := config.ReplayPolicy.UnmarshalJSON(asJsonString(spec.ReplayPolicy)) + if err != nil { + return nil, fmt.Errorf("invalid replay policy: %w", err) + } + } + + // MaxRequestExpires + if spec.MaxRequestExpires != "" { + d, err := time.ParseDuration(spec.MaxRequestExpires) + if err != nil { + return nil, fmt.Errorf("invalid opt start time: %w", err) + } + config.MaxRequestExpires = d + } + + return config, nil +} + // SetupWithManager sets up the controller with the Manager. func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&jetstreamnatsiov1beta2.Consumer{}). + For(&api.Consumer{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/internal/controller/consumer_controller_test.go b/internal/controller/consumer_controller_test.go index 6ce92bcc..62fd2d22 100644 --- a/internal/controller/consumer_controller_test.go +++ b/internal/controller/consumer_controller_test.go @@ -17,73 +17,668 @@ limitations under the License. package controller import ( - "context" - + "github.com/nats-io/nats.go/jetstream" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "testing" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" ) var _ = Describe("Consumer Controller", func() { Context("When reconciling a resource", func() { const resourceName = "test-resource" - ctx := context.Background() + const streamName = "orders" + const consumerName = "test-consumer" typeNamespacedName := types.NamespacedName{ Name: resourceName, Namespace: "default", // TODO(user):Modify as needed } - consumer := &jetstreamnatsiov1beta2.Consumer{} + consumer := &api.Consumer{} + + emptyStreamConfig := jetstream.StreamConfig{ + Name: streamName, + Replicas: 1, + Retention: jetstream.WorkQueuePolicy, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, + } - BeforeEach(func() { + emptyConsumerConfig := jetstream.ConsumerConfig{ + Durable: consumerName, + } + + // Tested coontroller + var controller *ConsumerReconciler + + BeforeEach(func(ctx SpecContext) { By("creating the custom resource for the Kind Consumer") err := k8sClient.Get(ctx, typeNamespacedName, consumer) if err != nil && errors.IsNotFound(err) { - resource := &jetstreamnatsiov1beta2.Consumer{ + resource := &api.Consumer{ ObjectMeta: metav1.ObjectMeta{ Name: resourceName, Namespace: "default", }, - Spec: jetstreamnatsiov1beta2.ConsumerSpec{ - AckPolicy: "none", - DeliverPolicy: "new", - DurableName: "test-consumer", + Spec: api.ConsumerSpec{ + AckPolicy: "explicit", + DeliverPolicy: "all", + DurableName: consumerName, + Description: "test consumer", + StreamName: streamName, ReplayPolicy: "instant", }, - // TODO(user): Specify other spec details if needed. } Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + // Fetch consumer + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + } + + By("creating the underlying stream") + _, err = jsClient.CreateStream(ctx, emptyStreamConfig) + Expect(err).ToNot(HaveOccurred()) + + By("setting up the tested controller") + controller = &ConsumerReconciler{ + baseController, } }) - AfterEach(func() { - // TODO(user): Cleanup logic after each test, like removing the resource instance. - resource := &jetstreamnatsiov1beta2.Consumer{} + AfterEach(func(ctx SpecContext) { + By("removing the consumer resource") + resource := &api.Consumer{} err := k8sClient.Get(ctx, typeNamespacedName, resource) - Expect(err).NotTo(HaveOccurred()) + if err != nil { + Expect(err).To(MatchError(k8serrors.IsNotFound, "Is not found")) + } else { + if controllerutil.ContainsFinalizer(resource, consumerFinalizer) { + By("removing the finalizer") + controllerutil.RemoveFinalizer(resource, consumerFinalizer) + Expect(k8sClient.Update(ctx, resource)).To(Succeed()) + } - By("Cleanup the specific resource instance Consumer") - Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) - }) - It("should successfully reconcile the resource", func() { - By("Reconciling the created resource") - controllerReconciler := &ConsumerReconciler{ - baseController, + By("removing the consumer resource") + Expect(k8sClient.Delete(ctx, resource)). + To(SatisfyAny( + Succeed(), + MatchError(k8serrors.IsNotFound, "is not found"), + )) } - _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: typeNamespacedName, + By("deleting the nats consumer") + Expect(jsClient.DeleteConsumer(ctx, streamName, consumerName)). + To(SatisfyAny( + Succeed(), + MatchError(jetstream.ErrStreamNotFound), + MatchError(jetstream.ErrConsumerNotFound), + )) + + By("deleting the consumers nats stream") + Expect(jsClient.DeleteStream(ctx, streamName)). + To(SatisfyAny( + Succeed(), + MatchError(jetstream.ErrStreamNotFound), + )) + }) + + When("reconciling a not existing resource", func() { + It("should stop reconciliation without error", func(ctx SpecContext) { + By("reconciling the created resource") + result, err := controller.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "fake", + Name: "not-existing", + }, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + }) + + When("reconciling a not initialized resource", func() { + + It("should initialize a new resource", func(ctx SpecContext) { + + By("re-queueing until it is initialized") + // Initialization can require multiple reconciliation loops + Eventually(func(ctx SpecContext) *api.Consumer { + _, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + got := &api.Consumer{} + Expect(k8sClient.Get(ctx, typeNamespacedName, got)).To(Succeed()) + return got + }).WithContext(ctx). + Within(time.Second). + Should(SatisfyAll( + HaveField("Finalizers", HaveExactElements(consumerFinalizer)), + HaveField("Status.Conditions", Not(BeEmpty())), + )) + + By("validating the ready condition") + // Fetch consumer + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + Expect(consumer.Status.Conditions).To(HaveLen(1)) + + assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now()) + }) + }) + + When("reconciling an initialized resource", func() { + + BeforeEach(func(ctx SpecContext) { + By("initializing the stream resource") + + By("setting the finalizer") + Expect(controllerutil.AddFinalizer(consumer, consumerFinalizer)).To(BeTrue()) + Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) + + By("setting an unknown ready state") + consumer.Status.Conditions = []api.Condition{{ + Type: readyCondType, + Status: v1.ConditionUnknown, + Reason: "Test", + Message: "start condition", + LastTransitionTime: time.Now().Format(time.RFC3339Nano), + }} + Expect(k8sClient.Status().Update(ctx, consumer)).To(Succeed()) + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + }) + + When("the underlying stream does not exist", func() { + It("should set false ready state and error", func(ctx SpecContext) { + By("setting a not existing stream on the resource") + consumer.Spec.StreamName = "not-existing" + Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).To(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking for expected ready state") + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + Expect(consumer.Status.Conditions).To(HaveLen(1)) + assertReadyStateMatches( + consumer.Status.Conditions[0], + v1.ConditionFalse, + "Errored", + "stream", // Not existing stream as message + time.Now(), + ) + }) + }) + + It("should create a new consumer", func(ctx SpecContext) { + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + // Fetch resource + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + + By("checking if the ready state was updated") + Expect(consumer.Status.Conditions).To(HaveLen(1)) + assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionTrue, "Reconciling", "created or updated", time.Now()) + + By("checking if the observed generation matches") + Expect(consumer.Status.ObservedGeneration).To(Equal(consumer.Generation)) + + By("checking if the consumer was created") + natsconsumer, err := jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).NotTo(HaveOccurred()) + consumerInfo, err := natsconsumer.Info(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(consumerInfo.Config.Name).To(Equal(consumerName)) + Expect(consumerInfo.Config.Description).To(Equal("test consumer")) + Expect(consumerInfo.Created).To(BeTemporally("~", time.Now(), time.Second)) + }) + + It("should update an existing consumer", func(ctx SpecContext) { + + By("reconciling once to create the consumer") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + // Fetch resource + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + previousTransitionTime := consumer.Status.Conditions[0].LastTransitionTime + + By("updating the resource") + consumer.Spec.Description = "new description" + Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) + + By("reconciling the updated resource") + result, err = controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + // Fetch resource + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + + By("checking if the state transition time was not updated") + Expect(consumer.Status.Conditions).To(HaveLen(1)) + Expect(consumer.Status.Conditions[0].LastTransitionTime).To(Equal(previousTransitionTime)) + + By("checking if the observed generation matches") + Expect(consumer.Status.ObservedGeneration).To(Equal(consumer.Generation)) + + By("checking if the consumer was updated") + natsStream, err := jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).NotTo(HaveOccurred()) + + streamInfo, err := natsStream.Info(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(streamInfo.Config.Description).To(Equal("new description")) + // Other fields unchanged + Expect(streamInfo.Config.ReplayPolicy).To(Equal(jetstream.ReplayInstantPolicy)) + }) + + When("PreventUpdate is set", func() { + + BeforeEach(func(ctx SpecContext) { + By("setting preventUpdate on the resource") + consumer.Spec.PreventUpdate = true + Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) + }) + It("should not create the consumer", func(ctx SpecContext) { + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that no consumer was created") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) + }) + It("should not update the consumer", func(ctx SpecContext) { + By("creating the consumer") + _, err := jsClient.CreateConsumer(ctx, streamName, emptyConsumerConfig) + Expect(err).NotTo(HaveOccurred()) + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that consumer was not updated") + c, err := jsClient.Consumer(ctx, streamName, consumerName) + Expect(c.CachedInfo().Config.Description).To(BeEmpty()) + }) + }) + + When("read-only mode is enabled", func() { + + BeforeEach(func(ctx SpecContext) { + By("setting read only on the controller") + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + Expect(err).NotTo(HaveOccurred()) + controller = &ConsumerReconciler{ + JetStreamController: readOnly, + } + }) + + It("should not create the consumer", func(ctx SpecContext) { + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that no consumer was created") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) + }) + It("should not update the consumer", func(ctx SpecContext) { + By("creating the consumer") + _, err := jsClient.CreateConsumer(ctx, streamName, emptyConsumerConfig) + Expect(err).NotTo(HaveOccurred()) + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that consumer was not updated") + s, err := jsClient.Consumer(ctx, streamName, consumerName) + Expect(s.CachedInfo().Config.Description).To(BeEmpty()) + }) + }) + + When("namespace restriction is enabled", func() { + + BeforeEach(func(ctx SpecContext) { + By("setting a namespace on the resource") + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + Expect(err).NotTo(HaveOccurred()) + controller = &ConsumerReconciler{ + JetStreamController: namespaced, + } + }) + + It("should not create the consumer", func(ctx SpecContext) { + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that no consumer was created") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) + }) + It("should not update the consumer", func(ctx SpecContext) { + By("creating the consumer") + _, err := jsClient.CreateConsumer(ctx, streamName, emptyConsumerConfig) + Expect(err).NotTo(HaveOccurred()) + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that consumer was not updated") + s, err := jsClient.Consumer(ctx, streamName, consumerName) + Expect(s.CachedInfo().Config.Description).To(BeEmpty()) + }) + }) + + When("the resource is marked for deletion", func() { + + BeforeEach(func(ctx SpecContext) { + By("marking the resource for deletion") + Expect(k8sClient.Delete(ctx, consumer)).To(Succeed()) + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) // re-fetch after update + }) + + It("should succeed deleting a not existing consumer", func(ctx SpecContext) { + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, consumer). + ShouldNot(Succeed()) + }) + + It("should succeed deleting a consumer of a deleted stream", func(ctx SpecContext) { + By("Setting not existing stream") + consumer.Spec.StreamName = "deleted-stream" + Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) + + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, consumer). + ShouldNot(Succeed()) + }) + + When("the underlying consumer exists", func() { + BeforeEach(func(ctx SpecContext) { + By("creating the consumer on the nats server") + _, err := jsClient.CreateConsumer(ctx, streamName, emptyConsumerConfig) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func(ctx SpecContext) { + err := jsClient.DeleteConsumer(ctx, streamName, consumerName) + if err != nil { + Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) + } + }) + + It("should delete the consumer", func(ctx SpecContext) { + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the consumer is deleted") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, consumer). + ShouldNot(Succeed()) + }) + + When("PreventDelete is set", func() { + BeforeEach(func(ctx SpecContext) { + By("setting preventDelete on the resource") + consumer.Spec.PreventDelete = true + Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) + }) + It("Should delete the resource and not delete the nats consumer", func(ctx SpecContext) { + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the consumer is not deleted") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).NotTo(HaveOccurred()) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, consumer). + ShouldNot(Succeed()) + }) + }) + + When("read only is set", func() { + BeforeEach(func(ctx SpecContext) { + By("setting read only on the controller") + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + Expect(err).NotTo(HaveOccurred()) + controller = &ConsumerReconciler{ + JetStreamController: readOnly, + } + }) + It("should delete the resource and not delete the consumer", func(ctx SpecContext) { + + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the consumer is not deleted") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).NotTo(HaveOccurred()) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, consumer). + ShouldNot(Succeed()) + }) + }) + + When("controller is restricted to different namespace", func() { + BeforeEach(func(ctx SpecContext) { + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + Expect(err).NotTo(HaveOccurred()) + controller = &ConsumerReconciler{ + JetStreamController: namespaced, + } + }) + It("should not delete the resource and consumer", func(ctx SpecContext) { + + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the consumer is not deleted") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).NotTo(HaveOccurred()) + + By("checking that the finalizer is not removed") + Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) + Expect(consumer.Finalizers).To(ContainElement(consumerFinalizer)) + }) + }) + }) + }) + + It("should create consumer on different server as specified in spec", func(ctx SpecContext) { + + By("setting up the alternative server") + altServer := CreateTestServer() + defer altServer.Shutdown() + // Setup altClient for alternate server + altClient, closer, err := CreateJetStreamClient(&NatsConfig{ServerURL: altServer.ClientURL()}, true) + defer closer.Close() + Expect(err).NotTo(HaveOccurred()) + + By("setting up the stream on the alternative server") + _, err = altClient.CreateStream(ctx, emptyStreamConfig) + Expect(err).NotTo(HaveOccurred()) + + By("setting the server in the consumer spec") + consumer.Spec.Servers = []string{altServer.ClientURL()} + Expect(k8sClient.Update(ctx, consumer)).To(Succeed()) + + By("checking precondition, that the consumer does not yet exist") + got, err := jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) + + By("reconciling the resource") + result, err := controller.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking if the consumer was created on the alternative server") + got, err = altClient.Consumer(ctx, streamName, consumerName) + Expect(err).NotTo(HaveOccurred()) + Expect(got.CachedInfo().Created).To(BeTemporally("~", time.Now(), time.Second)) + + By("checking that the consumer was NOT created on the original server") + _, err = jsClient.Consumer(ctx, streamName, consumerName) + Expect(err).To(MatchError(jetstream.ErrConsumerNotFound)) }) - Expect(err).NotTo(HaveOccurred()) - // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. - // Example: If you expect a certain status condition after reconciliation, verify it here. }) }) }) + +func Test_consumerSpecToConfig(t *testing.T) { + + date := time.Date(2024, 12, 03, 16, 55, 5, 0, time.UTC) + dateString := date.Format(time.RFC3339) + + tests := []struct { + name string + spec *api.ConsumerSpec + want *jetstream.ConsumerConfig + wantErr bool + }{ + { + name: "empty spec", + spec: &api.ConsumerSpec{}, + want: &jetstream.ConsumerConfig{}, + wantErr: false, + }, + { + name: "full spec", + spec: &api.ConsumerSpec{ + AckPolicy: "explicit", + AckWait: "10ns", + BackOff: []string{"1s", "5m"}, + Creds: "", + DeliverGroup: "", + DeliverPolicy: "new", + DeliverSubject: "", + Description: "test consumer", + PreventDelete: false, + PreventUpdate: false, + DurableName: "test-consumer", + FilterSubject: "time.us.>", + FilterSubjects: []string{"time.us.east", "time.us.west"}, + FlowControl: false, + HeadersOnly: true, + HeartbeatInterval: "", + MaxAckPending: 6, + MaxDeliver: 3, + MaxRequestBatch: 7, + MaxRequestExpires: "8s", + MaxRequestMaxBytes: 1024, + MaxWaiting: 5, + MemStorage: true, + Nkey: "", + OptStartSeq: 17, + OptStartTime: dateString, + RateLimitBps: 512, + ReplayPolicy: "instant", + Replicas: 9, + SampleFreq: "25%", + Servers: nil, + StreamName: "", + TLS: api.TLS{}, + Account: "", + Metadata: map[string]string{ + "meta": "data", + }, + }, + want: &jetstream.ConsumerConfig{ + Name: "", // Optional, not mapped + Durable: "test-consumer", + Description: "test consumer", + DeliverPolicy: jetstream.DeliverNewPolicy, + OptStartSeq: 17, + OptStartTime: &date, + AckPolicy: jetstream.AckExplicitPolicy, + AckWait: 10 * time.Nanosecond, + MaxDeliver: 3, + BackOff: []time.Duration{time.Second, 5 * time.Minute}, + FilterSubject: "time.us.>", + ReplayPolicy: jetstream.ReplayInstantPolicy, + RateLimit: 512, + SampleFrequency: "25%", + MaxWaiting: 5, + MaxAckPending: 6, + HeadersOnly: true, + MaxRequestBatch: 7, + MaxRequestExpires: 8 * time.Second, + MaxRequestMaxBytes: 1024, + InactiveThreshold: 0, // TODO no value? + Replicas: 9, + MemoryStorage: true, + FilterSubjects: []string{"time.us.east", "time.us.west"}, + Metadata: map[string]string{ + "meta": "data", + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := consumerSpecToConfig(tt.spec) + if (err != nil) != tt.wantErr { + t.Errorf("consumerSpecToConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.EqualValues(t, tt.want, got, "consumerSpecToConfig(%v)", tt.spec) + }) + } +} diff --git a/internal/controller/types.go b/internal/controller/types.go index f48d3c4b..de166267 100644 --- a/internal/controller/types.go +++ b/internal/controller/types.go @@ -1,6 +1,7 @@ package controller const ( - readyCondType = "Ready" - streamFinalizer = "stream.nats.io/finalizer" + readyCondType = "Ready" + streamFinalizer = "stream.nats.io/finalizer" + consumerFinalizer = "consumer.nats.io/finalizer" )