Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller-runtime): Add consumer controller #212

Merged
265 changes: 262 additions & 3 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,18 @@ package controller

import (
"context"
"errors"
"fmt"
"github.com/go-logr/logr"
"github.com/nats-io/nats.go/jetstream"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"time"

jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand All @@ -36,14 +45,264 @@ type ConsumerReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := klog.FromContext(ctx)
log.Info("reconcile", "namespace", req.Namespace, "name", req.Name)

if ok := r.ValidNamespace(req.Namespace); !ok {
log.Info("Controller restricted to namespace, skipping reconciliation.")
return ctrl.Result{}, nil
}

// Fetch consumer resource
consumer := &api.Consumer{}
if err := r.Get(ctx, req.NamespacedName, consumer); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Consumer resource not found. Ignoring since object must be deleted.")
return ctrl.Result{}, nil
}
return ctrl.Result{}, fmt.Errorf("get consumer resource '%s': %w", req.NamespacedName.String(), err)
}

log = log.WithValues(
"streamName", consumer.Spec.StreamName,
"consumerName", consumer.Spec.DurableName,
)

// Update ready status to unknown when no status is set
if consumer.Status.Conditions == nil || len(consumer.Status.Conditions) == 0 {
log.Info("Setting initial ready condition to unknown.")
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation")
err := r.Status().Update(ctx, consumer)
if err != nil {
return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err)
}
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
log.Info("Adding consumer finalizer.")
if ok := controllerutil.AddFinalizer(consumer, consumerFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to consumer resource")
}

if err := r.Update(ctx, consumer); err != nil {
return ctrl.Result{}, fmt.Errorf("update consumer resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := consumer.GetDeletionTimestamp() != nil
if markedForDeletion {
if controllerutil.ContainsFinalizer(consumer, consumerFinalizer) {
err := r.deleteConsumer(ctx, log, consumer)
if err != nil {
return ctrl.Result{}, fmt.Errorf("delete consumer: %w", err)
}
} else {
log.Info("Consumer marked for deletion and already finalized. Ignoring.")
}

return ctrl.Result{}, nil
}

// Create or update stream
if err := r.createOrUpdate(ctx, log, consumer); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil
}

func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error {

// Set status to not false
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.")
if err := r.Status().Update(ctx, consumer); err != nil {
return fmt.Errorf("update ready condition: %w", err)
}

if !consumer.Spec.PreventDelete && !r.ReadOnly() {
err := r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
})
switch {
case errors.Is(err, jetstream.ErrConsumerNotFound):
log.Info("Consumer does not exist. Unable to delete.")
case errors.Is(err, jetstream.ErrStreamNotFound):
log.Info("Stream of consumer does not exist. Unable to delete.")
case err != nil:
return fmt.Errorf("delete jetstream consumer: %w", err)
default:
log.Info("Consumer deleted.")
}
} else {
log.Info("Skipping consumer deletion.",
"consumerName", consumer.Spec.DurableName,
"preventDelete", consumer.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
}

log.Info("Removing consumer finalizer.")
if ok := controllerutil.RemoveFinalizer(consumer, consumerFinalizer); !ok {
return errors.New("failed to remove consumer finalizer")
}
if err := r.Update(ctx, consumer); err != nil {
return fmt.Errorf("remove finalizer: %w", err)
}

return nil
}

func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error {

// Create or Update the stream based on the spec
if consumer.Spec.PreventUpdate || r.ReadOnly() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that we still want to permit a create if PreventUpdate is true, and only block update operations. It looks like I missed this in the stream controller as well

log.Info("Skipping consumer creation or update.",
"preventDelete", consumer.Spec.PreventDelete,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intended to log the preventUpdate setting in the spec rather than preventDelete?

"read-only", r.ReadOnly(),
)
return nil
}

// Map spec to consumer target config
targetConfig, err := consumerSpecToConfig(&consumer.Spec)
if err != nil {
return fmt.Errorf("map consumer spec to target config: %w", err)
}

err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
log.Info("Consumer created or updated.")
_, err := js.CreateOrUpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
})
if err != nil {
err = fmt.Errorf("create or update consumer: %w", err)
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Errored", err.Error())
if err := r.Status().Update(ctx, consumer); err != nil {
log.Error(err, "Failed to update ready condition to Errored.")
}
return err
}

// update the observed generation and ready status
consumer.Status.ObservedGeneration = consumer.Generation
consumer.Status.Conditions = updateReadyCondition(
consumer.Status.Conditions,
v1.ConditionTrue,
"Reconciling",
"Consumer successfully created or updated.",
)
err = r.Status().Update(ctx, consumer)
if err != nil {
return fmt.Errorf("update ready condition: %w", err)
}

return nil
}

func consumerConnOpts(spec api.ConsumerSpec) *connectionOptions {
return &connectionOptions{
Account: spec.Account,
Creds: spec.Creds,
Nkey: spec.Nkey,
Servers: spec.Servers,
TLS: spec.TLS,
}
}

func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {

config := &jetstream.ConsumerConfig{
Durable: spec.DurableName,
Description: spec.Description,
OptStartSeq: uint64(spec.OptStartSeq),
MaxDeliver: spec.MaxDeliver,
FilterSubject: spec.FilterSubject,
RateLimit: uint64(spec.RateLimitBps),
SampleFrequency: spec.SampleFreq,
MaxWaiting: spec.MaxWaiting,
MaxAckPending: spec.MaxAckPending,
HeadersOnly: spec.HeadersOnly,
MaxRequestBatch: spec.MaxRequestBatch,
MaxRequestMaxBytes: spec.MaxRequestMaxBytes,
Replicas: spec.Replicas,
MemoryStorage: spec.MemStorage,
FilterSubjects: spec.FilterSubjects,
Metadata: spec.Metadata,

// Explicitly set not (yet) mapped fields
Name: "",
InactiveThreshold: 0,
}

// DeliverPolicy
if spec.DeliverPolicy != "" {
err := config.DeliverPolicy.UnmarshalJSON(asJsonString(spec.DeliverPolicy))
if err != nil {
return nil, fmt.Errorf("invalid delivery policy: %w", err)
}
}

// OptStartTime RFC3339
if spec.OptStartTime != "" {
t, err := time.Parse(time.RFC3339, spec.OptStartTime)
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
}
config.OptStartTime = &t
}

// AckPolicy
if spec.AckPolicy != "" {
err := config.AckPolicy.UnmarshalJSON(asJsonString(spec.AckPolicy))
if err != nil {
return nil, fmt.Errorf("invalid ack policy: %w", err)
}
}

// AckWait
if spec.AckWait != "" {
d, err := time.ParseDuration(spec.AckWait)
if err != nil {
return nil, fmt.Errorf("invalid ack wait duration: %w", err)
}
config.AckWait = d
}

//BackOff
for _, bo := range spec.BackOff {
d, err := time.ParseDuration(bo)
if err != nil {
return nil, fmt.Errorf("invalid backoff: %w", err)
}

config.BackOff = append(config.BackOff, d)
}

// ReplayPolicy
if spec.ReplayPolicy != "" {
err := config.ReplayPolicy.UnmarshalJSON(asJsonString(spec.ReplayPolicy))
if err != nil {
return nil, fmt.Errorf("invalid replay policy: %w", err)
}
}

// MaxRequestExpires
if spec.MaxRequestExpires != "" {
d, err := time.ParseDuration(spec.MaxRequestExpires)
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
}
config.MaxRequestExpires = d
}

return config, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&jetstreamnatsiov1beta2.Consumer{}).
For(&api.Consumer{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}
Loading
Loading