diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle
index 8b291dce43..c7c5d7e129 100644
--- a/data-prepper-plugins/s3-source/build.gradle
+++ b/data-prepper-plugins/s3-source/build.gradle
@@ -13,6 +13,7 @@ repositories {
 
 dependencies {
     implementation project(':data-prepper-api')
+    implementation libs.armeria.core
     implementation 'io.micrometer:micrometer-core'
     implementation 'software.amazon.awssdk:s3'
     implementation 'software.amazon.awssdk:sts'
diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
index dd3936f93c..6b57a5b28c 100644
--- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.dataprepper.plugins.source;
 
+import com.linecorp.armeria.client.retry.Backoff;
 import io.micrometer.core.instrument.DistributionSummary;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
@@ -44,6 +45,7 @@ class SqsWorkerIT {
     private PluginMetrics pluginMetrics;
     private S3ObjectGenerator s3ObjectGenerator;
     private String bucket;
+    private Backoff backoff;
 
     @BeforeEach
     void setUp() {
@@ -57,6 +59,9 @@ void setUp() {
                 .region(Region.of(System.getProperty("tests.s3source.region")))
                 .build();
 
+        backoff = Backoff.exponential(SqsService.INITIAL_DELAY, SqsService.MAXIMUM_DELAY).withJitter(SqsService.JITTER_RATE)
+                .withMaxAttempts(Integer.MAX_VALUE);
+
         s3SourceConfig = mock(S3SourceConfig.class);
         s3Service = mock(S3Service.class);
 
@@ -79,7 +84,7 @@ void setUp() {
     }
 
     private SqsWorker createObjectUnderTest() {
-        return new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics);
+        return new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
     }
 
     @AfterEach
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java
index 2ac97e0149..8190612366 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsService.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.dataprepper.plugins.source;
 
+import com.linecorp.armeria.client.retry.Backoff;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -14,6 +15,9 @@
 
 public class SqsService {
     private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);
+    static final int INITIAL_DELAY = 1000;
+    static final int MAXIMUM_DELAY = 5 * 60 * 1000;
+    static final double JITTER_RATE = 0.20;
 
     private final S3SourceConfig s3SourceConfig;
     private final S3Service s3Accessor;
@@ -32,7 +36,9 @@ public SqsService(final S3SourceConfig s3SourceConfig,
     }
 
     public void start() {
-        sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig, pluginMetrics));
+        final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
+                .withMaxAttempts(Integer.MAX_VALUE);
+        sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff));
         sqsWorkerThread.start();
     }
 
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java
index aee6cf83ca..2e2a2a0766 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java
@@ -6,11 +6,13 @@
 package org.opensearch.dataprepper.plugins.source;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.linecorp.armeria.client.retry.Backoff;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.Timer;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
 import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;
+import org.opensearch.dataprepper.plugins.source.exception.SqsRetriesExhaustedException;
 import org.opensearch.dataprepper.plugins.source.filter.ObjectCreatedFilter;
 import org.opensearch.dataprepper.plugins.source.filter.S3EventFilter;
 import org.slf4j.Logger;
@@ -49,16 +51,21 @@ public class SqsWorker implements Runnable {
     private final Counter sqsMessagesDeletedCounter;
     private final Counter sqsMessagesFailedCounter;
     private final Timer sqsMessageDelayTimer;
+    private final Backoff standardBackoff;
+    private int failedAttemptCount;
 
     public SqsWorker(final SqsClient sqsClient,
                      final S3Service s3Service,
                      final S3SourceConfig s3SourceConfig,
-                     final PluginMetrics pluginMetrics) {
+                     final PluginMetrics pluginMetrics,
+                     final Backoff backoff) {
         this.sqsClient = sqsClient;
         this.s3Service = s3Service;
         this.s3SourceConfig = s3SourceConfig;
+        this.standardBackoff = backoff;
         sqsOptions = s3SourceConfig.getSqsOptions();
         objectCreatedFilter = new ObjectCreatedFilter();
+        failedAttemptCount = 0;
 
         sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME);
         sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME);
@@ -108,13 +115,37 @@ int processSqsMessages() {
     private List getMessagesFromSqs() {
         try {
             final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
-            return sqsClient.receiveMessage(receiveMessageRequest).messages();
-        } catch (SqsException e) {
-            LOG.error("Error reading from SQS: {}", e.getMessage());
+            final List messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
+            failedAttemptCount = 0;
+            return messages;
+        } catch (final SqsException e) {
+            LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
+            applyBackoff();
             return Collections.emptyList();
         }
     }
 
+    /**
+     * Sleeps for the next delay supplied by the injected {@link Backoff}, passing the incremented
+     * count of consecutive failed receive attempts as the attempt number.
+     *
+     * @throws SqsRetriesExhaustedException if the backoff returns a negative delay
+     */
+    private void applyBackoff() {
+        final long delayMillis = standardBackoff.nextDelayMillis(++failedAttemptCount);
+        if (delayMillis < 0) {
+            Thread.currentThread().interrupt();
+            throw new SqsRetriesExhaustedException("SQS retries exhausted. Make sure that SQS configuration is valid and queue exists.");
+        }
+        try {
+            Thread.sleep(delayMillis);
+        } catch (final InterruptedException e) {
+            LOG.error("Thread is interrupted while polling SQS with retry.", e);
+            // Thread.sleep cleared the interrupt flag when it threw; restore it so callers can still observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private ReceiveMessageRequest createReceiveMessageRequest() {
         return ReceiveMessageRequest.builder()
                 .queueUrl(sqsOptions.getSqsUrl())
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/exception/SqsRetriesExhaustedException.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/exception/SqsRetriesExhaustedException.java
new file mode 100644
index 0000000000..0c6a90b9fd
--- /dev/null
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/exception/SqsRetriesExhaustedException.java
@@ -0,0 +1,13 @@
+package org.opensearch.dataprepper.plugins.source.exception;
+
+/**
+ * This exception is thrown when SQS retries are exhausted
+ *
+ * @since 2.1
+ */
+public class SqsRetriesExhaustedException extends RuntimeException {
+
+    public SqsRetriesExhaustedException(final String errorMessage) {
+        super(errorMessage);
+    }
+}
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java
index 8149b96de0..96f8d7512f 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/SqsWorkerTest.java
@@ -5,10 +5,12 @@
 
 package org.opensearch.dataprepper.plugins.source;
 
+import com.linecorp.armeria.client.retry.Backoff;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
 import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
 import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;
+import org.opensearch.dataprepper.plugins.source.exception.SqsRetriesExhaustedException;
 import org.opensearch.dataprepper.plugins.source.filter.ObjectCreatedFilter;
 import org.opensearch.dataprepper.plugins.source.filter.S3EventFilter;
 import io.micrometer.core.instrument.Counter;
@@ -37,6 +39,8 @@
 import java.util.Collections;
 import java.util.UUID;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.opensearch.dataprepper.plugins.source.SqsWorker.SQS_MESSAGES_DELETED_METRIC_NAME;
 import static org.opensearch.dataprepper.plugins.source.SqsWorker.SQS_MESSAGES_FAILED_METRIC_NAME;
 import static org.opensearch.dataprepper.plugins.source.SqsWorker.SQS_MESSAGES_RECEIVED_METRIC_NAME;
@@ -62,6 +66,7 @@ class SqsWorkerTest {
     private S3SourceConfig s3SourceConfig;
     private S3EventFilter objectCreatedFilter;
     private PluginMetrics pluginMetrics;
+    private Backoff backoff;
     private Counter sqsMessagesReceivedCounter;
     private Counter sqsMessagesDeletedCounter;
     private Counter sqsMessagesFailedCounter;
@@ -73,6 +78,7 @@ void setUp() {
         s3Service = mock(S3Service.class);
         s3SourceConfig = mock(S3SourceConfig.class);
         objectCreatedFilter = new ObjectCreatedFilter();
+        backoff = mock(Backoff.class);
 
         AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
         when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
@@ -99,7 +105,7 @@ void setUp() {
         when(pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME)).thenReturn(sqsMessagesFailedCounter);
         when(pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME)).thenReturn(sqsMessageDelayTimer);
 
-        sqsWorker = new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics);
+        sqsWorker = new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
     }
 
     @AfterEach
@@ -182,6 +188,24 @@ void processSqsMessages_should_return_zero_messages_when_a_SqsException_is_throw
         verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
     }
 
+    @Test
+    void processSqsMessages_should_return_zero_messages_with_backoff_when_a_SqsException_is_thrown() {
+        when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(SqsException.class);
+        final int messagesProcessed = sqsWorker.processSqsMessages();
+        verify(backoff).nextDelayMillis(1);
+        assertThat(messagesProcessed, equalTo(0));
+    }
+
+    @Test
+    void processSqsMessages_should_throw_when_a_SqsException_is_thrown_with_max_retries() {
+        when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(SqsException.class);
+        when(backoff.nextDelayMillis(anyInt())).thenReturn((long) -1);
+        assertThrows(SqsRetriesExhaustedException.class, () -> sqsWorker.processSqsMessages());
+        // The worker interrupts the calling thread before throwing; Thread.interrupted() both verifies
+        // that and clears the flag so it cannot leak into later tests running on this thread.
+        assertThat(Thread.interrupted(), equalTo(true));
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {"", "{\"foo\": \"bar\""})
     void processSqsMessages_should_not_interact_with_S3Service_if_input_is_not_valid_JSON(String inputString) {