diff --git a/.gitignore b/.gitignore index 8571a5c6..8d5ae051 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,5 @@ /nats-boot-config /nats-boot-config.docker /tools +/bin /.idea diff --git a/Makefile b/Makefile index 5976b07b..e3e78b1a 100644 --- a/Makefile +++ b/Makefile @@ -202,6 +202,7 @@ $(ENVTEST): $(LOCALBIN) .PHONY: test test: envtest go vet ./controllers/... ./pkg/natsreloader/... ./internal/controller/... + $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path ## Get k8s binaries go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/... ./internal/controller/... .PHONY: clean diff --git a/cmd/jetstream-controller/main.go b/cmd/jetstream-controller/main.go index 1ec60c3d..62638feb 100644 --- a/cmd/jetstream-controller/main.go +++ b/cmd/jetstream-controller/main.go @@ -88,18 +88,23 @@ func run() error { if *controlLoop { klog.Warning("Starting jetStream controller in experimental control loop mode") + natsCfg := &controller.NatsConfig{ CRDConnect: *crdConnect, ClientName: "jetstream-controller", Credentials: *creds, NKey: *nkey, ServerURL: *server, - CA: *ca, + CAs: []string{}, Certificate: *cert, Key: *key, TLSFirst: *tlsfirst, } + if *ca != "" { + natsCfg.CAs = []string{*ca} + } + controllerCfg := &controller.Config{ ReadOnly: *readOnly, Namespace: *namespace, diff --git a/controllers/jetstream/consumer.go b/controllers/jetstream/consumer.go index 52f3f532..8dba6a89 100644 --- a/controllers/jetstream/consumer.go +++ b/controllers/jetstream/consumer.go @@ -462,7 +462,7 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf sc := s.DeepCopy() sc.Status.ObservedGeneration = s.Generation - sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{ + sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{ Type: readyCondType, Status: k8sapi.ConditionTrue, LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano), @@ -490,7 +490,7 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume } sc := s.DeepCopy() - sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{ + sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{ Type: readyCondType, Status: k8sapi.ConditionFalse, LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano), diff --git a/controllers/jetstream/controller.go b/controllers/jetstream/controller.go index f4100ca2..8c009537 100644 --- a/controllers/jetstream/controller.go +++ b/controllers/jetstream/controller.go @@ -495,7 +495,7 @@ func processQueueNext(q workqueue.RateLimitingInterface, jmsClient jsmClientFunc q.Forget(item) } -func upsertCondition(cs []apis.Condition, next apis.Condition) []apis.Condition { +func UpsertCondition(cs []apis.Condition, next apis.Condition) []apis.Condition { for i := 0; i < len(cs); i++ { if cs[i].Type != next.Type { continue diff --git a/controllers/jetstream/controller_test.go b/controllers/jetstream/controller_test.go index 68f8f7fd..aae4d9b6 100644 --- a/controllers/jetstream/controller_test.go +++ b/controllers/jetstream/controller_test.go @@ -186,7 +186,7 @@ func TestUpsertCondition(t *testing.T) { var cs []apis.Condition - cs = upsertCondition(cs, apis.Condition{ + cs = UpsertCondition(cs, apis.Condition{ Type: readyCondType, Status: k8sapis.ConditionTrue, LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano), @@ -202,7 +202,7 @@ func TestUpsertCondition(t *testing.T) { t.Fatalf("got=%s; want=%s", got, want) } - cs = upsertCondition(cs, apis.Condition{ + cs = UpsertCondition(cs, apis.Condition{ Type: readyCondType, Status: k8sapis.ConditionFalse, LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano), @@ -218,7 +218,7 @@ func TestUpsertCondition(t *testing.T) { t.Fatalf("got=%s; want=%s", got, want) } - cs = upsertCondition(cs, apis.Condition{ + cs = UpsertCondition(cs, apis.Condition{ Type: "Foo", Status: k8sapis.ConditionTrue, LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano), diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index 7dc2ec89..a340b88f 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -571,7 +571,7 @@ func setStreamErrored(ctx context.Context, s *apis.Stream, sif typed.StreamInter } sc := s.DeepCopy() - sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{ + sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{ Type: readyCondType, Status: k8sapi.ConditionFalse, LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano), @@ -600,7 +600,7 @@ func setStreamOK(ctx context.Context, s *apis.Stream, i typed.StreamInterface) ( sc := s.DeepCopy() sc.Status.ObservedGeneration = s.Generation - sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{ + sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{ Type: readyCondType, Status: k8sapi.ConditionTrue, LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano), diff --git a/internal/controller/account_controller.go b/internal/controller/account_controller.go index a5686437..72418667 100644 --- a/internal/controller/account_controller.go +++ b/internal/controller/account_controller.go @@ -18,21 +18,15 @@ package controller import ( "context" - "github.com/nats-io/nats.go/jetstream" "k8s.io/klog/v2" jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" - "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) // AccountReconciler reconciles a Account object type AccountReconciler struct { - client.Client - Scheme *runtime.Scheme - Config *Config - JetStream jetstream.JetStream + JetStreamController } // Reconcile is part of the main kubernetes reconciliation loop which aims to diff --git a/internal/controller/account_controller_test.go b/internal/controller/account_controller_test.go index aa897363..46debade 100644 --- a/internal/controller/account_controller_test.go +++ b/internal/controller/account_controller_test.go @@ -69,8 +69,7 @@ var _ = Describe("Account Controller", func() { It("should successfully reconcile the resource", func() { By("Reconciling the created resource") controllerReconciler := &AccountReconciler{ - Client: k8sClient, - Scheme: k8sClient.Scheme(), + baseController, } _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ diff --git a/internal/controller/client.go b/internal/controller/client.go index 8c81994e..c20144b3 100644 --- a/internal/controller/client.go +++ b/internal/controller/client.go @@ -12,7 +12,7 @@ type NatsConfig struct { Credentials string NKey string ServerURL string - CA string + CAs []string Certificate string Key string TLSFirst bool @@ -46,19 +46,26 @@ func (o *NatsConfig) buildOptions() ([]nats.Option, error) { opts = append(opts, nats.ClientCert(o.Certificate, o.Key)) } - if o.CA != "" { - opts = append(opts, nats.RootCAs(o.CA)) + if o.CAs != nil && len(o.CAs) > 0 { + opts = append(opts, nats.RootCAs(o.CAs...)) } } return opts, nil } -func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, error) { +type Closable interface { + Close() +} + +// CreateJetStreamClient creates new Jetstream client with a connection based on the given NatsConfig. +// Returns a jetstream.Jetstream client and the Closable of the underlying connection. +// Close should be called when the client is no longer used. +func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, Closable, error) { opts, err := cfg.buildOptions() if err != nil { - return nil, fmt.Errorf("nats options: %w", err) + return nil, nil, fmt.Errorf("nats options: %w", err) } // Set pedantic option @@ -74,12 +81,12 @@ func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, nc, err := nats.Connect(cfg.ServerURL, opts...) if err != nil { - return nil, fmt.Errorf("nats connect: %w", err) + return nil, nil, fmt.Errorf("nats connect: %w", err) } js, err := jetstream.New(nc) if err != nil { - return nil, fmt.Errorf("new jetstream: %w", err) + return nil, nil, fmt.Errorf("new jetstream: %w", err) } - return js, nil + return js, nc, nil } diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index 6dbe7ba8..22905065 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -18,21 +18,15 @@ package controller import ( "context" - "github.com/nats-io/nats.go/jetstream" "k8s.io/klog/v2" jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" - "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) // ConsumerReconciler reconciles a Consumer object type ConsumerReconciler struct { - client.Client - Scheme *runtime.Scheme - Config *Config - JetStream jetstream.JetStream + JetStreamController } // Reconcile is part of the main kubernetes reconciliation loop which aims to diff --git a/internal/controller/consumer_controller_test.go b/internal/controller/consumer_controller_test.go index 1ffce13e..6ce92bcc 100644 --- a/internal/controller/consumer_controller_test.go +++ b/internal/controller/consumer_controller_test.go @@ -75,8 +75,7 @@ var _ = Describe("Consumer Controller", func() { It("should successfully reconcile the resource", func() { By("Reconciling the created resource") controllerReconciler := &ConsumerReconciler{ - Client: k8sClient, - Scheme: k8sClient.Scheme(), + baseController, } _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ diff --git a/internal/controller/helpers_test.go b/internal/controller/helpers_test.go new file mode 100644 index 00000000..8235f11c --- /dev/null +++ b/internal/controller/helpers_test.go @@ -0,0 +1,43 @@ +package controller + +import ( + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + "github.com/nats-io/nats-server/v2/server" + natsserver "github.com/nats-io/nats-server/v2/test" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + + "os" + "time" +) + +func assertReadyStateMatches(condition api.Condition, status v1.ConditionStatus, reason string, message string, transitionTime time.Time) { + GinkgoHelper() + + Expect(condition.Type).To(Equal(readyCondType)) + Expect(condition.Status).To(Equal(status)) + Expect(condition.Reason).To(Equal(reason)) + Expect(condition.Message).To(ContainSubstring(message)) + + // Assert valid transition time + t, err := time.Parse(time.RFC3339Nano, condition.LastTransitionTime) + Expect(err).NotTo(HaveOccurred()) + Expect(t).To(BeTemporally("~", transitionTime, time.Second)) +} + +func CreateTestServer() *server.Server { + opts := &natsserver.DefaultTestOptions + opts.JetStream = true + opts.Port = -1 + opts.Debug = true + + dir, err := os.MkdirTemp("", "nats-*") + Expect(err).NotTo(HaveOccurred()) + opts.StoreDir = dir + + ns := natsserver.RunServer(opts) + Expect(err).NotTo(HaveOccurred()) + + return ns +} diff --git a/internal/controller/jetstream_controller.go b/internal/controller/jetstream_controller.go new file mode 100644 index 00000000..3796fc47 --- /dev/null +++ b/internal/controller/jetstream_controller.go @@ -0,0 +1,169 @@ +package controller + +import ( + "fmt" + js "github.com/nats-io/nack/controllers/jetstream" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + "github.com/nats-io/nats.go/jetstream" + v1 "k8s.io/api/core/v1" + "strings" + + "sigs.k8s.io/controller-runtime/pkg/client" + "time" +) + +type connectionOptions struct { + Account string `json:"account"` + Creds string `json:"creds"` + Nkey string `json:"nkey"` + Servers []string `json:"servers"` + TLS api.TLS `json:"tls"` +} + +type JetStreamController interface { + client.Client + + // ReadOnly returns true when no changes should be made by the controller. + ReadOnly() bool + + // ValidNamespace ok if the controllers namespace restriction allows the given namespace. + ValidNamespace(namespace string) bool + + // WithJetStreamClient provides a jetStream client to the given operation. + // The client uses the controllers connection configuration merged with opts. + // + // The given opts values take precedence over the controllers base configuration. + // + // Returns the error of the operation or errors during client setup. + WithJetStreamClient(opts *connectionOptions, op func(js jetstream.JetStream) error) error +} + +func NewJSController(k8sClient client.Client, natsConfig *NatsConfig, controllerConfig *Config) (JetStreamController, error) { + + return &jsController{ + Client: k8sClient, + config: natsConfig, + controllerConfig: controllerConfig, + }, nil +} + +type jsController struct { + client.Client + config *NatsConfig + controllerConfig *Config +} + +func (c *jsController) ReadOnly() bool { + return c.controllerConfig.ReadOnly +} + +func (c *jsController) ValidNamespace(namespace string) bool { + ns := c.controllerConfig.Namespace + return ns == "" || ns == namespace +} + +func (c *jsController) WithJetStreamClient(opts *connectionOptions, op func(js jetstream.JetStream) error) error { + + // Build single use client + // TODO(future-feature): Use client-pool instead of single use client + cfg := c.buildNatsConfig(opts) + + jsClient, closer, err := CreateJetStreamClient(cfg, true) + if err != nil { + return fmt.Errorf("create jetstream client: %w", err) + } + defer closer.Close() + + return op(jsClient) +} + +// buildNatsConfig uses given opts to override the base NatsConfig. +func (c *jsController) buildNatsConfig(opts *connectionOptions) *NatsConfig { + + serverUrls := strings.Join(opts.Servers, ",") + + // Takes opts values if present + cfg := &NatsConfig{ + CRDConnect: false, + ClientName: c.config.ClientName, + ServerURL: or(serverUrls, c.config.ServerURL), + TLSFirst: c.config.TLSFirst, // TODO(future-feature): expose TLSFirst in the spec config + } + + // Note: The opts.Account value coming from the resource spec is currently not considered. + // creds/nkey are associated with an account, the account field might be redundant. + // See https://github.com/nats-io/nack/pull/211#pullrequestreview-2511111670 + + // Authentication either from opts or base config + if opts.Creds != "" || opts.Nkey != "" { + cfg.Credentials = opts.Creds + cfg.NKey = opts.Nkey + } else { + cfg.Credentials = c.config.Credentials + cfg.NKey = c.config.NKey + } + + // CAs from opts or base config + if len(opts.TLS.RootCAs) > 0 { + cfg.CAs = opts.TLS.RootCAs + } else { + cfg.CAs = c.config.CAs + } + + // Client Cert and Key either from opts or base config + if opts.TLS.ClientCert != "" && opts.TLS.ClientKey != "" { + cfg.Certificate = opts.TLS.ClientCert + cfg.Key = opts.TLS.ClientKey + } else { + cfg.Certificate = c.config.Certificate + cfg.Key = c.config.Key + } + + return cfg +} + +// or returns the value if it is not the null value. Otherwise, the fallback value is returned +func or[T comparable](v T, fallback T) T { + if v == *new(T) { + return fallback + } + return v +} + +// updateReadyCondition returns the given conditions with an added or updated ready condition. +func updateReadyCondition(conditions []api.Condition, status v1.ConditionStatus, reason string, message string) []api.Condition { + + var currentStatus v1.ConditionStatus + var lastTransitionTime string + for _, condition := range conditions { + if condition.Type == readyCondType { + currentStatus = condition.Status + lastTransitionTime = condition.LastTransitionTime + break + } + } + + // Set transition time to now, when no previous ready condition or the status changed + if lastTransitionTime == "" || currentStatus != status { + lastTransitionTime = time.Now().UTC().Format(time.RFC3339Nano) + } + + newCondition := api.Condition{ + Type: readyCondType, + Status: status, + Reason: reason, + Message: message, + LastTransitionTime: lastTransitionTime, + } + if conditions == nil { + return []api.Condition{newCondition} + } else { + return js.UpsertCondition(conditions, newCondition) + } +} + +// asJsonString returns the given string wrapped in " and converted to []byte. +// Helper for mapping spec config to jetStream config using UnmarshalJSON. +func asJsonString(v string) []byte { + return []byte("\"" + v + "\"") +} diff --git a/internal/controller/jetstream_controller_test.go b/internal/controller/jetstream_controller_test.go new file mode 100644 index 00000000..d7176d04 --- /dev/null +++ b/internal/controller/jetstream_controller_test.go @@ -0,0 +1,138 @@ +package controller + +import ( + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "testing" + "time" +) + +func Test_updateReadyCondition(t *testing.T) { + + pastTransition := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339Nano) + updatedTransition := "now" + + otherCondition := api.Condition{ + Type: "other", + Status: v1.ConditionFalse, + Reason: "Reason", + Message: "Message", + LastTransitionTime: pastTransition, + } + + type args struct { + conditions []api.Condition + status v1.ConditionStatus + reason string + message string + } + tests := []struct { + name string + args args + want []api.Condition + }{ + { + name: "new ready condition", + args: args{ + conditions: nil, + status: v1.ConditionTrue, + reason: "Test", + message: "Test Message", + }, + want: []api.Condition{ + { + Type: readyCondType, + Status: v1.ConditionTrue, + Reason: "Test", + Message: "Test Message", + LastTransitionTime: updatedTransition, + }, + }, + }, + { + name: "update ready condition", + args: args{ + conditions: []api.Condition{ + otherCondition, + { + Type: readyCondType, + Status: v1.ConditionFalse, + Reason: "Test", + Message: "Test Message", + LastTransitionTime: pastTransition, + }, + }, + status: v1.ConditionTrue, + reason: "New Reason", + message: "New Message", + }, + want: []api.Condition{ + otherCondition, + { + Type: readyCondType, + Status: v1.ConditionTrue, + Reason: "New Reason", + Message: "New Message", + LastTransitionTime: updatedTransition, + }, + }, + }, + { + name: "should not update transition time when status is not changed", + args: args{ + conditions: []api.Condition{ + otherCondition, + { + Type: readyCondType, + Status: v1.ConditionTrue, + Reason: "Test", + Message: "Test Message", + LastTransitionTime: pastTransition, + }, + }, + status: v1.ConditionTrue, + reason: "New Reason", + message: "New Message", + }, + want: []api.Condition{ + otherCondition, + { + Type: readyCondType, + Status: v1.ConditionTrue, + Reason: "New Reason", + Message: "New Message", + LastTransitionTime: pastTransition, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + + got := updateReadyCondition(tt.args.conditions, tt.args.status, tt.args.reason, tt.args.message) + + assert.Len(got, len(tt.want)) + for i, want := range tt.want { + actual := got[i] + + assert.Equal(actual.Type, want.Type) + assert.Equal(actual.Status, want.Status) + assert.Equal(actual.Reason, want.Reason) + assert.Equal(actual.Message, want.Message) + + // Assert transition time was updated + if want.LastTransitionTime == updatedTransition { + actualTransitionTime, err := time.Parse(time.RFC3339Nano, actual.LastTransitionTime) + assert.NoError(err) + assert.WithinDuration(actualTransitionTime, time.Now(), 5*time.Second) + } + // Assert transition time was not updated + if want.LastTransitionTime == pastTransition { + assert.Equal(pastTransition, actual.LastTransitionTime) + } + } + }) + } +} diff --git a/internal/controller/register.go b/internal/controller/register.go index 611ff157..5fa21b5a 100644 --- a/internal/controller/register.go +++ b/internal/controller/register.go @@ -20,36 +20,26 @@ type Config struct { // controllerCfg defines behaviour of the registered controllers. func RegisterAll(mgr ctrl.Manager, clientConfig *NatsConfig, config *Config) error { - // Controllers need access to a nats client in pedantic mode - js, err := CreateJetStreamClient(clientConfig, true) + baseController, err := NewJSController(mgr.GetClient(), clientConfig, config) if err != nil { - return fmt.Errorf("create jetstream client: %w", err) + return fmt.Errorf("create base jetstream controller: %w", err) } // Register controllers if err := (&AccountReconciler{ - Client: mgr.GetClient(), - Config: config, - Scheme: mgr.GetScheme(), - JetStream: js, + baseController, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create account controller: %w", err) } if err := (&ConsumerReconciler{ - Client: mgr.GetClient(), - Config: config, - Scheme: mgr.GetScheme(), - JetStream: js, + baseController, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create consumer controller: %w", err) } if err := (&StreamReconciler{ - Client: mgr.GetClient(), - Config: config, - Scheme: mgr.GetScheme(), - JetStream: js, + JetStreamController: baseController, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create stream controller: %w", err) } diff --git a/internal/controller/stream_controller.go b/internal/controller/stream_controller.go index 4469b7f0..1766c8ba 100644 --- a/internal/controller/stream_controller.go +++ b/internal/controller/stream_controller.go @@ -18,39 +18,371 @@ package controller import ( "context" + "errors" + "fmt" + "github.com/go-logr/logr" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" "github.com/nats-io/nats.go/jetstream" - "k8s.io/apimachinery/pkg/runtime" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "time" ) // StreamReconciler reconciles a Stream object type StreamReconciler struct { - client.Client - Scheme *runtime.Scheme - Config *Config - JetStream jetstream.JetStream + JetStreamController } // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile +// It performs three main operations: +// - Initialize finalizer and ready condition if not present +// - Delete stream if it is marked for deletion. +// - Create or Update the stream +// +// A call to reconcile may perform only one action, expecting the reconciliation to be triggered again by an update. +// For example: Setting the finalizer triggers a second reconciliation. Reconcile returns after setting the finalizer, +// to prevent parallel reconciliations performing the same steps. func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := klog.FromContext(ctx) - log.Info("reconcile", "namespace", req.Namespace, "name", req.Name) - // TODO(user): your logic here + if ok := r.ValidNamespace(req.Namespace); !ok { + log.Info("Controller restricted to namespace, skipping reconciliation.") + return ctrl.Result{}, nil + } + + // Fetch stream resource + stream := &api.Stream{} + if err := r.Get(ctx, req.NamespacedName, stream); err != nil { + if apierrors.IsNotFound(err) { + log.Info("Stream resource not found. Ignoring since object must be deleted.") + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("get stream resource '%s': %w", req.NamespacedName.String(), err) + } + + log = log.WithValues("streamName", stream.Spec.Name) + + // Update ready status to unknown when no status is set + if stream.Status.Conditions == nil || len(stream.Status.Conditions) == 0 { + log.Info("Setting initial ready condition to unknown.") + stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation") + err := r.Status().Update(ctx, stream) + if err != nil { + return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err) + } + return ctrl.Result{Requeue: true}, nil + } + + // Add finalizer + if !controllerutil.ContainsFinalizer(stream, streamFinalizer) { + log.Info("Adding stream finalizer.") + if ok := controllerutil.AddFinalizer(stream, streamFinalizer); !ok { + return ctrl.Result{}, errors.New("failed to add finalizer to stream resource") + } + + if err := r.Update(ctx, stream); err != nil { + return ctrl.Result{}, fmt.Errorf("update stream resource to add finalizer: %w", err) + } + return ctrl.Result{}, nil + } + + // Check Deletion + markedForDeletion := stream.GetDeletionTimestamp() != nil + if markedForDeletion { + if controllerutil.ContainsFinalizer(stream, streamFinalizer) { + err := r.deleteStream(ctx, log, stream) + if err != nil { + return ctrl.Result{}, fmt.Errorf("delete stream: %w", err) + } + } else { + log.Info("Stream marked for deletion and already finalized. Ignoring.") + } + + return ctrl.Result{}, nil + } + + // Create or update stream + if err := r.createOrUpdate(ctx, log, stream); err != nil { + return ctrl.Result{}, fmt.Errorf("create or update: %s", err) + } return ctrl.Result{}, nil } +func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, stream *api.Stream) error { + + // Set status to not false + stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.") + if err := r.Status().Update(ctx, stream); err != nil { + return fmt.Errorf("update ready condition: %w", err) + } + + if !stream.Spec.PreventDelete && !r.ReadOnly() { + log.Info("Deleting stream.") + err := r.WithJetStreamClient(streamConnOpts(stream.Spec), func(js jetstream.JetStream) error { + return js.DeleteStream(ctx, stream.Spec.Name) + }) + if errors.Is(err, jetstream.ErrStreamNotFound) { + log.Info("Stream does not exist, unable to delete.", "streamName", stream.Spec.Name) + } else if err != nil { + return fmt.Errorf("delete stream during finalization: %w", err) + } + } else { + log.Info("Skipping stream deletion.", + "preventDelete", stream.Spec.PreventDelete, + "read-only", r.ReadOnly(), + ) + } + + log.Info("Removing stream finalizer.") + if ok := controllerutil.RemoveFinalizer(stream, streamFinalizer); !ok { + return errors.New("failed to remove stream finalizer") + } + if err := r.Update(ctx, stream); err != nil { + return fmt.Errorf("remove finalizer: %w", err) + } + + return nil +} + +func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, stream *api.Stream) error { + + // Create or Update the stream based on the spec + if stream.Spec.PreventUpdate || r.ReadOnly() { + log.Info("Skipping stream creation or update.", + "preventDelete", stream.Spec.PreventDelete, + "read-only", r.ReadOnly(), + ) + return nil + } + + // Map spec to stream targetConfig + targetConfig, err := streamSpecToConfig(&stream.Spec) + if err != nil { + return fmt.Errorf("map spec to stream targetConfig: %w", err) + } + + // CreateOrUpdateStream is called on every reconciliation when the stream is not to be deleted. + // TODO(future-feature): Do we need to check if config differs? + err = r.WithJetStreamClient(streamConnOpts(stream.Spec), func(js jetstream.JetStream) error { + log.Info("Creating or updating stream.") + _, err = js.CreateOrUpdateStream(ctx, targetConfig) + return err + }) + if err != nil { + err = fmt.Errorf("create or update stream: %w", err) + stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionFalse, "Errored", err.Error()) + if err := r.Status().Update(ctx, stream); err != nil { + log.Error(err, "Failed to update ready condition to Errored.") + } + return err + } + + // update the observed generation and ready status + stream.Status.ObservedGeneration = stream.Generation + stream.Status.Conditions = updateReadyCondition( + stream.Status.Conditions, + v1.ConditionTrue, + "Reconciling", + "Stream successfully created or updated.", + ) + err = r.Status().Update(ctx, stream) + if err != nil { + return fmt.Errorf("update ready condition: %w", err) + } + + return nil +} + +// streamConnOpts extracts nats connection relevant fields from the given stream spec as connectionOptions. +func streamConnOpts(spec api.StreamSpec) *connectionOptions { + return &connectionOptions{ + Account: spec.Account, + Creds: spec.Creds, + Nkey: spec.Nkey, + Servers: spec.Servers, + TLS: spec.TLS, + } +} + +// streamSpecToConfig creates a jetstream.StreamConfig matching the given stream resource spec +func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) { + + // Set directly mapped fields + config := jetstream.StreamConfig{ + Name: spec.Name, + Description: spec.Description, + Subjects: spec.Subjects, + MaxConsumers: spec.MaxConsumers, + MaxMsgs: int64(spec.MaxMsgs), + MaxBytes: int64(spec.MaxBytes), + DiscardNewPerSubject: spec.DiscardPerSubject, + MaxMsgsPerSubject: int64(spec.MaxMsgsPerSubject), + MaxMsgSize: int32(spec.MaxMsgSize), + Replicas: spec.Replicas, + NoAck: spec.NoAck, + DenyDelete: spec.DenyDelete, + DenyPurge: spec.DenyPurge, + AllowRollup: spec.AllowRollup, + FirstSeq: spec.FirstSequence, + AllowDirect: spec.AllowDirect, + // Explicitly set not (yet) mapped fields + Sealed: false, + MirrorDirect: false, + ConsumerLimits: jetstream.StreamConsumerLimits{}, + } + + // Set not directly mapped fields + + // retention + if spec.Retention != "" { + // Wrap string in " to be properly unmarshalled as json string + err := config.Retention.UnmarshalJSON(asJsonString(spec.Retention)) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("invalid retention policy: %w", err) + } + } + + // discard + if spec.Discard != "" { + err := config.Discard.UnmarshalJSON(asJsonString(spec.Discard)) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("invalid retention policy: %w", err) + } + } + + // maxAge + if spec.MaxAge != "" { + d, err := time.ParseDuration(spec.MaxAge) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("parse max age: %w", err) + } + config.MaxAge = d + } + // storage + if spec.Storage != "" { + err := config.Storage.UnmarshalJSON(asJsonString(spec.Storage)) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("invalid storage: %w", err) + } + } + + // duplicates + if spec.DuplicateWindow != "" { + d, err := time.ParseDuration(spec.DuplicateWindow) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("parse duplicate window: %w", err) + } + config.Duplicates = d + } + + // placement + if spec.Placement != nil { + config.Placement = &jetstream.Placement{ + Cluster: spec.Placement.Cluster, + Tags: spec.Placement.Tags, + } + } + + // mirror + if spec.Mirror != nil { + ss, err := mapStreamSource(spec.Mirror) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("map mirror stream soruce: %w", err) + } + config.Mirror = ss + } + + // sources + if spec.Sources != nil { + config.Sources = []*jetstream.StreamSource{} + for _, source := range spec.Sources { + s, err := mapStreamSource(source) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("map stream soruce: %w", err) + } + config.Sources = append(config.Sources, s) + } + } + + // compression + if spec.Compression != "" { + err := config.Compression.UnmarshalJSON(asJsonString(spec.Compression)) + if err != nil { + return jetstream.StreamConfig{}, fmt.Errorf("invalid compression: %w", err) + } + } + + // subjectTransform + if spec.SubjectTransform != nil { + config.SubjectTransform = &jetstream.SubjectTransformConfig{ + Source: spec.SubjectTransform.Source, + Destination: spec.SubjectTransform.Dest, + } + } + + // rePublish + if spec.Republish != nil { + config.RePublish = &jetstream.RePublish{ + Source: spec.Republish.Source, + Destination: spec.Republish.Destination, + HeadersOnly: spec.Republish.HeadersOnly, + } + } + + // metadata + if spec.Metadata != nil { + config.Metadata = spec.Metadata + } + + return config, nil +} + +func mapStreamSource(ss *api.StreamSource) (*jetstream.StreamSource, error) { + jss := &jetstream.StreamSource{ + Name: ss.Name, + FilterSubject: ss.FilterSubject, + } + + if ss.OptStartSeq > 0 { + jss.OptStartSeq = uint64(ss.OptStartSeq) + } + if ss.OptStartTime != "" { + t, err := time.Parse(time.RFC3339, ss.OptStartTime) + if err != nil { + return nil, fmt.Errorf("parse opt start time: %w", err) + } + jss.OptStartTime = &t + } + + if ss.ExternalAPIPrefix != "" || ss.ExternalDeliverPrefix != "" { + jss.External = &jetstream.ExternalStream{ + APIPrefix: ss.ExternalAPIPrefix, + DeliverPrefix: ss.ExternalDeliverPrefix, + } + } + + for _, transform := range ss.SubjectTransforms { + jss.SubjectTransforms = append(jss.SubjectTransforms, jetstream.SubjectTransformConfig{ + Source: transform.Source, + Destination: transform.Dest, + }) + } + + return jss, nil +} + // SetupWithManager sets up the controller with the Manager. func (r *StreamReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&jetstreamnatsiov1beta2.Stream{}). + For(&api.Stream{}). + Owns(&api.Stream{}). + // Only trigger on generation changes + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/internal/controller/stream_controller_test.go b/internal/controller/stream_controller_test.go index ddd0e1c8..e80d327b 100644 --- a/internal/controller/stream_controller_test.go +++ b/internal/controller/stream_controller_test.go @@ -17,74 +17,729 @@ limitations under the License. package controller import ( + "github.com/nats-io/nats.go/jetstream" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/api/errors" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "testing" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" ) var _ = Describe("Stream Controller", func() { - Context("When reconciling a resource", func() { - const resourceName = "test-resource" - typeNamespacedName := types.NamespacedName{ - Name: resourceName, - Namespace: "default", // TODO(user):Modify as needed + // The test stream resource + const resourceName = "test-stream" + const streamName = "orders" + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + stream := &api.Stream{} + + // The tested controller + var controller *StreamReconciler + + // Config to create minimal nats stream + emptyStreamConfig := jetstream.StreamConfig{ + Name: streamName, + Replicas: 1, + Retention: jetstream.WorkQueuePolicy, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, + } + + BeforeEach(func(ctx SpecContext) { + By("creating a test stream resource") + err := k8sClient.Get(ctx, typeNamespacedName, stream) + if err != nil && k8serrors.IsNotFound(err) { + resource := &api.Stream{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: "default", + }, + Spec: api.StreamSpec{ + Name: streamName, + Replicas: 1, + Subjects: []string{"tests.*"}, + Description: "test stream", + Retention: "workqueue", + Discard: "old", + Storage: "file", + }, + } + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + // Re-fetch stream + Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) } - stream := &jetstreamnatsiov1beta2.Stream{} + By("checking precondition: nats stream does not exist") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + + By("setting up the tested controller") + controller = &StreamReconciler{ + baseController, + } + }) + + AfterEach(func(ctx SpecContext) { + By("removing the test stream resource") + resource := &api.Stream{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + if err != nil { + Expect(err).To(MatchError(k8serrors.IsNotFound, "Is not found")) + } else { + if controllerutil.ContainsFinalizer(resource, streamFinalizer) { + By("removing the finalizer") + controllerutil.RemoveFinalizer(resource, streamFinalizer) + Expect(k8sClient.Update(ctx, resource)).To(Succeed()) + } + + By("removing the stream resource") + Expect(k8sClient.Delete(ctx, resource)). + To(SatisfyAny( + Succeed(), + MatchError(k8serrors.IsNotFound, "is not found"), + )) + } + + By("deleting the nats stream") + Expect(jsClient.DeleteStream(ctx, streamName)). + To(SatisfyAny( + Succeed(), + MatchError(jetstream.ErrStreamNotFound), + )) + }) + + When("reconciling a not existing resource", func() { + It("should stop reconciliation without error", func(ctx SpecContext) { + By("reconciling the created resource") + result, err := controller.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "fake", + Name: "not-existing", + }, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + }) + + When("reconciling a not initialized resource", func() { + + It("should initialize a new resource", func(ctx SpecContext) { + + By("re-queueing until it is initialized") + // Initialization can require multiple reconciliation loops + Eventually(func(ctx SpecContext) *api.Stream { + _, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + got := &api.Stream{} + Expect(k8sClient.Get(ctx, typeNamespacedName, got)).To(Succeed()) + return got + }).WithContext(ctx). + Should(SatisfyAll( + HaveField("Finalizers", HaveExactElements(streamFinalizer)), + HaveField("Status.Conditions", Not(BeEmpty())), + )) + + By("validating the ready condition") + // Fetch stream + Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) + Expect(stream.Status.Conditions).To(HaveLen(1)) + + assertReadyStateMatches(stream.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now()) + }) + + }) + + When("reconciling an initialized resource", func() { BeforeEach(func(ctx SpecContext) { - By("creating the custom resource for the Kind Stream") - err := k8sClient.Get(ctx, typeNamespacedName, stream) - if err != nil && errors.IsNotFound(err) { - resource := &jetstreamnatsiov1beta2.Stream{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourceName, - Namespace: "default", - }, - Spec: jetstreamnatsiov1beta2.StreamSpec{ - Name: "test-stream", - Replicas: 1, - Discard: "old", - Storage: "file", - Retention: "workqueue", - }, + By("initializing the stream resource") + + By("setting the finalizer") + Expect(controllerutil.AddFinalizer(stream, streamFinalizer)).To(BeTrue()) + Expect(k8sClient.Update(ctx, stream)).To(Succeed()) + + By("setting an unknown ready state") + stream.Status.Conditions = []api.Condition{{ + Type: readyCondType, + Status: v1.ConditionUnknown, + Reason: "Test", + Message: "start condition", + LastTransitionTime: time.Now().Format(time.RFC3339Nano), + }} + Expect(k8sClient.Status().Update(ctx, stream)).To(Succeed()) + + }) + + It("should create a new stream", func(ctx SpecContext) { - // TODO(user): Specify other spec details if needed. + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + // Fetch resource + Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) + + By("checking if the ready state was updated") + Expect(stream.Status.Conditions).To(HaveLen(1)) + assertReadyStateMatches(stream.Status.Conditions[0], v1.ConditionTrue, "Reconciling", "created or updated", time.Now()) + + By("checking if the observed generation matches") + Expect(stream.Status.ObservedGeneration).To(Equal(stream.Generation)) + + By("checking if the stream was created") + natsStream, err := jsClient.Stream(ctx, streamName) + Expect(err).NotTo(HaveOccurred()) + streamInfo, err := natsStream.Info(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(streamInfo.Config.Name).To(Equal(streamName)) + Expect(streamInfo.Config.Description).To(Equal("test stream")) + Expect(streamInfo.Created).To(BeTemporally("~", time.Now(), time.Second)) + }) + + When("PreventUpdate is set", func() { + + BeforeEach(func(ctx SpecContext) { + By("setting preventDelete on the resource") + stream.Spec.PreventUpdate = true + Expect(k8sClient.Update(ctx, stream)).To(Succeed()) + }) + It("should not create the stream", func(ctx SpecContext) { + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that no stream was created") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + }) + It("should not update the stream", func(ctx SpecContext) { + By("creating the stream") + _, err := jsClient.CreateStream(ctx, emptyStreamConfig) + Expect(err).NotTo(HaveOccurred()) + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that stream was not updated") + s, err := jsClient.Stream(ctx, streamName) + Expect(s.CachedInfo().Config.Description).To(BeEmpty()) + }) + }) + + When("read-only mode is enabled", func() { + + BeforeEach(func(ctx SpecContext) { + By("setting read only on the controller") + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + Expect(err).NotTo(HaveOccurred()) + controller = &StreamReconciler{ + JetStreamController: readOnly, } - Expect(k8sClient.Create(ctx, resource)).To(Succeed()) - } + }) + + It("should not create the stream", func(ctx SpecContext) { + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that no stream was created") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + }) + It("should not update the stream", func(ctx SpecContext) { + By("creating the stream") + _, err := jsClient.CreateStream(ctx, emptyStreamConfig) + Expect(err).NotTo(HaveOccurred()) + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that stream was not updated") + s, err := jsClient.Stream(ctx, streamName) + Expect(s.CachedInfo().Config.Description).To(BeEmpty()) + }) + }) + + When("namespace restriction is enabled", func() { + + BeforeEach(func(ctx SpecContext) { + By("setting a namespace on the resource") + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + Expect(err).NotTo(HaveOccurred()) + controller = &StreamReconciler{ + JetStreamController: namespaced, + } + }) + + It("should not create the stream", func(ctx SpecContext) { + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that no stream was created") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + }) + It("should not update the stream", func(ctx SpecContext) { + By("creating the stream") + _, err := jsClient.CreateStream(ctx, emptyStreamConfig) + Expect(err).NotTo(HaveOccurred()) + + By("running Reconcile") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that stream was not updated") + s, err := jsClient.Stream(ctx, streamName) + Expect(s.CachedInfo().Config.Description).To(BeEmpty()) + }) }) - AfterEach(func(ctx SpecContext) { - // TODO(user): Cleanup logic after each test, like removing the resource instance. - resource := &jetstreamnatsiov1beta2.Stream{} - err := k8sClient.Get(ctx, typeNamespacedName, resource) + It("should update an existing stream", func(ctx SpecContext) { + By("reconciling once to create the stream") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + // Fetch resource + Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) + previousTransitionTime := stream.Status.Conditions[0].LastTransitionTime + + By("updating the resource") + stream.Spec.Description = "new description" + Expect(k8sClient.Update(ctx, stream)).To(Succeed()) + + By("reconciling the updated resource") + result, err = controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) - By("Cleanup the specific resource instance Stream") - Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + // Fetch resource + Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) + + By("checking if the state transition time was not updated") + Expect(stream.Status.Conditions).To(HaveLen(1)) + Expect(stream.Status.Conditions[0].LastTransitionTime).To(Equal(previousTransitionTime)) + + By("checking if the observed generation matches") + Expect(stream.Status.ObservedGeneration).To(Equal(stream.Generation)) + + By("checking if the stream was updated") + natsStream, err := jsClient.Stream(ctx, streamName) + Expect(err).NotTo(HaveOccurred()) + + streamInfo, err := natsStream.Info(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(streamInfo.Config.Description).To(Equal("new description")) + // Other fields unchanged + Expect(streamInfo.Config.Subjects).To(Equal([]string{"tests.*"})) }) - It("should successfully reconcile the resource", func(ctx SpecContext) { - By("Reconciling the created resource") - controllerReconciler := &StreamReconciler{ - Client: k8sClient, - Scheme: k8sClient.Scheme(), + It("should set an error state when the nats server is not available", func(ctx SpecContext) { + + By("setting up controller with unavailable nats server") + // Setup client for not running server + // Use actual test server to ensure port not used by other service on test instance + sv := CreateTestServer() + base, err := NewJSController(k8sClient, &NatsConfig{ServerURL: sv.ClientURL()}, &Config{}) + Expect(err).NotTo(HaveOccurred()) + sv.Shutdown() + + controller := &StreamReconciler{ + base, } - _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + By("reconciling resource") + result, err := controller.Reconcile(ctx, reconcile.Request{ NamespacedName: typeNamespacedName, }) + Expect(result).To(Equal(ctrl.Result{})) + Expect(err).To(HaveOccurred()) // Will be re-queued with back-off + + // Fetch resource + err = k8sClient.Get(ctx, typeNamespacedName, stream) Expect(err).NotTo(HaveOccurred()) - // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. - // Example: If you expect a certain status condition after reconciliation, verify it here. + + By("checking if the status was updated") + Expect(stream.Status.Conditions).To(HaveLen(1)) + assertReadyStateMatches( + stream.Status.Conditions[0], + v1.ConditionFalse, + "Errored", + "create or update stream:", + time.Now(), + ) + + By("checking if the observed generation does not match") + Expect(stream.Status.ObservedGeneration).ToNot(Equal(stream.Generation)) + }) + + When("the resource is marked for deletion", func() { + + BeforeEach(func(ctx SpecContext) { + By("marking the resource for deletion") + Expect(k8sClient.Delete(ctx, stream)).To(Succeed()) + Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) // re-fetch after update + }) + + It("should succeed deleting a not existing stream", func(ctx SpecContext) { + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, stream). + ShouldNot(Succeed()) + }) + + When("the underlying stream exists", func() { + BeforeEach(func(ctx SpecContext) { + By("creating the stream on the nats server") + _, err := jsClient.CreateStream(ctx, emptyStreamConfig) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func(ctx SpecContext) { + err := jsClient.DeleteStream(ctx, streamName) + if err != nil { + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + } + }) + + It("should delete the stream", func(ctx SpecContext) { + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the stream is deleted") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, stream). + ShouldNot(Succeed()) + }) + + When("PreventDelete is set", func() { + BeforeEach(func(ctx SpecContext) { + By("setting preventDelete on the resource") + stream.Spec.PreventDelete = true + Expect(k8sClient.Update(ctx, stream)).To(Succeed()) + }) + It("Should delete the resource and not delete the nats stream", func(ctx SpecContext) { + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the stream is not deleted") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).NotTo(HaveOccurred()) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, stream). + ShouldNot(Succeed()) + }) + }) + + When("read only is set", func() { + BeforeEach(func(ctx SpecContext) { + By("setting read only on the controller") + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + Expect(err).NotTo(HaveOccurred()) + controller = &StreamReconciler{ + JetStreamController: readOnly, + } + }) + It("should delete the resource and not delete the stream", func(ctx SpecContext) { + + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the stream is not deleted") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).NotTo(HaveOccurred()) + + By("checking that the resource is deleted") + Eventually(k8sClient.Get). + WithArguments(ctx, typeNamespacedName, stream). + ShouldNot(Succeed()) + }) + }) + + When("controller is restricted to different namespace", func() { + BeforeEach(func(ctx SpecContext) { + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + Expect(err).NotTo(HaveOccurred()) + controller = &StreamReconciler{ + JetStreamController: namespaced, + } + }) + It("should not delete the resource and stream", func(ctx SpecContext) { + + By("reconciling") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking that the stream is not deleted") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).NotTo(HaveOccurred()) + + By("checking that the finalizer is not removed") + Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) + Expect(stream.Finalizers).To(ContainElement(streamFinalizer)) + }) + }) + }) + }) + + It("should update stream on different server as specified in spec", func(ctx SpecContext) { + By("setting up the alternative server") + // Setup altClient for alternate server + altServer := CreateTestServer() + defer altServer.Shutdown() + + By("setting the server in the stream spec") + stream.Spec.Servers = []string{altServer.ClientURL()} + Expect(k8sClient.Update(ctx, stream)).To(Succeed()) + + By("checking precondition, that the stream does not yet exist") + got, err := jsClient.Stream(ctx, streamName) + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + + By("reconciling the resource") + result, err := controller.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking if the stream was created on the alternative server") + altClient, closer, err := CreateJetStreamClient(&NatsConfig{ServerURL: altServer.ClientURL()}, true) + defer closer.Close() + Expect(err).NotTo(HaveOccurred()) + + got, err = altClient.Stream(ctx, streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(got.CachedInfo().Created).To(BeTemporally("~", time.Now(), time.Second)) + + By("checking that the stream was NOT created on the original server") + _, err = jsClient.Stream(ctx, streamName) + Expect(err).To(MatchError(jetstream.ErrStreamNotFound)) + }) }) }) + +func Test_mapSpecToConfig(t *testing.T) { + + date := time.Date(2024, 12, 03, 16, 55, 5, 0, time.UTC) + dateString := date.Format(time.RFC3339) + + tests := []struct { + name string + spec *api.StreamSpec + want jetstream.StreamConfig + wantErr bool + }{ + { + name: "emtpy spec", + spec: &api.StreamSpec{}, + want: jetstream.StreamConfig{}, + wantErr: false, + }, + { + name: "full spec", + spec: &api.StreamSpec{ + Account: "", + AllowDirect: true, + AllowRollup: true, + Creds: "", + DenyDelete: true, + DenyPurge: true, + Description: "stream description", + DiscardPerSubject: true, + PreventDelete: false, + PreventUpdate: false, + Discard: "new", + DuplicateWindow: "5s", + MaxAge: "30s", + MaxBytes: -1, + MaxConsumers: -1, + MaxMsgs: -1, + MaxMsgSize: -1, + MaxMsgsPerSubject: 10, + Mirror: &api.StreamSource{ + Name: "mirror", + OptStartSeq: 5, + OptStartTime: dateString, + FilterSubject: "orders", + ExternalAPIPrefix: "api", + ExternalDeliverPrefix: "deliver", + SubjectTransforms: []*api.SubjectTransform{{ + Source: "transform-source", + Dest: "transform-dest", + }}, + }, + Name: "stream-name", + Nkey: "", + NoAck: true, + Placement: &api.StreamPlacement{ + Cluster: "test-cluster", + Tags: []string{"tag"}, + }, + Replicas: 3, + Republish: &api.RePublish{ + Source: "re-publish-source", + Destination: "re-publish-dest", + HeadersOnly: true, + }, + SubjectTransform: &api.SubjectTransform{ + Source: "transform-source", + Dest: "transform-dest", + }, + FirstSequence: 42, + Compression: "s2", + Metadata: map[string]string{ + "meta": "data", + }, + Retention: "interest", + Servers: nil, + Sources: []*api.StreamSource{{ + Name: "source", + OptStartSeq: 5, + OptStartTime: dateString, + FilterSubject: "orders", + ExternalAPIPrefix: "api", + ExternalDeliverPrefix: "deliver", + SubjectTransforms: []*api.SubjectTransform{{ + Source: "transform-source", + Dest: "transform-dest", + }}, + }}, + Storage: "file", + Subjects: []string{"orders.*"}, + TLS: api.TLS{}, + }, + want: jetstream.StreamConfig{ + Name: "stream-name", + Description: "stream description", + Subjects: []string{"orders.*"}, + Retention: jetstream.InterestPolicy, + MaxConsumers: -1, + MaxMsgs: -1, + MaxBytes: -1, + Discard: jetstream.DiscardNew, + DiscardNewPerSubject: true, + MaxAge: time.Second * 30, + MaxMsgsPerSubject: 10, + MaxMsgSize: -1, + Storage: jetstream.FileStorage, + Replicas: 3, + NoAck: true, + Duplicates: time.Second * 5, + Placement: &jetstream.Placement{ + Cluster: "test-cluster", + Tags: []string{"tag"}, + }, + Mirror: &jetstream.StreamSource{ + Name: "mirror", + OptStartSeq: 5, + OptStartTime: &date, + FilterSubject: "orders", + SubjectTransforms: []jetstream.SubjectTransformConfig{{ + Source: "transform-source", + Destination: "transform-dest", + }}, + External: &jetstream.ExternalStream{ + APIPrefix: "api", + DeliverPrefix: "deliver", + }, + Domain: "", + }, + Sources: []*jetstream.StreamSource{{ + Name: "source", + OptStartSeq: 5, + OptStartTime: &date, + FilterSubject: "orders", + SubjectTransforms: []jetstream.SubjectTransformConfig{{ + Source: "transform-source", + Destination: "transform-dest", + }}, + External: &jetstream.ExternalStream{ + APIPrefix: "api", + DeliverPrefix: "deliver", + }, + Domain: "", + }}, + Sealed: false, + DenyDelete: true, + DenyPurge: true, + AllowRollup: true, + Compression: jetstream.S2Compression, + FirstSeq: 42, + SubjectTransform: &jetstream.SubjectTransformConfig{ + Source: "transform-source", + Destination: "transform-dest", + }, + RePublish: &jetstream.RePublish{ + Source: "re-publish-source", + Destination: "re-publish-dest", + HeadersOnly: true, + }, + AllowDirect: true, + MirrorDirect: false, + ConsumerLimits: jetstream.StreamConsumerLimits{}, + Metadata: map[string]string{ + "meta": "data", + }, + Template: "", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + got, err := streamSpecToConfig(tt.spec) + if (err != nil) != tt.wantErr { + t.Errorf("streamSpecToConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + + // Compare nested structs + assert.EqualValues(tt.want, got) + }) + } +} diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 2dac16e2..dc7a59b9 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -19,7 +19,6 @@ package controller import ( "fmt" "github.com/nats-io/nats-server/v2/server" - natsserver "github.com/nats-io/nats-server/v2/test" "github.com/nats-io/nats.go/jetstream" "os" "path/filepath" @@ -47,6 +46,7 @@ var k8sClient client.Client var testEnv *envtest.Environment var testServer *server.Server var jsClient jetstream.JetStream +var baseController JetStreamController func TestControllers(t *testing.T) { RegisterFailHandler(Fail) @@ -86,13 +86,11 @@ var _ = BeforeSuite(func() { By("bootstrapping the test server") testServer = CreateTestServer() - jsClient, err = CreateJetStreamClient( - &NatsConfig{ServerURL: testServer.ClientURL()}, - true, - ) Expect(err).NotTo(HaveOccurred()) - Expect(jsClient).NotTo(BeNil()) + testNatsConfig := &NatsConfig{ServerURL: testServer.ClientURL()} + baseController, err = NewJSController(k8sClient, testNatsConfig, &Config{}) + jsClient, _, err = CreateJetStreamClient(testNatsConfig, true) }) var _ = AfterSuite(func() { @@ -106,19 +104,3 @@ var _ = AfterSuite(func() { err = os.RemoveAll(storeDir) Expect(err).NotTo(HaveOccurred()) }) - -func CreateTestServer() *server.Server { - opts := &natsserver.DefaultTestOptions - opts.JetStream = true - opts.Port = -1 - opts.Debug = true - - dir, err := os.MkdirTemp("", "nats-*") - Expect(err).NotTo(HaveOccurred()) - opts.StoreDir = dir - - ns := natsserver.RunServer(opts) - Expect(err).NotTo(HaveOccurred()) - - return ns -} diff --git a/internal/controller/types.go b/internal/controller/types.go new file mode 100644 index 00000000..f48d3c4b --- /dev/null +++ b/internal/controller/types.go @@ -0,0 +1,6 @@ +package controller + +const ( + readyCondType = "Ready" + streamFinalizer = "stream.nats.io/finalizer" +)