Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller-runtime): Implement stream controller #211

Merged
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
38a7db2
Add account, consumer and stream controller stubs to be implemented
adriandieter Dec 2, 2024
52c5d9f
Add logs to Reconcile functions
adriandieter Dec 2, 2024
25f3fa0
Add jsClient to test suit variables
adriandieter Dec 2, 2024
60a28f5
Remove format from log string
adriandieter Dec 2, 2024
bffc435
Make upsertCondition public to be used in new controllers
adriandieter Dec 3, 2024
ac84508
Implement basic cases for stream reconciliation
adriandieter Dec 4, 2024
d041edc
refactor to use shared base controller
adriandieter Dec 4, 2024
4d57584
Support jetstream connection options in stream spec
adriandieter Dec 5, 2024
814e243
implement stream deletion
adriandieter Dec 5, 2024
fb431bc
update observedGeneration of status
adriandieter Dec 9, 2024
1f6be77
check Spec.PreventDelete before stream deletion
adriandieter Dec 9, 2024
37f44fc
remove base js client
adriandieter Dec 9, 2024
00c98e5
move asJsonString to jetstream_controller
adriandieter Dec 9, 2024
7e2006b
check namespace read only and prevent update mode
adriandieter Dec 9, 2024
1b8b96d
Update comments and log
adriandieter Dec 10, 2024
8ddea8d
Fix test docs and check precondition
adriandieter Dec 10, 2024
5a5a7f5
Add preventUpdate test cases
adriandieter Dec 10, 2024
6f90932
Add tests for read-only or namespace restricted mode
adriandieter Dec 10, 2024
19c6beb
fix empty ca when no ca set
adriandieter Dec 10, 2024
9418bcd
simplify error message
adriandieter Dec 10, 2024
f7abcc6
fix error loop when the underlying stream was deleted
adriandieter Dec 10, 2024
84619b5
refactor each phase into separate method
adriandieter Dec 10, 2024
28cb7b0
Fix errors during parallel reconciliation & Refactor tests
adriandieter Dec 10, 2024
8331720
make test description strings more uniform
adriandieter Dec 11, 2024
2f04e7f
Update docs and log messages
adriandieter Dec 11, 2024
ffdc4c7
extract configuration to buildNatsConfig method
adriandieter Dec 11, 2024
617f3c0
fix checking for preventDelete in the update step
adriandieter Dec 11, 2024
d603c91
fix k8s binaries not downloaded for tests
adriandieter Dec 12, 2024
ce91d08
add /bin to gitignore
adriandieter Dec 12, 2024
684a3b2
rename stream helper functions
adriandieter Dec 17, 2024
0fd239d
update naming as suggested
adriandieter Dec 18, 2024
a7a341e
fix assumed reason in log message
adriandieter Dec 18, 2024
0b21494
Update todo comments marked with review
adriandieter Dec 18, 2024
2bf9162
separate CA config from client cert and key
adriandieter Dec 18, 2024
d93aa63
set streamName and consumerName fields once on logger
adriandieter Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
/nats-boot-config
/nats-boot-config.docker
/tools
/bin
/.idea
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ $(ENVTEST): $(LOCALBIN)
.PHONY: test
test: envtest
go vet ./controllers/... ./pkg/natsreloader/... ./internal/controller/...
$(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path ## Get k8s binaries
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/... ./internal/controller/...

.PHONY: clean
Expand Down
7 changes: 6 additions & 1 deletion cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,23 @@ func run() error {

if *controlLoop {
klog.Warning("Starting jetStream controller in experimental control loop mode")

natsCfg := &controller.NatsConfig{
CRDConnect: *crdConnect,
ClientName: "jetstream-controller",
Credentials: *creds,
NKey: *nkey,
ServerURL: *server,
CA: *ca,
CAs: []string{},
Certificate: *cert,
Key: *key,
TLSFirst: *tlsfirst,
}

if *ca != "" {
natsCfg.CAs = []string{*ca}
}

controllerCfg := &controller.Config{
ReadOnly: *readOnly,
Namespace: *namespace,
Expand Down
4 changes: 2 additions & 2 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf
sc := s.DeepCopy()

sc.Status.ObservedGeneration = s.Generation
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down Expand Up @@ -490,7 +490,7 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume
}

sc := s.DeepCopy()
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func processQueueNext(q workqueue.RateLimitingInterface, jmsClient jsmClientFunc
q.Forget(item)
}

func upsertCondition(cs []apis.Condition, next apis.Condition) []apis.Condition {
func UpsertCondition(cs []apis.Condition, next apis.Condition) []apis.Condition {
for i := 0; i < len(cs); i++ {
if cs[i].Type != next.Type {
continue
Expand Down
6 changes: 3 additions & 3 deletions controllers/jetstream/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestUpsertCondition(t *testing.T) {

var cs []apis.Condition

cs = upsertCondition(cs, apis.Condition{
cs = UpsertCondition(cs, apis.Condition{
Type: readyCondType,
Status: k8sapis.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand All @@ -202,7 +202,7 @@ func TestUpsertCondition(t *testing.T) {
t.Fatalf("got=%s; want=%s", got, want)
}

cs = upsertCondition(cs, apis.Condition{
cs = UpsertCondition(cs, apis.Condition{
Type: readyCondType,
Status: k8sapis.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand All @@ -218,7 +218,7 @@ func TestUpsertCondition(t *testing.T) {
t.Fatalf("got=%s; want=%s", got, want)
}

cs = upsertCondition(cs, apis.Condition{
cs = UpsertCondition(cs, apis.Condition{
Type: "Foo",
Status: k8sapis.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
4 changes: 2 additions & 2 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func setStreamErrored(ctx context.Context, s *apis.Stream, sif typed.StreamInter
}

sc := s.DeepCopy()
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down Expand Up @@ -600,7 +600,7 @@ func setStreamOK(ctx context.Context, s *apis.Stream, i typed.StreamInterface) (
sc := s.DeepCopy()

sc.Status.ObservedGeneration = s.Generation
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
8 changes: 1 addition & 7 deletions internal/controller/account_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@ package controller

import (
"context"
"github.com/nats-io/nats.go/jetstream"
"k8s.io/klog/v2"

jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// AccountReconciler reconciles a Account object
type AccountReconciler struct {
client.Client
Scheme *runtime.Scheme
Config *Config
JetStream jetstream.JetStream
JetStreamController
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down
3 changes: 1 addition & 2 deletions internal/controller/account_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ var _ = Describe("Account Controller", func() {
It("should successfully reconcile the resource", func() {
By("Reconciling the created resource")
controllerReconciler := &AccountReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
baseController,
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
Expand Down
23 changes: 15 additions & 8 deletions internal/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type NatsConfig struct {
Credentials string
NKey string
ServerURL string
CA string
CAs []string
Certificate string
Key string
TLSFirst bool
Expand Down Expand Up @@ -46,19 +46,26 @@ func (o *NatsConfig) buildOptions() ([]nats.Option, error) {
opts = append(opts, nats.ClientCert(o.Certificate, o.Key))
}

if o.CA != "" {
opts = append(opts, nats.RootCAs(o.CA))
if o.CAs != nil && len(o.CAs) > 0 {
opts = append(opts, nats.RootCAs(o.CAs...))
}
}

return opts, nil
}

func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, error) {
type Closable interface {
Close()
}

// CreateJetStreamClient creates new Jetstream client with a connection based on the given NatsConfig.
// Returns a jetstream.Jetstream client and the Closable of the underlying connection.
// Close should be called when the client is no longer used.
func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, Closable, error) {

opts, err := cfg.buildOptions()
if err != nil {
return nil, fmt.Errorf("nats options: %w", err)
return nil, nil, fmt.Errorf("nats options: %w", err)
}

// Set pedantic option
Expand All @@ -74,12 +81,12 @@ func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream,

nc, err := nats.Connect(cfg.ServerURL, opts...)
if err != nil {
return nil, fmt.Errorf("nats connect: %w", err)
return nil, nil, fmt.Errorf("nats connect: %w", err)
}

js, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("new jetstream: %w", err)
return nil, nil, fmt.Errorf("new jetstream: %w", err)
}
return js, nil
return js, nc, nil
}
8 changes: 1 addition & 7 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@ package controller

import (
"context"
"github.com/nats-io/nats.go/jetstream"
"k8s.io/klog/v2"

jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ConsumerReconciler reconciles a Consumer object
type ConsumerReconciler struct {
client.Client
Scheme *runtime.Scheme
Config *Config
JetStream jetstream.JetStream
JetStreamController
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down
3 changes: 1 addition & 2 deletions internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ var _ = Describe("Consumer Controller", func() {
It("should successfully reconcile the resource", func() {
By("Reconciling the created resource")
controllerReconciler := &ConsumerReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
baseController,
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
Expand Down
43 changes: 43 additions & 0 deletions internal/controller/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package controller

import (
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"

"os"
"time"
)

func assertReadyStateMatches(condition api.Condition, status v1.ConditionStatus, reason string, message string, transitionTime time.Time) {
GinkgoHelper()

Expect(condition.Type).To(Equal(readyCondType))
Expect(condition.Status).To(Equal(status))
Expect(condition.Reason).To(Equal(reason))
Expect(condition.Message).To(ContainSubstring(message))

// Assert valid transition time
t, err := time.Parse(time.RFC3339Nano, condition.LastTransitionTime)
Expect(err).NotTo(HaveOccurred())
Expect(t).To(BeTemporally("~", transitionTime, time.Second))
}

func CreateTestServer() *server.Server {
opts := &natsserver.DefaultTestOptions
opts.JetStream = true
opts.Port = -1
opts.Debug = true

dir, err := os.MkdirTemp("", "nats-*")
Expect(err).NotTo(HaveOccurred())
opts.StoreDir = dir

ns := natsserver.RunServer(opts)
Expect(err).NotTo(HaveOccurred())

return ns
}
Loading
Loading