Skip to content

Commit

Permalink
feat(464): add async actions consumer SDK to act API (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamsajj authored May 28, 2024
1 parent cd15436 commit 8b05bf4
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 0 deletions.
79 changes: 79 additions & 0 deletions act/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package act

import (
"context"
"errors"
"github.com/hashicorp/go-hclog"
"github.com/redis/go-redis/v9"
)

type MessageHandlerFn func(ctx context.Context, payload []byte) error

type IConsumer interface {
Subscribe(ctx context.Context, handler MessageHandlerFn) error
}

var _ IConsumer = (*Consumer)(nil)

type Consumer struct {
logger hclog.Logger
rdb *redis.Client
workersCount int
channel string
redisPubSub *redis.PubSub
redisChan <-chan *redis.Message
}

func NewConsumer(
logger hclog.Logger, redisClient *redis.Client, workersCount int, channelName string,
) (*Consumer, error) {
if logger == nil {
return nil, errors.New("logger is required")
}
if err := redisClient.Ping(context.Background()).Err(); err != nil {
logger.Error("failed to connect to redis", "error", err)
return nil, err
}
return &Consumer{
logger: logger,
rdb: redisClient,
channel: channelName,
workersCount: workersCount,
}, nil
}

func (c *Consumer) Subscribe(ctx context.Context, handler MessageHandlerFn) error {
c.redisPubSub = c.rdb.Subscribe(ctx, c.channel)
if err := c.redisPubSub.Ping(context.Background()); err != nil {
c.logger.Error("failed to subscribe to redis channel", "error", err, "channel", c.channel)
return err
}
c.redisChan = c.redisPubSub.Channel()
for i := 0; i < c.workersCount; i++ {
go c.run(ctx, handler)
}
return nil
}

func (c *Consumer) run(ctx context.Context, handler MessageHandlerFn) {
for {
select {
case <-ctx.Done():
c.logger.Info("context done, stopping redis consumer")
if err := c.redisPubSub.Close(); err != nil {
c.logger.Error("failed to close redis pubsub", "error", err)
}
return
case msg, ok := <-c.redisChan:
if !ok {
c.logger.Warn("redis channel closed")
return
}
if err := handler(ctx, []byte(msg.Payload)); err != nil {
c.logger.Error("failed to process task", "error", err, "payload", msg.Payload)
} else {
c.logger.Debug("async redis task processed successfully", "payload", msg.Payload)
}
}
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/jackc/pgx/v5 v5.5.5
github.com/pganalyze/pg_query_go/v5 v5.1.0
github.com/prometheus/client_golang v1.19.1
github.com/redis/go-redis/v9 v9.5.1
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.9.0
github.com/wasilibs/go-pgquery v0.0.0-20240510022537-eb0917feddeb
Expand All @@ -22,6 +23,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8b05bf4

Please sign in to comment.