Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS publisher support to reminder #4829

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions config/reminder-config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@ logging:
level: "debug"

events:
sql_connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
driver: "sql"
# only sql and cloudevents-nats drivers are supported
# driver: "cloudevents-nats"
sql:
connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
# nats:
# url: "nats://nats:4222"
# prefix: "minder"
# queue: "minder"
27 changes: 12 additions & 15 deletions internal/events/nats/natschannel.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

// Package nats provides a nants+cloudevents implementation of the eventer interface
// Package nats provides a nats+cloudevents implementation of the eventer interface
package nats

import (
Expand All @@ -17,6 +17,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/events/common"
Expand Down Expand Up @@ -114,25 +115,21 @@ func (c *cloudEventsNatsAdapter) ensureTopic(ctx context.Context, topic string,
return &state, nil
}

func (c *cloudEventsNatsAdapter) ensureStream(_ context.Context) error {
func (c *cloudEventsNatsAdapter) ensureStream(ctx context.Context) error {
conn, err := nats.Connect(c.cfg.URL)
if err != nil {
return err
}
defer conn.Close()
js, err := conn.JetStream()
js, err := jetstream.New(conn)
if err != nil {
return err
}
si, err := js.StreamInfo(c.cfg.Prefix)
if si == nil || err != nil && err.Error() == "stream not found" {
_, err = js.AddStream(&nats.StreamConfig{
Name: c.cfg.Prefix,
Subjects: []string{c.cfg.Prefix + ".>"},
})
return err
}
return nil
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: c.cfg.Prefix,
Subjects: []string{c.cfg.Prefix + ".>"},
})
return err
Comment on lines +128 to +132
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newer jetstream API, cleaner code, context support. https://github.com/nats-io/nats.go/blob/main/jetstream/README.md

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 if this works; I think when I started looking, this wasn't cleaned up / operable with CloudEvents yet.

}

// Subscribe implements message.Subscriber.
Expand Down Expand Up @@ -191,13 +188,13 @@ func (c *cloudEventsNatsAdapter) Publish(topic string, messages ...*message.Mess

state, err := c.ensureTopic(ctx, subject, "sender")
if err != nil {
return fmt.Errorf("Error creating topic %q: %w", subject, err)
return fmt.Errorf("error creating topic %q: %w", subject, err)
}

for _, msg := range messages {
err := sendEvent(ctx, subject, state.ceClient, msg)
if err != nil {
return fmt.Errorf("Error sending event to %q: %w", subject, err)
return fmt.Errorf("error sending event to %q: %w", subject, err)
}
}
return nil
Expand All @@ -214,7 +211,7 @@ func sendEvent(
// All our current payloads are encoded JSON; we need to unmarshal
payload := map[string]any{}
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return fmt.Errorf("Error unmarshalling payload: %w", err)
return fmt.Errorf("error unmarshalling payload: %w", err)
}

err := event.SetData("application/json", payload)
Expand Down
22 changes: 22 additions & 0 deletions internal/reminder/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

package reminder

import (
"context"
"fmt"

"github.com/ThreeDotsLabs/watermill/message"

"github.com/mindersec/minder/pkg/eventer"
)

func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, error) {
pub, err := eventer.New(ctx, nil, &r.cfg.EventConfig)
if err != nil {
return nil, fmt.Errorf("failed to create publisher: %w", err)
}

return pub, nil
Comment on lines +16 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a slight preference for just leaving this code in-line, given that it could generally be reduced to:

Suggested change
pub, err := eventer.New(ctx, nil, &r.cfg.EventConfig)
if err != nil {
return nil, fmt.Errorf("failed to create publisher: %w", err)
}
return pub, nil
return eventer.New(ctx, nil, &r.cfg.EventConfig)

I don't feel very strongly, however. (Leaving it in-line reduces the need to "peek" into the implementation to see if there's anything fancy going on when tracing the code. This is admittedly less of an issue for one-time initialization code.)

}
10 changes: 5 additions & 5 deletions internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/events/common"
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
"github.com/mindersec/minder/pkg/eventer/constants"
Expand All @@ -43,7 +42,6 @@ type reminder struct {
ticker *time.Ticker

eventPublisher message.Publisher
eventDBCloser common.DriverCloser
}

// NewReminder creates a new reminder instance
Expand All @@ -59,13 +57,12 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con
logger := zerolog.Ctx(ctx)
logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor)

pub, cl, err := r.setupSQLPublisher(ctx)
pub, err := r.getMessagePublisher(ctx)
if err != nil {
return nil, err
}

r.eventPublisher = pub
r.eventDBCloser = cl
return r, nil
}

Expand Down Expand Up @@ -118,7 +115,10 @@ func (r *reminder) Stop() {
}
r.stopOnce.Do(func() {
close(r.stop)
r.eventDBCloser()
err := r.eventPublisher.Close()
if err != nil {
zerolog.Ctx(context.Background()).Error().Err(err).Msg("error closing event publisher")
}
})
}

Expand Down
44 changes: 0 additions & 44 deletions internal/reminder/sql_publisher.go

This file was deleted.

27 changes: 23 additions & 4 deletions pkg/config/reminder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
package reminder

import (
"fmt"
"strings"

"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/mindersec/minder/pkg/config"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/constants"
)

// Config contains the configuration for the reminder service
type Config struct {
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
}

// Validate validates the configuration
Expand All @@ -28,6 +31,11 @@ func (c Config) Validate() error {
return err
}

err = validateEventConfig(c.EventConfig)
if err != nil {
return err
}

return nil
}

Expand All @@ -52,3 +60,14 @@ func RegisterReminderFlags(v *viper.Viper, flags *pflag.FlagSet) error {

return registerRecurrenceFlags(v, flags)
}

func validateEventConfig(cfg serverconfig.EventConfig) error {
switch cfg.Driver {
case constants.NATSDriver:
case constants.SQLDriver:
default:
return fmt.Errorf("events.driver %s is not supported", cfg.Driver)
}

return nil
}
Comment on lines +64 to +73
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prevents using e.g. the flag driver. I think that's okay, but I'd tend to write this as a deny-list of configuration that we know won't work rather than an allow-list (because we'll need to go back and expand the allow-list as we add new messaging configurations).

39 changes: 32 additions & 7 deletions pkg/config/reminder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package reminder_test

import (
"bytes"
"fmt"
"os"
"testing"
"time"
Expand All @@ -16,6 +17,8 @@ import (

"github.com/mindersec/minder/pkg/config"
"github.com/mindersec/minder/pkg/config/reminder"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/constants"
)

func TestValidateConfig(t *testing.T) {
Expand All @@ -34,9 +37,12 @@ func TestValidateConfig(t *testing.T) {
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: reminder.EventConfig{
Connection: config.DatabaseConfig{
Port: 8080,
EventConfig: serverconfig.EventConfig{
Driver: constants.SQLDriver,
SQLPubSub: serverconfig.SQLEventConfig{
Connection: config.DatabaseConfig{
Port: 8080,
},
},
},
},
Expand All @@ -49,6 +55,9 @@ func TestValidateConfig(t *testing.T) {
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: serverconfig.EventConfig{
Driver: constants.SQLDriver,
},
},
errMsg: "cannot be negative",
},
Expand All @@ -60,9 +69,26 @@ func TestValidateConfig(t *testing.T) {
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "-1h"),
},
EventConfig: serverconfig.EventConfig{
Driver: constants.SQLDriver,
},
},
errMsg: "cannot be negative",
},
{
name: "UnsupportedDriver",
config: reminder.Config{
RecurrenceConfig: reminder.RecurrenceConfig{
Interval: parseTimeDuration(t, "1h"),
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: serverconfig.EventConfig{
Driver: constants.GoChannelDriver,
},
},
errMsg: fmt.Sprintf("%s is not supported", constants.GoChannelDriver),
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -153,10 +179,9 @@ func TestSetViperDefaults(t *testing.T) {
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.interval")))
require.Equal(t, 100, v.GetInt("recurrence.batch_size"))
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.min_elapsed")))
require.Equal(t, "reminder", v.GetString("events.sql_connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql_connection.dbuser"))
require.Equal(t, "watermill", v.GetString("events.sql.connection.dbname"))
require.Equal(t, "localhost", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql.connection.dbuser"))
}

// TestOverrideConfigByEnvVar tests that the configuration can be overridden by environment variables
Expand Down
16 changes: 0 additions & 16 deletions pkg/config/reminder/events.go

This file was deleted.

Loading