diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java index 9341a13e59..5423732b6f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java @@ -46,9 +46,10 @@ public interface EnhancedSourceCoordinator { * * @param partition The partition to be updated. * @param The progress state class + * @param ownershipTimeoutRenewal The amount of time to update ownership of the partition before another instance can acquire it. * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully */ - void saveProgressStateForPartition(EnhancedSourcePartition partition); + void saveProgressStateForPartition(EnhancedSourcePartition partition, Duration ownershipTimeoutRenewal); /** * This method is used to release the lease of a partition in the coordination store. diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java index 7ada401383..5d4fb3ae77 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java @@ -127,7 +127,7 @@ public Optional acquireAvailablePartition(String partit @Override - public void saveProgressStateForPartition(EnhancedSourcePartition partition) { + public void saveProgressStateForPartition(EnhancedSourcePartition partition, final Duration ownershipTimeoutRenewal) { String partitionType = partition.getPartitionType() == null ? DEFAULT_GLOBAL_STATE_PARTITION_TYPE : partition.getPartitionType(); LOG.debug("Try to save progress for partition {} (Type {})", partition.getPartitionKey(), partitionType); @@ -140,7 +140,7 @@ public void saveProgressStateForPartition(EnhancedSourcePartition partiti final SourcePartitionStoreItem updateItem = partition.getSourcePartitionStoreItem(); // Also extend the timeout of the lease (ownership) if (updateItem.getPartitionOwnershipTimeout() != null) { - updateItem.setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT)); + updateItem.setPartitionOwnershipTimeout(Instant.now().plus(ownershipTimeoutRenewal == null ? DEFAULT_LEASE_TIMEOUT : ownershipTimeoutRenewal)); } updateItem.setPartitionProgressState(partition.convertPartitionProgressStatetoString(partition.getProgressState())); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java index 4e6e40c57a..6a1122ee56 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java @@ -163,7 +163,7 @@ void test_saveProgressStateForPartition() { Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition.isPresent(), equalTo(true)); TestEnhancedSourcePartition partition = (TestEnhancedSourcePartition) sourcePartition.get(); - coordinator.saveProgressStateForPartition(partition); + coordinator.saveProgressStateForPartition(partition, null); verify(sourceCoordinationStore).tryAcquireAvailablePartition(anyString(), anyString(), any(Duration.class)); verify(sourceCoordinationStore).tryUpdateSourcePartitionItem(any(SourcePartitionStoreItem.class)); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java index 7d72049a6a..0704694d0c 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java @@ -8,7 +8,10 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.ExportConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -17,8 +20,11 @@ public class ClientFactory { private final AwsCredentialsProvider awsCredentialsProvider; private final AwsAuthenticationConfig awsAuthenticationConfig; + private final ExportConfig exportConfig; - public ClientFactory(AwsCredentialsSupplier awsCredentialsSupplier, AwsAuthenticationConfig awsAuthenticationConfig) { + public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier, + final AwsAuthenticationConfig awsAuthenticationConfig, + final ExportConfig exportConfig) { awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() .withRegion(awsAuthenticationConfig.getAwsRegion()) .withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn()) @@ -26,6 +32,7 @@ public ClientFactory(AwsCredentialsSupplier awsCredentialsSupplier, AwsAuthentic .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides()) .build()); this.awsAuthenticationConfig = awsAuthenticationConfig; + this.exportConfig = exportConfig; } @@ -47,8 +54,20 @@ public DynamoDbClient buildDynamoDBClient() { public S3Client buildS3Client() { return S3Client.builder() + .region(getS3ClientRegion()) .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build()) + .build()) .build(); } + private Region getS3ClientRegion() { + if (exportConfig != null && exportConfig.getAwsRegion() != null) { + return exportConfig.getAwsRegion(); + } + + return awsAuthenticationConfig.getAwsRegion(); + } + } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index f09b5b8ded..1916a5344f 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; @@ -59,6 +60,8 @@ public class DynamoDBService { private final EnhancedSourceCoordinator coordinator; private final DynamoDbClient dynamoDbClient; + + private final DynamoDBSourceConfig dynamoDBSourceConfig; // private final DynamoDbStreamsClient dynamoDbStreamsClient; @@ -70,13 +73,21 @@ public class DynamoDBService { private final PluginMetrics pluginMetrics; + private final AcknowledgementSetManager acknowledgementSetManager; + static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; - public DynamoDBService(EnhancedSourceCoordinator coordinator, ClientFactory clientFactory, DynamoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics) { + public DynamoDBService(final EnhancedSourceCoordinator coordinator, + final ClientFactory clientFactory, + final DynamoDBSourceConfig sourceConfig, + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager) { this.coordinator = coordinator; this.pluginMetrics = pluginMetrics; + this.acknowledgementSetManager = acknowledgementSetManager; + this.dynamoDBSourceConfig = sourceConfig; // Initialize AWS clients dynamoDbClient = clientFactory.buildDynamoDBClient(); @@ -103,10 +114,10 @@ public void start(Buffer> buffer) { Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics); DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer); - Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics); + Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, shardManager, buffer); - Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics); + Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); // May consider start or shutdown the scheduler on demand // Currently, event after the exports are done, the related scheduler will not be shutdown diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java index ffb1809ac6..7ecd565ec4 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -39,19 +40,26 @@ public class DynamoDBSource implements Source>, UsesEnhancedSource private final ClientFactory clientFactory; + private final AcknowledgementSetManager acknowledgementSetManager; + private EnhancedSourceCoordinator coordinator; private DynamoDBService dynamoDBService; @DataPrepperPluginConstructor - public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final AwsCredentialsSupplier awsCredentialsSupplier) { + public DynamoDBSource(final PluginMetrics pluginMetrics, + final DynamoDBSourceConfig sourceConfig, + final PluginFactory pluginFactory, + final AwsCredentialsSupplier awsCredentialsSupplier, + final AcknowledgementSetManager acknowledgementSetManager) { LOG.info("Create DynamoDB Source"); this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; this.pluginFactory = pluginFactory; + this.acknowledgementSetManager = acknowledgementSetManager; - clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig()); + clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig(), sourceConfig.getTableConfigs().get(0).getExportConfig()); } @Override @@ -61,7 +69,7 @@ public void start(Buffer> buffer) { coordinator.createPartition(new InitPartition()); // Create DynamoDB Service - dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics); + dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager); dynamoDBService.init(); LOG.info("Start DynamoDB service"); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java index 6e4a8fe7ae..c375ffa3db 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java @@ -7,10 +7,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotNull; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig; +import java.time.Duration; +import java.util.Collections; import java.util.List; /** @@ -19,14 +22,22 @@ public class DynamoDBSourceConfig { @JsonProperty("tables") - private List tableConfigs; - + private List tableConfigs = Collections.emptyList(); @JsonProperty("aws") @NotNull @Valid private AwsAuthenticationConfig awsAuthenticationConfig; + @JsonProperty("acknowledgments") + private boolean acknowledgments = false; + + @JsonProperty("shard_acknowledgment_timeout") + private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10); + + @JsonProperty("s3_data_file_acknowledgment_timeout") + private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(15); + public DynamoDBSourceConfig() { } @@ -38,4 +49,19 @@ public List getTableConfigs() { public AwsAuthenticationConfig getAwsAuthenticationConfig() { return awsAuthenticationConfig; } + + public boolean isAcknowledgmentsEnabled() { + return acknowledgments; + } + + public Duration getShardAcknowledgmentTimeout() { + return shardAcknowledgmentTimeout; + } + + public Duration getDataFileAcknowledgmentTimeout() { return dataFileAcknowledgmentTimeout; } + + @AssertTrue(message = "Exactly one table must be configured for the DynamoDb source.") + boolean isExactlyOneTableConfigured() { + return tableConfigs.size() == 1; + } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java index cb3463a3b6..d34017599a 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java @@ -7,15 +7,20 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotBlank; +import software.amazon.awssdk.regions.Region; public class ExportConfig { @JsonProperty("s3_bucket") @NotBlank(message = "Bucket Name is required for export") private String s3Bucket; + @JsonProperty("s3_prefix") private String s3Prefix; + @JsonProperty("s3_region") + private String s3Region; + public String getS3Bucket() { return s3Bucket; } @@ -23,4 +28,9 @@ public String getS3Bucket() { public String getS3Prefix() { return s3Prefix; } + + public Region getAwsRegion() { + return s3Region != null ? Region.of(s3Region) : null; + } + } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java index 9f770dff69..c259ccbcaa 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java @@ -10,6 +10,7 @@ import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; @@ -58,13 +59,13 @@ String getEventType() { return "EXPORT"; } - public void writeToBuffer(List lines) { + public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List lines) { int eventCount = 0; for (String line : lines) { Map data = (Map) convertToMap(line).get(ITEM_KEY); try { - addToBuffer(data); + addToBuffer(acknowledgementSet, data); eventCount++; } catch (Exception e) { // will this cause too many logs? diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java index 4d5693db64..cf820ff734 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.converter; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -70,7 +71,7 @@ void flushBuffer() throws Exception { * @param eventName Event name * @throws Exception Exception if failed to write to buffer. */ - public void addToBuffer(Map data, Map keys, long eventCreationTimeMillis, String eventName) throws Exception { + public void addToBuffer(final AcknowledgementSet acknowledgementSet, Map data, Map keys, long eventCreationTimeMillis, String eventName) throws Exception { Event event = JacksonEvent.builder() .withEventType(getEventType()) .withData(data) @@ -91,12 +92,15 @@ public void addToBuffer(Map data, Map keys, long eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey); } bufferAccumulator.add(new Record<>(event)); + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } } - public void addToBuffer(Map data) throws Exception { + public void addToBuffer(final AcknowledgementSet acknowledgementSet, Map data) throws Exception { // Export data doesn't have an event timestamp // Default to current timestamp when the event is added to buffer - addToBuffer(data, data, System.currentTimeMillis(), null); + addToBuffer(acknowledgementSet, data, data, System.currentTimeMillis(), null); } private String mapStreamEventNameToBulkAction(final String streamEventName) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index 4b0bba0a4a..17c4e41e93 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -11,6 +11,7 @@ import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; @@ -53,7 +54,7 @@ String getEventType() { } - public void writeToBuffer(List records) { + public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List records) { int eventCount = 0; for (Record record : records) { @@ -63,7 +64,7 @@ public void writeToBuffer(List records) { Map keys = convertKeys(record.dynamodb().keys()); try { - addToBuffer(data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), record.eventNameAsString()); + addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), record.eventNameAsString()); eventCount++; } catch (Exception e) { // will this cause too many logs? diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java index 63222b0d22..14422c6e8c 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java @@ -23,7 +23,7 @@ public class DataFilePartition extends EnhancedSourcePartition> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); recordConverter = new ExportRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics); + this.acknowledgementSet = builder.acknowledgementSet; + this.dataFileAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout; } public static Builder builder(final S3ObjectReader s3ObjectReader, final PluginMetrics pluginMetrics, final Buffer> buffer) { @@ -100,6 +105,10 @@ static class Builder { private String key; + private AcknowledgementSet acknowledgementSet; + + private Duration dataFileAcknowledgmentTimeout; + private int startLine; public Builder(final S3ObjectReader objectReader, final PluginMetrics pluginMetrics, final Buffer> buffer) { @@ -133,6 +142,16 @@ public Builder startLine(int startLine) { return this; } + public Builder acknowledgmentSet(AcknowledgementSet acknowledgementSet) { + this.acknowledgementSet = acknowledgementSet; + return this; + } + + public Builder acknowledgmentSetTimeout(Duration dataFileAcknowledgmentTimeout) { + this.dataFileAcknowledgmentTimeout = dataFileAcknowledgmentTimeout; + return this; + } + public DataFileLoader build() { return new DataFileLoader(this); } @@ -174,7 +193,7 @@ public void run() { if ((lineCount - startLine) % DEFAULT_BATCH_SIZE == 0) { // LOG.debug("Write to buffer for line " + (lineCount - DEFAULT_BATCH_SIZE) + " to " + lineCount); - recordConverter.writeToBuffer(lines); + recordConverter.writeToBuffer(acknowledgementSet, lines); lines.clear(); lastLineProcessed = lineCount; } @@ -189,17 +208,22 @@ public void run() { } if (!lines.isEmpty()) { // Do final checkpoint. - recordConverter.writeToBuffer(lines); + recordConverter.writeToBuffer(acknowledgementSet, lines); checkpointer.checkpoint(lineCount); } lines.clear(); - reader.close(); gzipInputStream.close(); inputStream.close(); - LOG.info("Complete loading s3://{}/{}", bucketName, key); - } catch (IOException e) { + + LOG.info("Completed loading s3://{}/{} to buffer", bucketName, key); + + if (acknowledgementSet != null) { + checkpointer.updateDatafileForAcknowledgmentWait(dataFileAcknowledgmentTimeout); + acknowledgementSet.complete(); + } + } catch (Exception e) { checkpointer.checkpoint(lineCount); String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %S", bucketName, key, e.getMessage()); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java index c8e5318f19..7fee416bbc 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -14,6 +15,8 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import software.amazon.awssdk.services.s3.S3Client; +import java.time.Duration; + /** * Factory class for DataFileLoader thread. */ @@ -25,14 +28,20 @@ public class DataFileLoaderFactory { private final PluginMetrics pluginMetrics; private final Buffer> buffer; - public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator, S3Client s3Client, PluginMetrics pluginMetrics, final Buffer> buffer) { + public DataFileLoaderFactory(final EnhancedSourceCoordinator coordinator, + final S3Client s3Client, + final PluginMetrics pluginMetrics, + final Buffer> buffer) { this.coordinator = coordinator; this.pluginMetrics = pluginMetrics; this.buffer = buffer; objectReader = new S3ObjectReader(s3Client); } - public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableInfo tableInfo) { + public Runnable createDataFileLoader(final DataFilePartition dataFilePartition, + final TableInfo tableInfo, + final AcknowledgementSet acknowledgementSet, + final Duration acknowledgmentTimeout) { DataFileCheckpointer checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition); @@ -42,7 +51,10 @@ public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableI .key(dataFilePartition.getKey()) .tableInfo(tableInfo) .checkpointer(checkpointer) - .startLine(dataFilePartition.getProgressState().get().getLoaded()) + .acknowledgmentSet(acknowledgementSet) + .acknowledgmentSetTimeout(acknowledgmentTimeout) + // We can't checkpoint with acks enabled yet + .startLine(acknowledgementSet == null ? dataFilePartition.getProgressState().get().getLoaded() : 0) .build(); return loader; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java index 79b99bf817..3f0ae2dd82 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java @@ -7,8 +7,11 @@ import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus; @@ -54,15 +57,26 @@ public class DataFileScheduler implements Runnable { private final PluginMetrics pluginMetrics; + private final AcknowledgementSetManager acknowledgementSetManager; + + private final DynamoDBSourceConfig dynamoDBSourceConfig; + private final Counter exportFileSuccessCounter; private final AtomicLong activeExportS3ObjectConsumersGauge; - public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFactory loaderFactory, PluginMetrics pluginMetrics) { + public DataFileScheduler(final EnhancedSourceCoordinator coordinator, + final DataFileLoaderFactory loaderFactory, + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager, + final DynamoDBSourceConfig dynamoDBSourceConfig) { this.coordinator = coordinator; this.pluginMetrics = pluginMetrics; this.loaderFactory = loaderFactory; + this.acknowledgementSetManager = acknowledgementSetManager; + this.dynamoDBSourceConfig = dynamoDBSourceConfig; + executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT); @@ -74,16 +88,36 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) { String tableArn = getTableArn(exportArn); TableInfo tableInfo = getTableInfo(tableArn); - - Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo); + + final boolean acknowledgmentsEnabled = dynamoDBSourceConfig.isAcknowledgmentsEnabled(); + + AcknowledgementSet acknowledgementSet = null; + if (acknowledgmentsEnabled) { + acknowledgementSet = acknowledgementSetManager.create((result) -> { + if (result == true) { + completeDataLoader(dataFilePartition).accept(null, null); + LOG.info("Received acknowledgment of completion from sink for data file {}", dataFilePartition.getKey()); + } else { + LOG.warn("Negative acknowledgment received for data file {}, retrying", dataFilePartition.getKey()); + coordinator.giveUpPartition(dataFilePartition); + } + }, dynamoDBSourceConfig.getDataFileAcknowledgmentTimeout()); + } + + Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo, acknowledgementSet, dynamoDBSourceConfig.getDataFileAcknowledgmentTimeout()); CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor); - runLoader.whenComplete(completeDataLoader(dataFilePartition)); + + if (!acknowledgmentsEnabled) { + runLoader.whenComplete(completeDataLoader(dataFilePartition)); + } else { + runLoader.whenComplete((v, ex) -> numOfWorkers.decrementAndGet()); + } numOfWorkers.incrementAndGet(); } @Override public void run() { - LOG.debug("Start running Data File Scheduler"); + LOG.debug("Starting Data File Scheduler to process S3 data files for export"); while (!Thread.currentThread().isInterrupted()) { try { @@ -145,7 +179,10 @@ private String getStreamArn(String exportArn) { private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) { return (v, ex) -> { - numOfWorkers.decrementAndGet(); + + if (!dynamoDBSourceConfig.isAcknowledgmentsEnabled()) { + numOfWorkers.decrementAndGet(); + } if (ex == null) { exportFileSuccessCounter.increment(); // Update global state @@ -155,8 +192,7 @@ private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) { } else { // The data loader must have already done one last checkpointing. - LOG.debug("Data Loader completed with exception"); - LOG.error("{}", ex); + LOG.error("Loading S3 data files completed with an exception: {}", ex); // Release the ownership coordinator.giveUpPartition(dataFilePartition); } @@ -190,27 +226,24 @@ private void updateState(String exportArn, int loaded) { GlobalState globalState = (GlobalState) globalPartition.get(); LoadStatus loadStatus = LoadStatus.fromMap(globalState.getProgressState().get()); - LOG.debug("Current status: total {} loaded {}", loadStatus.getTotalFiles(), loadStatus.getLoadedFiles()); + LOG.info("Current status: total {} loaded {}", loadStatus.getTotalFiles(), loadStatus.getLoadedFiles()); loadStatus.setLoadedFiles(loadStatus.getLoadedFiles() + 1); loadStatus.setLoadedRecords(loadStatus.getLoadedRecords() + loaded); globalState.setProgressState(loadStatus.toMap()); try { - coordinator.saveProgressStateForPartition(globalState); + coordinator.saveProgressStateForPartition(globalState, null); // if all load are completed. if (streamArn != null && loadStatus.getLoadedFiles() == loadStatus.getTotalFiles()) { - LOG.debug("All Exports are done, streaming can continue..."); + LOG.info("All Exports are done, streaming can continue..."); coordinator.createPartition(new GlobalState(streamArn, Optional.empty())); } break; } catch (Exception e) { - LOG.error("Failed to update the global status, looks like the status was out of dated, will retry.."); + LOG.error("Failed to update the global status, looks like the status was out of date, will retry.."); } - } - - } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java index 9ea0d0dde9..9ace3aa6ae 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java @@ -139,7 +139,7 @@ public void run() { private BiConsumer completeExport(ExportPartition exportPartition) { return (status, ex) -> { if (ex != null) { - LOG.debug("Check export status for {} failed with error {}", exportPartition.getPartitionKey(), ex.getMessage()); + LOG.warn("Check export status for {} failed with error {}", exportPartition.getPartitionKey(), ex.getMessage()); // closeExportPartitionWithError(exportPartition); enhancedSourceCoordinator.giveUpPartition(exportPartition); } else { @@ -182,7 +182,7 @@ private BiConsumer completeExport(ExportPartition exportParti private void createDataFilePartitions(String exportArn, String bucketName, Map dataFileInfo) { - LOG.info("Totally {} data files generated for export {}", dataFileInfo.size(), exportArn); + LOG.info("Total of {} data files generated for export {}", dataFileInfo.size(), exportArn); AtomicInteger totalRecords = new AtomicInteger(); AtomicInteger totalFiles = new AtomicInteger(); dataFileInfo.forEach((key, size) -> { @@ -206,6 +206,7 @@ private void createDataFilePartitions(String exportArn, String bucketName, Map DEFAULT_CHECKPOINT_INTERVAL_MILLS) { - enhancedSourceCoordinator.saveProgressStateForPartition(exportPartition); + enhancedSourceCoordinator.saveProgressStateForPartition(exportPartition, null); lastCheckpointTime = System.currentTimeMillis(); } @@ -266,7 +267,7 @@ private String getOrCreateExportArn(ExportPartition exportPartition) { if (exportArn != null) { LOG.info("Export arn is " + exportArn); state.setExportArn(exportArn); - enhancedSourceCoordinator.saveProgressStateForPartition(exportPartition); + enhancedSourceCoordinator.saveProgressStateForPartition(exportPartition, null); } return exportArn; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index f80b145b8f..48e093f7b6 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -5,12 +5,12 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; - import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; @@ -79,15 +79,20 @@ public class ShardConsumer implements Runnable { private boolean waitForExport; + private final AcknowledgementSet acknowledgementSet; + + private final Duration shardAcknowledgmentTimeout; + private ShardConsumer(Builder builder) { this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient; this.checkpointer = builder.checkpointer; this.shardIterator = builder.shardIterator; this.startTime = builder.startTime; this.waitForExport = builder.waitForExport; - final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics); + this.acknowledgementSet = builder.acknowledgementSet; + this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout; } public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer> buffer) { @@ -114,6 +119,9 @@ static class Builder { private boolean waitForExport; + private AcknowledgementSet acknowledgementSet; + private Duration dataFileAcknowledgmentTimeout; + public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer> buffer) { this.dynamoDbStreamsClient = dynamoDbStreamsClient; this.pluginMetrics = pluginMetrics; @@ -145,6 +153,16 @@ public Builder waitForExport(boolean waitForExport) { return this; } + public Builder acknowledgmentSet(AcknowledgementSet acknowledgementSet) { + this.acknowledgementSet = acknowledgementSet; + return this; + } + + public Builder acknowledgmentSetTimeout(Duration dataFileAcknowledgmentTimeout) { + this.dataFileAcknowledgmentTimeout = dataFileAcknowledgmentTimeout; + return this; + } + public ShardConsumer build() { return new ShardConsumer(this); } @@ -154,7 +172,7 @@ public ShardConsumer build() { @Override public void run() { - LOG.info("Shard Consumer start to run..."); + LOG.debug("Shard Consumer start to run..."); long lastCheckpointTime = System.currentTimeMillis(); String sequenceNumber = ""; @@ -162,7 +180,7 @@ public void run() { while (!shouldStop) { if (shardIterator == null) { // End of Shard - LOG.debug("Reach end of shard"); + LOG.debug("Reached end of shard"); checkpointer.checkpoint(sequenceNumber); break; } @@ -211,7 +229,7 @@ public void run() { } else { records = response.records(); } - recordConverter.writeToBuffer(records); + recordConverter.writeToBuffer(acknowledgementSet, records); } try { // Idle between get records call. @@ -221,6 +239,11 @@ public void run() { } } + if (acknowledgementSet != null) { + checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); + acknowledgementSet.complete(); + } + // interrupted if (shouldStop) { // Do last checkpoint and then quit diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index df605941e1..7cdceba3ff 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -19,6 +20,7 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; +import java.time.Duration; import java.time.Instant; import java.util.Optional; @@ -51,7 +53,9 @@ public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordi } - public Runnable createConsumer(StreamPartition streamPartition) { + public Runnable createConsumer(final StreamPartition streamPartition, + final AcknowledgementSet acknowledgementSet, + final Duration shardAcknowledgmentTimeout) { LOG.info("Try to start a Shard Consumer for " + streamPartition.getShardId()); @@ -61,7 +65,8 @@ public Runnable createConsumer(StreamPartition streamPartition) { Instant startTime = null; boolean waitForExport = false; if (progressState.isPresent()) { - sequenceNumber = progressState.get().getSequenceNumber(); + // We can't checkpoint with acks yet + sequenceNumber = acknowledgementSet == null ? null : progressState.get().getSequenceNumber(); waitForExport = progressState.get().shouldWaitForExport(); if (progressState.get().getStartTime() != 0) { startTime = Instant.ofEpochMilli(progressState.get().getStartTime()); @@ -85,6 +90,8 @@ public Runnable createConsumer(StreamPartition streamPartition) { .shardIterator(shardIter) .startTime(startTime) .waitForExport(waitForExport) + .acknowledgmentSet(acknowledgementSet) + .acknowledgmentSetTimeout(shardAcknowledgmentTimeout) .build(); return shardConsumer; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java index 32ae8b573b..50c170972e 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java @@ -129,9 +129,8 @@ public String getShardIterator(String streamArn, String shardId, String sequence .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) .sequenceNumber(sequenceNumber) .build(); - } else { - LOG.debug("Get Shard Iterator from beginning (TRIM_HORIZON)"); + LOG.info("Get Shard Iterator from beginning (TRIM_HORIZON) for shard {}", shardId); getShardIteratorRequest = GetShardIteratorRequest.builder() .shardId(shardId) .streamArn(streamArn) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java index ae807c2ca1..9e3113732a 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Optional; /** @@ -48,12 +49,10 @@ private void setSequenceNumber(String sequenceNumber) { * * @param sequenceNumber The last sequence number */ - public void checkpoint(String sequenceNumber) { LOG.debug("Checkpoint shard " + streamPartition.getShardId() + " with sequenceNumber " + sequenceNumber); setSequenceNumber(sequenceNumber); - coordinator.saveProgressStateForPartition(streamPartition); - + coordinator.saveProgressStateForPartition(streamPartition, null); } /** @@ -88,4 +87,7 @@ public boolean isExportDone() { return globalPartition.isPresent(); } + public void updateShardForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) { + coordinator.saveProgressStateForPartition(streamPartition, acknowledgmentSetTimeout); + } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index cd4d43baaf..ea799bfc24 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -6,8 +6,11 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.slf4j.Logger; @@ -52,26 +55,53 @@ public class StreamScheduler implements Runnable { private final ShardManager shardManager; private final PluginMetrics pluginMetrics; private final AtomicLong activeChangeEventConsumers; + private final AcknowledgementSetManager acknowledgementSetManager; + private final DynamoDBSourceConfig dynamoDBSourceConfig; public StreamScheduler(final EnhancedSourceCoordinator coordinator, final ShardConsumerFactory consumerFactory, final ShardManager shardManager, - final PluginMetrics pluginMetrics) { + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager, + final DynamoDBSourceConfig dynamoDBSourceConfig) { this.coordinator = coordinator; this.shardManager = shardManager; this.consumerFactory = consumerFactory; this.pluginMetrics = pluginMetrics; + this.acknowledgementSetManager = acknowledgementSetManager; + this.dynamoDBSourceConfig = dynamoDBSourceConfig; executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong()); } private void processStreamPartition(StreamPartition streamPartition) { - Runnable shardConsumer = consumerFactory.createConsumer(streamPartition); + final boolean acknowledgmentsEnabled = dynamoDBSourceConfig.isAcknowledgmentsEnabled(); + AcknowledgementSet acknowledgementSet = null; + + if (acknowledgmentsEnabled) { + acknowledgementSet = acknowledgementSetManager.create((result) -> { + if (result == true) { + LOG.info("Received acknowledgment of completion from sink for shard {}", streamPartition.getShardId()); + completeConsumer(streamPartition).accept(null, null); + } else { + LOG.warn("Negative acknowledgment received for shard {}, it will be retried", streamPartition.getShardId()); + coordinator.giveUpPartition(streamPartition); + } + }, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); + } + + Runnable shardConsumer = consumerFactory.createConsumer(streamPartition, acknowledgementSet, dynamoDBSourceConfig.getShardAcknowledgmentTimeout()); if (shardConsumer != null) { + CompletableFuture runConsumer = CompletableFuture.runAsync(shardConsumer, executor); - runConsumer.whenComplete(completeConsumer(streamPartition)); + + if (acknowledgmentsEnabled) { + runConsumer.whenComplete((v, ex) -> numOfWorkers.decrementAndGet()); + } else { + runConsumer.whenComplete(completeConsumer(streamPartition)); + } numOfWorkers.incrementAndGet(); } else { // If failed to create a new consumer. @@ -84,7 +114,6 @@ public void run() { LOG.debug("Stream Scheduler start to run..."); while (!Thread.currentThread().isInterrupted()) { try { - if (numOfWorkers.get() < MAX_JOB_COUNT) { final Optional sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { @@ -102,7 +131,7 @@ public void run() { break; } } catch (final Exception e) { - LOG.error("Received an exception while trying to get an S3 data file to process, backing off and retrying", e); + LOG.error("Received an exception while processing a shard for streams, backing off and retrying", e); try { Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS); } catch (final InterruptedException ex) { @@ -122,7 +151,9 @@ public void run() { private BiConsumer completeConsumer(StreamPartition streamPartition) { return (v, ex) -> { - numOfWorkers.decrementAndGet(); + if (!dynamoDBSourceConfig.isAcknowledgmentsEnabled()) { + numOfWorkers.decrementAndGet(); + } if (ex == null) { LOG.info("Shard consumer for {} is completed", streamPartition.getShardId()); LOG.debug("Start creating new stream partitions for Child Shards"); @@ -147,8 +178,6 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) { LOG.debug("Shard consumer completed with exception"); LOG.error(ex.toString()); coordinator.giveUpPartition(streamPartition); - - } }; } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java index 0032bcee92..9b436321fb 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java @@ -11,6 +11,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; @@ -102,6 +103,9 @@ class DynamoDBServiceTest { @Mock private Buffer> buffer; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + private DynamoDBService dynamoDBService; private Collection keySchema; @@ -162,7 +166,7 @@ void setup() { } private DynamoDBService createObjectUnderTest() { - DynamoDBService objectUnderTest = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics); + DynamoDBService objectUnderTest = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager); return objectUnderTest; } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java index a31a642b8e..872e125ae1 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java @@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.times; @@ -107,7 +108,7 @@ void test_writeToBuffer() throws Exception { List data = generateData(numberOfRecords); ExportRecordConverter recordConverter = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); - recordConverter.writeToBuffer(data); + recordConverter.writeToBuffer(null, data); verify(bufferAccumulator, times(numberOfRecords)).add(any(Record.class)); verify(exportRecordSuccess).increment(anyDouble()); @@ -127,7 +128,7 @@ void test_writeSingleRecordToBuffer() throws Exception { doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); // doNothing().when(bufferAccumulator).flush(); - recordConverter.writeToBuffer(List.of(line)); + recordConverter.writeToBuffer(eq(null), List.of(line)); verify(bufferAccumulator).add(any(Record.class)); verify(bufferAccumulator).flush(); assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index cd02172da2..944ca4c99f 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -102,7 +102,7 @@ void test_writeToBuffer() throws Exception { StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); - recordConverter.writeToBuffer(records); + recordConverter.writeToBuffer(null, records); verify(bufferAccumulator, times(numberOfRecords)).add(any(Record.class)); verify(bufferAccumulator).flush(); verify(changeEventSuccessCounter).increment(anyDouble()); @@ -120,7 +120,7 @@ void test_writeSingleRecordToBuffer() throws Exception { StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); - recordConverter.writeToBuffer(records); + recordConverter.writeToBuffer(null, records); verify(bufferAccumulator).add(any(Record.class)); verify(bufferAccumulator).flush(); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java index 061cb91a93..0c954740da 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java @@ -11,6 +11,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -21,12 +22,14 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; import software.amazon.awssdk.services.s3.S3Client; +import java.time.Duration; import java.util.Optional; import java.util.Random; import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; @ExtendWith(MockitoExtension.class) class DataFileLoaderFactoryTest { @@ -84,10 +87,19 @@ void setup() { @Test void test_createDataFileLoader() { - DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer); - Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo); + Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo, null, null); assertThat(loader, notNullValue()); + } + + @Test + void test_createDataFileLoader_with_acknowledgments() { + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + final Duration acknowledgmentTimeout = Duration.ofSeconds(30); + DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer); + + Runnable loader = loaderFactory.createDataFileLoader(dataFilePartition, tableInfo, acknowledgementSet, acknowledgmentTimeout); + assertThat(loader, notNullValue()); } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java index 443e68f2fb..7c5c4bee56 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java @@ -14,6 +14,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -33,6 +34,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Optional; import java.util.Random; import java.util.UUID; @@ -40,9 +42,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -68,7 +72,6 @@ class DataFileLoaderTest { @Mock private BufferAccumulator> bufferAccumulator; - @Mock private Counter testCounter; @@ -120,7 +123,7 @@ void setup() throws Exception { lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); doNothing().when(bufferAccumulator).add(any(Record.class)); @@ -166,14 +169,46 @@ private ResponseInputStream generateGzipInputStream(int numbe void test_run_loadFile_correctly() throws Exception { DataFileLoader loader; try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) - ) { + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer) + .bucketName(bucketName) + .key(manifestKey) + .checkpointer(checkpointer) + .tableInfo(tableInfo) + .build(); + } + + loader.run(); + + // Should call s3 getObject + verify(s3Client).getObject(any(GetObjectRequest.class)); + + // Should write to buffer + verify(bufferAccumulator, times(total)).add(any(Record.class)); + verify(bufferAccumulator).flush(); + + // Should do one last checkpoint when done. + verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null)); + } + + @Test + void run_loadFile_with_acknowledgments_processes_correctly() throws Exception { + + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + final Duration acknowledgmentTimeout = Duration.ofSeconds(30); + + DataFileLoader loader; + try ( + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer) .bucketName(bucketName) .key(manifestKey) .checkpointer(checkpointer) .tableInfo(tableInfo) + .acknowledgmentSet(acknowledgementSet) + .acknowledgmentSetTimeout(acknowledgmentTimeout) .build(); } @@ -187,8 +222,9 @@ void test_run_loadFile_correctly() throws Exception { verify(bufferAccumulator).flush(); // Should do one last checkpoint when done. - verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class)); + verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null)); + verify(acknowledgementSet).complete(); } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java index 66378ee406..07dc9bfff9 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java @@ -12,8 +12,11 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; @@ -21,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; +import java.time.Duration; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -28,6 +32,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -35,7 +40,9 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler.ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE; @@ -47,6 +54,12 @@ class DataFileSchedulerTest { @Mock private EnhancedSourceCoordinator coordinator; + @Mock + private DynamoDBSourceConfig dynamoDBSourceConfig; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock private PluginMetrics pluginMetrics; @@ -113,16 +126,18 @@ void setup() { lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); - - lenient().when(loaderFactory.createDataFileLoader(any(DataFilePartition.class), any(TableInfo.class))).thenReturn(() -> System.out.println("Hello")); - } @Test public void test_run_DataFileLoader_correctly() throws InterruptedException { + given(loaderFactory.createDataFileLoader(any(DataFilePartition.class), any(TableInfo.class), eq(null), any(Duration.class))).willReturn(() -> System.out.println("Hello")); + + given(coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).willReturn(Optional.of(dataFilePartition)).willReturn(Optional.empty()); + given(dynamoDBSourceConfig.isAcknowledgmentsEnabled()).willReturn(false); + given(dynamoDBSourceConfig.getDataFileAcknowledgmentTimeout()).willReturn(Duration.ofSeconds(10)); - scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics); + scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future future = executorService.submit(() -> scheduler.run()); @@ -134,11 +149,11 @@ public void test_run_DataFileLoader_correctly() throws InterruptedException { // Should acquire data file partition verify(coordinator).acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); // Should create a loader - verify(loaderFactory).createDataFileLoader(any(DataFilePartition.class), any(TableInfo.class)); + verify(loaderFactory).createDataFileLoader(any(DataFilePartition.class), any(TableInfo.class), eq(null), any(Duration.class)); // Need to call getPartition for 3 times (3 global states, 2 TableInfo) verify(coordinator, times(3)).getPartition(anyString()); // Should update global state with load status - verify(coordinator).saveProgressStateForPartition(any(GlobalState.class)); + verify(coordinator).saveProgressStateForPartition(any(GlobalState.class), eq(null)); // Should create a partition to inform streaming can start. verify(coordinator).createPartition(any(GlobalState.class)); // Should mask the partition as completed. @@ -148,14 +163,57 @@ public void test_run_DataFileLoader_correctly() throws InterruptedException { executorService.shutdownNow(); + } + @Test + void run_DataFileLoader_with_acknowledgments_enabled_processes_correctly() throws InterruptedException { + given(coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).willReturn(Optional.of(dataFilePartition)).willReturn(Optional.empty()); + given(dynamoDBSourceConfig.isAcknowledgmentsEnabled()).willReturn(true); + + final Duration dataFileAcknowledgmentTimeout = Duration.ofSeconds(30); + given(dynamoDBSourceConfig.getDataFileAcknowledgmentTimeout()).willReturn(dataFileAcknowledgmentTimeout); + + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(0); + consumer.accept(true); + return acknowledgementSet; + }).when(acknowledgementSetManager).create(any(Consumer.class), eq(dataFileAcknowledgmentTimeout)); + + given(loaderFactory.createDataFileLoader(any(DataFilePartition.class), any(TableInfo.class), eq(acknowledgementSet), eq(dataFileAcknowledgmentTimeout))).willReturn(() -> System.out.println("Hello")); + + scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + final Future future = executorService.submit(() -> scheduler.run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); + + // Should acquire data file partition + verify(coordinator).acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); + // Should create a loader + verify(loaderFactory).createDataFileLoader(any(DataFilePartition.class), any(TableInfo.class), eq(acknowledgementSet), eq(dataFileAcknowledgmentTimeout)); + // Need to call getPartition for 3 times (3 global states, 2 TableInfo) + verify(coordinator, times(3)).getPartition(anyString()); + // Should update global state with load status + verify(coordinator).saveProgressStateForPartition(any(GlobalState.class), eq(null)); + // Should create a partition to inform streaming can start. + verify(coordinator).createPartition(any(GlobalState.class)); + // Should mask the partition as completed. + verify(coordinator).completePartition(any(DataFilePartition.class)); + // Should update metrics. + verify(exportFileSuccess).increment(); + + executorService.shutdownNow(); } @Test void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException { given(coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).willThrow(RuntimeException.class); - scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics); + scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future future = executorService.submit(() -> scheduler.run()); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java index c2f3bd1cf1..6e2a487d78 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java @@ -90,7 +90,7 @@ public void test_create_shardConsumer_correctly() { ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, shardManager, buffer); - Runnable consumer = consumerFactory.createConsumer(streamPartition); + Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); assertThat(consumer, notNullValue()); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 417787becc..ec9df68189 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -14,6 +14,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; @@ -31,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.StreamRecord; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -43,7 +45,9 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -96,7 +100,7 @@ class ShardConsumerTest { private final Random random = new Random(); - private final int total = random.nextInt(10); + private final int total = random.nextInt(10) + 1; @BeforeEach @@ -121,7 +125,7 @@ void setup() throws Exception { lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); doNothing().when(bufferAccumulator).add(any(org.opensearch.dataprepper.model.record.Record.class)); @@ -161,12 +165,47 @@ void test_run_shardConsumer_correctly() throws Exception { // Should call GetRecords verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); + // Should write to buffer + verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); + verify(bufferAccumulator).flush(); + // Should complete the consumer as reach to end of shard + verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(null)); + } + + @Test + void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + final Duration acknowledgmentTimeout = Duration.ofSeconds(30); + + ShardConsumer shardConsumer; + try ( + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) + ) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, buffer) + .shardIterator(shardIterator) + .checkpointer(checkpointer) + .tableInfo(tableInfo) + .startTime(null) + .acknowledgmentSetTimeout(acknowledgmentTimeout) + .acknowledgmentSet(acknowledgementSet) + .waitForExport(false) + .build(); + } + + shardConsumer.run(); + + // Should call GetRecords + verify(dynamoDbStreamsClient).getRecords(any(GetRecordsRequest.class)); + // Should write to buffer verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class)); verify(bufferAccumulator).flush(); // Should complete the consumer as reach to end of shard - verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class)); + verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(null)); + + verify(acknowledgementSet).complete(); } /** diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java index a551a6052f..4c6dfdcd0d 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java @@ -11,12 +11,16 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Optional; @@ -26,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -33,7 +38,9 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.ACTIVE_CHANGE_EVENT_CONSUMERS; @@ -47,6 +54,12 @@ class StreamSchedulerTest { @Mock private DynamoDbStreamsClient dynamoDbStreamsClient; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private DynamoDBSourceConfig dynamoDBSourceConfig; + @Mock private ShardManager shardManager; @@ -90,10 +103,8 @@ void setup() { // Mock Coordinator methods lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); - - lenient().when(consumerFactory.createConsumer(any(StreamPartition.class))).thenReturn(() -> System.out.println("Hello")); lenient().when(shardManager.getChildShardIds(anyString(), anyString())).thenReturn(List.of(shardId)); when(pluginMetrics.gauge(eq(ACTIVE_CHANGE_EVENT_CONSUMERS), any(AtomicLong.class))).thenReturn(activeShardConsumers); @@ -103,9 +114,10 @@ void setup() { @Test public void test_normal_run() throws InterruptedException { - given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty()); + when(consumerFactory.createConsumer(any(StreamPartition.class), eq(null), any(Duration.class))).thenReturn(() -> System.out.println("Hello")); + when(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).thenReturn(Optional.of(streamPartition)).thenReturn(Optional.empty()); - scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics); + scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -117,7 +129,46 @@ public void test_normal_run() throws InterruptedException { // Should acquire the stream partition verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE); // Should start a new consumer - verify(consumerFactory).createConsumer(any(StreamPartition.class)); + verify(consumerFactory).createConsumer(any(StreamPartition.class), eq(null), any(Duration.class)); + // Should create stream partition for child shards. + verify(coordinator).createPartition(any(StreamPartition.class)); + // Should mask the stream partition as completed. + verify(coordinator).completePartition(any(StreamPartition.class)); + + executorService.shutdownNow(); + } + + @Test + public void test_normal_run_with_acknowledgments() throws InterruptedException { + given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty()); + given(dynamoDBSourceConfig.isAcknowledgmentsEnabled()).willReturn(true); + + final Duration shardAcknowledgmentTimeout = Duration.ofSeconds(30); + given(dynamoDBSourceConfig.getShardAcknowledgmentTimeout()).willReturn(shardAcknowledgmentTimeout); + + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(0); + consumer.accept(true); + return acknowledgementSet; + }).when(acknowledgementSetManager).create(any(Consumer.class), eq(shardAcknowledgmentTimeout)); + + when(consumerFactory.createConsumer(any(StreamPartition.class), eq(acknowledgementSet), eq(shardAcknowledgmentTimeout))).thenReturn(() -> System.out.println("Hello")); + + scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + final Future future = executorService.submit(() -> scheduler.run()); + Thread.sleep(3000); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); + + // Should acquire the stream partition + verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE); + // Should start a new consumer + verify(consumerFactory).createConsumer(any(StreamPartition.class), any(AcknowledgementSet.class), any(Duration.class)); // Should create stream partition for child shards. verify(coordinator).createPartition(any(StreamPartition.class)); // Should mask the stream partition as completed. @@ -129,8 +180,7 @@ public void test_normal_run() throws InterruptedException { @Test void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException { given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willThrow(RuntimeException.class); - - scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics); + scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future future = executorService.submit(() -> scheduler.run());