Skip to content

Commit

Permalink
Added backoff for SQS to reduce logging (#2326) (#2358)
Browse files Browse the repository at this point in the history
(cherry picked from commit 99f2cfe)

Co-authored-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
1 parent ec1cb00 commit 94f734b
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 7 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repositories {

dependencies {
implementation project(':data-prepper-api')
implementation libs.armeria.core
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source;

import com.linecorp.armeria.client.retry.Backoff;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
Expand Down Expand Up @@ -44,6 +45,7 @@ class SqsWorkerIT {
private PluginMetrics pluginMetrics;
private S3ObjectGenerator s3ObjectGenerator;
private String bucket;
private Backoff backoff;

@BeforeEach
void setUp() {
Expand All @@ -57,6 +59,9 @@ void setUp() {
.region(Region.of(System.getProperty("tests.s3source.region")))
.build();

backoff = Backoff.exponential(SqsService.INITIAL_DELAY, SqsService.MAXIMUM_DELAY).withJitter(SqsService.JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);

s3SourceConfig = mock(S3SourceConfig.class);
s3Service = mock(S3Service.class);

Expand All @@ -79,7 +84,7 @@ void setUp() {
}

private SqsWorker createObjectUnderTest() {
return new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics);
return new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source;

import com.linecorp.armeria.client.retry.Backoff;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,6 +15,9 @@

public class SqsService {
private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);
static final int INITIAL_DELAY = 1000;
static final int MAXIMUM_DELAY = 5 * 60 * 1000;
static final double JITTER_RATE = 0.20;

private final S3SourceConfig s3SourceConfig;
private final S3Service s3Accessor;
Expand All @@ -32,7 +36,9 @@ public SqsService(final S3SourceConfig s3SourceConfig,
}

public void start() {
sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig, pluginMetrics));
final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff));
sqsWorkerThread.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.linecorp.armeria.client.retry.Backoff;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;
import org.opensearch.dataprepper.plugins.source.exception.SqsRetriesExhaustedException;
import org.opensearch.dataprepper.plugins.source.filter.ObjectCreatedFilter;
import org.opensearch.dataprepper.plugins.source.filter.S3EventFilter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -49,16 +51,21 @@ public class SqsWorker implements Runnable {
private final Counter sqsMessagesDeletedCounter;
private final Counter sqsMessagesFailedCounter;
private final Timer sqsMessageDelayTimer;
private final Backoff standardBackoff;
private int failedAttemptCount;

public SqsWorker(final SqsClient sqsClient,
final S3Service s3Service,
final S3SourceConfig s3SourceConfig,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final Backoff backoff) {
this.sqsClient = sqsClient;
this.s3Service = s3Service;
this.s3SourceConfig = s3SourceConfig;
this.standardBackoff = backoff;
sqsOptions = s3SourceConfig.getSqsOptions();
objectCreatedFilter = new ObjectCreatedFilter();
failedAttemptCount = 0;

sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME);
sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME);
Expand Down Expand Up @@ -108,13 +115,29 @@ int processSqsMessages() {
private List<Message> getMessagesFromSqs() {
try {
final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
return sqsClient.receiveMessage(receiveMessageRequest).messages();
} catch (SqsException e) {
LOG.error("Error reading from SQS: {}", e.getMessage());
final List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
failedAttemptCount = 0;
return messages;
} catch (final SqsException e) {
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
applyBackoff();
return Collections.emptyList();
}
}

private void applyBackoff() {
final long delayMillis = standardBackoff.nextDelayMillis(++failedAttemptCount);
if (delayMillis < 0) {
Thread.currentThread().interrupt();
throw new SqsRetriesExhaustedException("SQS retries exhausted. Make sure that SQS configuration is valid and queue exists.");
}
try {
Thread.sleep(delayMillis);
} catch (final InterruptedException e){
LOG.error("Thread is interrupted while polling SQS with retry.", e);
}
}

private ReceiveMessageRequest createReceiveMessageRequest() {
return ReceiveMessageRequest.builder()
.queueUrl(sqsOptions.getSqsUrl())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opensearch.dataprepper.plugins.source.exception;

/**
* This exception is thrown when SQS retries are exhausted
*
* @since 2.1
*/
public class SqsRetriesExhaustedException extends RuntimeException {

public SqsRetriesExhaustedException(final String errorMessage) {
super(errorMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.dataprepper.plugins.source;

import com.linecorp.armeria.client.retry.Backoff;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;
import org.opensearch.dataprepper.plugins.source.exception.SqsRetriesExhaustedException;
import org.opensearch.dataprepper.plugins.source.filter.ObjectCreatedFilter;
import org.opensearch.dataprepper.plugins.source.filter.S3EventFilter;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -37,6 +39,8 @@
import java.util.Collections;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.opensearch.dataprepper.plugins.source.SqsWorker.SQS_MESSAGES_DELETED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.SqsWorker.SQS_MESSAGES_FAILED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.SqsWorker.SQS_MESSAGES_RECEIVED_METRIC_NAME;
Expand All @@ -62,6 +66,7 @@ class SqsWorkerTest {
private S3SourceConfig s3SourceConfig;
private S3EventFilter objectCreatedFilter;
private PluginMetrics pluginMetrics;
private Backoff backoff;
private Counter sqsMessagesReceivedCounter;
private Counter sqsMessagesDeletedCounter;
private Counter sqsMessagesFailedCounter;
Expand All @@ -73,6 +78,7 @@ void setUp() {
s3Service = mock(S3Service.class);
s3SourceConfig = mock(S3SourceConfig.class);
objectCreatedFilter = new ObjectCreatedFilter();
backoff = mock(Backoff.class);

AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
Expand All @@ -99,7 +105,7 @@ void setUp() {
when(pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME)).thenReturn(sqsMessagesFailedCounter);
when(pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME)).thenReturn(sqsMessageDelayTimer);

sqsWorker = new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics);
sqsWorker = new SqsWorker(sqsClient, s3Service, s3SourceConfig, pluginMetrics, backoff);
}

@AfterEach
Expand Down Expand Up @@ -182,6 +188,21 @@ void processSqsMessages_should_return_zero_messages_when_a_SqsException_is_throw
verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
}

@Test
void processSqsMessages_should_return_zero_messages_with_backoff_when_a_SqsException_is_thrown() {
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(SqsException.class);
final int messagesProcessed = sqsWorker.processSqsMessages();
verify(backoff).nextDelayMillis(1);
assertThat(messagesProcessed, equalTo(0));
}

@Test
void processSqsMessages_should_throw_when_a_SqsException_is_thrown_with_max_retries() {
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(SqsException.class);
when(backoff.nextDelayMillis(anyInt())).thenReturn((long) -1);
assertThrows(SqsRetriesExhaustedException.class, () -> sqsWorker.processSqsMessages());
}

@ParameterizedTest
@ValueSource(strings = {"", "{\"foo\": \"bar\""})
void processSqsMessages_should_not_interact_with_S3Service_if_input_is_not_valid_JSON(String inputString) {
Expand Down

0 comments on commit 94f734b

Please sign in to comment.