Skip to content

Commit

Permalink
change sqs receiveMessage func to pass in a callback operation for th…
Browse files Browse the repository at this point in the history
…e calling func to do something when the process is waiting on an empty queue for a long time
  • Loading branch information
bpeng committed Nov 5, 2024
1 parent a6d3b0a commit d61206e
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type SQS struct {
client *sqs.Client
}

// define a func to callback when no message received from the sqs and the process continues
// to receive for a long time, for the calling func to do sth in the middle,
// e.g. send a heartbeat
type CallBackOperation func()

// New returns an SQS struct which wraps an SQS client using the default AWS credentials chain.
// This consults (in order) environment vars, config files, EC2 and ECS roles.
// It is an error if the AWS_REGION environment variable is not set.
Expand Down Expand Up @@ -111,20 +116,26 @@ func (s *SQS) ReceiveWithAttributes(queueURL string, visibilityTimeout int32, at
// to receive system stop signal, register the context with signal.NotifyContext before passing in this function,
// when system stop signal is received, an error with message '... context canceled' will be returned
// which can be used to safely stop the system
func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, visibilityTimeout int32, attrs []types.QueueAttributeName) (Raw, error) {
func (s *SQS) ReceiveWithContextAttributesCallback(ctx context.Context, queueURL string, visibilityTimeout int32,
attrs []types.QueueAttributeName, callback CallBackOperation) (Raw, error) {
input := sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
VisibilityTimeout: visibilityTimeout,
WaitTimeSeconds: 20,
AttributeNames: attrs,
}
return s.receiveMessage(ctx, &input)
return s.receiveMessage(ctx, &input, callback)
}

func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, visibilityTimeout int32,
attrs []types.QueueAttributeName) (Raw, error) {
return s.ReceiveWithContextAttributesCallback(ctx, queueURL, visibilityTimeout, attrs, nil)
}

// receiveMessage is the common code used internally to receive an SQS message based
// on the provided input.
func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) {
func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput, callBack CallBackOperation) (Raw, error) {

for {
r, err := s.client.ReceiveMessage(ctx, input)
Expand All @@ -135,6 +146,9 @@ func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput
switch {
case r == nil || len(r.Messages) == 0:
// no message received
if callBack != nil { //do something in the middle
callBack()
}
continue
case len(r.Messages) == 1:
raw := r.Messages[0]
Expand All @@ -155,14 +169,18 @@ func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput
// to receive system stop signal, register the context with signal.NotifyContext before passing in this function,
// when system stop signal is received, an error with message '... context canceled' will be returned
// which can be used to safely stop the system
func (s *SQS) ReceiveWithContext(ctx context.Context, queueURL string, visibilityTimeout int32) (Raw, error) {
func (s *SQS) ReceiveWithContextCallback(ctx context.Context, queueURL string, visibilityTimeout int32, callback CallBackOperation) (Raw, error) {
input := sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
VisibilityTimeout: visibilityTimeout,
WaitTimeSeconds: 20,
}
return s.receiveMessage(ctx, &input)
return s.receiveMessage(ctx, &input, callback)
}

func (s *SQS) ReceiveWithContext(ctx context.Context, queueURL string, visibilityTimeout int32) (Raw, error) {
return s.ReceiveWithContextCallback(ctx, queueURL, visibilityTimeout, nil)
}

// Delete deletes the message referred to by receiptHandle from the queue.
Expand Down

0 comments on commit d61206e

Please sign in to comment.