diff --git a/act/consumer.go b/act/consumer.go new file mode 100644 index 0000000..7890d3e --- /dev/null +++ b/act/consumer.go @@ -0,0 +1,79 @@ +package act + +import ( + "context" + "errors" + "github.com/hashicorp/go-hclog" + "github.com/redis/go-redis/v9" +) + +type MessageHandlerFn func(ctx context.Context, payload []byte) error + +type IConsumer interface { + Subscribe(ctx context.Context, handler MessageHandlerFn) error +} + +var _ IConsumer = (*Consumer)(nil) + +type Consumer struct { + logger hclog.Logger + rdb *redis.Client + workersCount int + channel string + redisPubSub *redis.PubSub + redisChan <-chan *redis.Message +} + +func NewConsumer( + logger hclog.Logger, redisClient *redis.Client, workersCount int, channelName string, +) (*Consumer, error) { + if logger == nil { + return nil, errors.New("logger is required") + } + if err := redisClient.Ping(context.Background()).Err(); err != nil { + logger.Error("failed to connect to redis", "error", err) + return nil, err + } + return &Consumer{ + logger: logger, + rdb: redisClient, + channel: channelName, + workersCount: workersCount, + }, nil +} + +func (c *Consumer) Subscribe(ctx context.Context, handler MessageHandlerFn) error { + c.redisPubSub = c.rdb.Subscribe(ctx, c.channel) + if err := c.redisPubSub.Ping(context.Background()); err != nil { + c.logger.Error("failed to subscribe to redis channel", "error", err, "channel", c.channel) + return err + } + c.redisChan = c.redisPubSub.Channel() + for i := 0; i < c.workersCount; i++ { + go c.run(ctx, handler) + } + return nil +} + +func (c *Consumer) run(ctx context.Context, handler MessageHandlerFn) { + for { + select { + case <-ctx.Done(): + c.logger.Info("context done, stopping redis consumer") + if err := c.redisPubSub.Close(); err != nil { + c.logger.Error("failed to close redis pubsub", "error", err) + } + return + case msg, ok := <-c.redisChan: + if !ok { + c.logger.Warn("redis channel closed") + return + } + if err := handler(ctx, []byte(msg.Payload)); err != nil { + c.logger.Error("failed to process task", "error", err, "payload", msg.Payload) + } else { + c.logger.Debug("async redis task processed successfully", "payload", msg.Payload) + } + } + } +} diff --git a/go.mod b/go.mod index 09ddb94..e38f8f3 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/jackc/pgx/v5 v5.5.5 github.com/pganalyze/pg_query_go/v5 v5.1.0 github.com/prometheus/client_golang v1.19.1 + github.com/redis/go-redis/v9 v9.5.1 github.com/rs/zerolog v1.32.0 github.com/stretchr/testify v1.9.0 github.com/wasilibs/go-pgquery v0.0.0-20240510022537-eb0917feddeb @@ -22,6 +23,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/color v1.17.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/hashicorp/yamux v0.1.1 // indirect diff --git a/go.sum b/go.sum index 607804c..bed74cb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -9,6 +13,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/expr-lang/expr v1.16.7 h1:gCIiHt5ODA0xIaDbD0DPKyZpM9Drph3b3lolYAYq2Kw= github.com/expr-lang/expr v1.16.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= @@ -66,6 +72,8 @@ github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+a github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U= github.com/prometheus/procfs v0.14.0 h1:Lw4VdGGoKEZilJsayHf0B+9YgLGREba2C6xr+Fdfq6s= github.com/prometheus/procfs v0.14.0/go.mod h1:XL+Iwz8k8ZabyZfMFHPiilCniixqQarAy5Mu67pHlNQ= +github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=