Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller-runtime): Add consumer controller #212

Merged

Conversation

adriandieter
Copy link

Adds reconciliation of consumer resources in the consumer controller

Implementation is analogous to the stream controller.

  • implement consumerSpecToConfig
  • implement consumer resource initialization
  • implement consumer update/creation
  • implement preventUpdate, readonly and namespace restrictions
  • test consumer creation on alternative server
  • implement consumer deletion
  • handle deletion when the underlying stream was deleted
  • add missing GenerationChanged event filter to consumerReconciler
  • update logging

@Jarema
Copy link
Member

Jarema commented Dec 30, 2024

Hey.
Sorry for the delay - we will review this shortly.
This is an awesome effort!

Copy link
Member

@samuelattwood samuelattwood left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the holiday delay

Looks very solid, consistent with the stream controller. Comments are pretty much just nits. Thanks!

Comment on lines 25 to +26
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate import

Comment on lines +320 to +321
c, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(c.CachedInfo().Config.Description).To(BeEmpty())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
c, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(c.CachedInfo().Config.Description).To(BeEmpty())
c, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).NotTo(HaveOccurred())
Expect(c.CachedInfo().Config.Description).To(BeEmpty())

Comment on lines +394 to +395
s, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(s.CachedInfo().Config.Description).To(BeEmpty())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(s.CachedInfo().Config.Description).To(BeEmpty())
s, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).NotTo(HaveOccurred())
Expect(s.CachedInfo().Config.Description).To(BeEmpty())

func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error {

// Create or Update the stream based on the spec
if consumer.Spec.PreventUpdate || r.ReadOnly() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that we still want to permit a create if PreventUpdate is true, and only block update operations. It looks like I missed this in the stream controller as well

// Create or Update the stream based on the spec
if consumer.Spec.PreventUpdate || r.ReadOnly() {
log.Info("Skipping consumer creation or update.",
"preventDelete", consumer.Spec.PreventDelete,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intended to log the preventUpdate setting in the spec rather than preventDelete?

Expect(result.IsZero()).To(BeTrue())

By("checking that consumer was not updated")
s, err := jsClient.Consumer(ctx, streamName, consumerName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s, err := jsClient.Consumer(ctx, streamName, consumerName)
s, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).NotTo(HaveOccurred())

Expect(k8sClient.Update(ctx, consumer)).To(Succeed())

By("checking precondition, that the consumer does not yet exist")
got, err := jsClient.Consumer(ctx, streamName, consumerName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
got, err := jsClient.Consumer(ctx, streamName, consumerName)
_, err = jsClient.Consumer(ctx, streamName, consumerName)

@samuelattwood
Copy link
Member

samuelattwood commented Dec 31, 2024

I have a branch based on this branch about ready for merge, so I just went ahead and addressed the few comments there. I will merge this and open a PR with those changes shortly

#215

@samuelattwood samuelattwood merged commit 88f61b2 into nats-io:feature/controller-runtime Dec 31, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants