diff --git a/pkg/source/sqs/sqs_source.go b/pkg/source/sqs/sqs_source.go index cd87c27e..31b511e6 100644 --- a/pkg/source/sqs/sqs_source.go +++ b/pkg/source/sqs/sqs_source.go @@ -121,10 +121,20 @@ var ConfigPair = config.ConfigurationPair{ // newSQSSourceWithInterfaces allows you to provide an SQS client directly to allow // for mocking and localstack usage func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) { + + urlResult, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + return nil, errors.Wrap(err, "Failed to get SQS queue URL") + } + // Ensures as even as possible distribution of UUIDs uuid.EnableRandPool() + return &sqsSource{ client: client, + queueURL: *urlResult.QueueUrl, queueName: queueName, concurrentWrites: concurrentWrites, region: region, @@ -139,14 +149,6 @@ func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, con func (ss *sqsSource) Read(sf *sourceiface.SourceFunctions) error { ss.log.Info("Reading messages from queue ...") - urlResult, err := ss.client.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(ss.queueName), - }) - if err != nil { - return errors.Wrap(err, "Failed to get SQS queue URL") - } - ss.queueURL = *urlResult.QueueUrl - throttle := make(chan struct{}, ss.concurrentWrites) wg := sync.WaitGroup{} diff --git a/pkg/source/sqs/sqs_source_test.go b/pkg/source/sqs/sqs_source_test.go index e818421a..669d7b2c 100644 --- a/pkg/source/sqs/sqs_source_test.go +++ b/pkg/source/sqs/sqs_source_test.go @@ -74,8 +74,7 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { } */ -// TODO: When we address https://github.com/snowplow/snowbridge/issues/151, this test will need to change. -func TestSQSSource_ReadFailure(t *testing.T) { +func TestSQSSource_SetupFailure(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } @@ -84,12 +83,7 @@ func TestSQSSource_ReadFailure(t *testing.T) { client := testutil.GetAWSLocalstackSQSClient() - source, err := newSQSSourceWithInterfaces(client, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists") - assert.Nil(err) - assert.NotNil(source) - assert.Equal("arn:aws:sqs:us-east-1:00000000000:not-exists", source.GetID()) - - err = source.Read(nil) + _, err := newSQSSourceWithInterfaces(client, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists") assert.NotNil(err) if err != nil { assert.Contains(err.Error(), "Failed to get SQS queue URL:")