From d61206eeeeb5b520a666e4dd8ccf1a35a0542530 Mon Sep 17 00:00:00 2001 From: bpeng Date: Wed, 6 Nov 2024 09:49:04 +1300 Subject: [PATCH] change sqs receiveMessage func to pass in a callback operation for the calling func to do something when the process is waiting on an empty queue for a long time --- aws/sqs/sqs.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/aws/sqs/sqs.go b/aws/sqs/sqs.go index 6d24d68..6b1db84 100644 --- a/aws/sqs/sqs.go +++ b/aws/sqs/sqs.go @@ -27,6 +27,11 @@ type SQS struct { client *sqs.Client } +// define a func to callback when no message received from the sqs and the process continues +// to receive for a long time, for the calling func to do sth in the middle, +// e.g. send a heartbeat +type CallBackOperation func() + // New returns an SQS struct which wraps an SQS client using the default AWS credentials chain. // This consults (in order) environment vars, config files, EC2 and ECS roles. // It is an error if the AWS_REGION environment variable is not set. @@ -111,7 +116,8 @@ func (s *SQS) ReceiveWithAttributes(queueURL string, visibilityTimeout int32, at // to receive system stop signal, register the context with signal.NotifyContext before passing in this function, // when system stop signal is received, an error with message '... context canceled' will be returned // which can be used to safely stop the system -func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, visibilityTimeout int32, attrs []types.QueueAttributeName) (Raw, error) { +func (s *SQS) ReceiveWithContextAttributesCallback(ctx context.Context, queueURL string, visibilityTimeout int32, + attrs []types.QueueAttributeName, callback CallBackOperation) (Raw, error) { input := sqs.ReceiveMessageInput{ QueueUrl: aws.String(queueURL), MaxNumberOfMessages: 1, @@ -119,12 +125,17 @@ func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, WaitTimeSeconds: 20, AttributeNames: attrs, } - return s.receiveMessage(ctx, &input) + return s.receiveMessage(ctx, &input, callback) +} + +func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, visibilityTimeout int32, + attrs []types.QueueAttributeName) (Raw, error) { + return s.ReceiveWithContextAttributesCallback(ctx, queueURL, visibilityTimeout, attrs, nil) } // receiveMessage is the common code used internally to receive an SQS message based // on the provided input. -func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) { +func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput, callBack CallBackOperation) (Raw, error) { for { r, err := s.client.ReceiveMessage(ctx, input) @@ -135,6 +146,9 @@ func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput switch { case r == nil || len(r.Messages) == 0: // no message received + if callBack != nil { //do something in the middle + callBack() + } continue case len(r.Messages) == 1: raw := r.Messages[0] @@ -155,14 +169,18 @@ func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput // to receive system stop signal, register the context with signal.NotifyContext before passing in this function, // when system stop signal is received, an error with message '... context canceled' will be returned // which can be used to safely stop the system -func (s *SQS) ReceiveWithContext(ctx context.Context, queueURL string, visibilityTimeout int32) (Raw, error) { +func (s *SQS) ReceiveWithContextCallback(ctx context.Context, queueURL string, visibilityTimeout int32, callback CallBackOperation) (Raw, error) { input := sqs.ReceiveMessageInput{ QueueUrl: aws.String(queueURL), MaxNumberOfMessages: 1, VisibilityTimeout: visibilityTimeout, WaitTimeSeconds: 20, } - return s.receiveMessage(ctx, &input) + return s.receiveMessage(ctx, &input, callback) +} + +func (s *SQS) ReceiveWithContext(ctx context.Context, queueURL string, visibilityTimeout int32) (Raw, error) { + return s.ReceiveWithContextCallback(ctx, queueURL, visibilityTimeout, nil) } // Delete deletes the message referred to by receiptHandle from the queue.