Add acknowledgments for the ddb source #3575

Merged 2 commits on Nov 2, 2023. Changes shown are from all commits.
@@ -46,9 +46,10 @@ public interface EnhancedSourceCoordinator {
*
* @param partition The partition to be updated.
* @param <T> The progress state class
* @param ownershipTimeoutRenewal The amount of time by which to extend ownership of the partition before another instance can acquire it.
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
<T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition);
<T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition, Duration ownershipTimeoutRenewal);

/**
* This method is used to release the lease of a partition in the coordination store.
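For context on the signature change above, a minimal caller sketch (not part of this diff; coordinator, partition, and the ten-minute renewal window are assumed for illustration):

// Hypothetical usage (names and values assumed): checkpoint progress and
// extend the ownership lease in a single call.
final Duration ownershipRenewal = Duration.ofMinutes(10); // assumed renewal window
try {
    coordinator.saveProgressStateForPartition(partition, ownershipRenewal);
} catch (final PartitionUpdateException e) {
    // Another instance may have taken over the partition; stop processing it.
}
// Passing null falls back to the coordinator's default lease timeout:
coordinator.saveProgressStateForPartition(partition, null);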
@@ -135,7 +135,7 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
} else if (pendingAcknowledgments.size() == 0) {
LOG.warn("Acknowledgement set is not completed. Delaying callback until it is completed");
LOG.debug("Acknowledgement set is not completed. Delaying callback until it is completed");
}
}
} finally {
@@ -127,7 +127,7 @@ public Optional<EnhancedSourcePartition> acquireAvailablePartition(String partit


@Override
public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition) {
public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition, final Duration ownershipTimeoutRenewal) {
String partitionType = partition.getPartitionType() == null ? DEFAULT_GLOBAL_STATE_PARTITION_TYPE : partition.getPartitionType();
LOG.debug("Try to save progress for partition {} (Type {})", partition.getPartitionKey(), partitionType);

@@ -140,7 +140,7 @@ public <T> void saveProgressStateForPartiti
final SourcePartitionStoreItem updateItem = partition.getSourcePartitionStoreItem();
// Also extend the timeout of the lease (ownership)
if (updateItem.getPartitionOwnershipTimeout() != null) {
updateItem.setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT));
updateItem.setPartitionOwnershipTimeout(Instant.now().plus(ownershipTimeoutRenewal == null ? DEFAULT_LEASE_TIMEOUT : ownershipTimeoutRenewal));
}
updateItem.setPartitionProgressState(partition.convertPartitionProgressStatetoString(partition.getProgressState()));

@@ -163,7 +163,7 @@ void test_saveProgressStateForPartition() {
Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE);
assertThat(sourcePartition.isPresent(), equalTo(true));
TestEnhancedSourcePartition partition = (TestEnhancedSourcePartition) sourcePartition.get();
coordinator.saveProgressStateForPartition(partition);
coordinator.saveProgressStateForPartition(partition, null);

verify(sourceCoordinationStore).tryAcquireAvailablePartition(anyString(), anyString(), any(Duration.class));
verify(sourceCoordinationStore).tryUpdateSourcePartitionItem(any(SourcePartitionStoreItem.class));
@@ -8,7 +8,10 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.ExportConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.s3.S3Client;
@@ -17,15 +20,19 @@ public class ClientFactory {

private final AwsCredentialsProvider awsCredentialsProvider;
private final AwsAuthenticationConfig awsAuthenticationConfig;
private final ExportConfig exportConfig;

public ClientFactory(AwsCredentialsSupplier awsCredentialsSupplier, AwsAuthenticationConfig awsAuthenticationConfig) {
public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
final AwsAuthenticationConfig awsAuthenticationConfig,
final ExportConfig exportConfig) {
awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationConfig.getAwsRegion())
.withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
.build());
this.awsAuthenticationConfig = awsAuthenticationConfig;
this.exportConfig = exportConfig;
}


@@ -47,8 +54,20 @@ public DynamoDbClient buildDynamoDBClient() {

public S3Client buildS3Client() {
return S3Client.builder()
.region(getS3ClientRegion())
.credentialsProvider(awsCredentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build())
.build())
.build();
}

private Region getS3ClientRegion() {
if (exportConfig != null && exportConfig.getAwsRegion() != null) {
return exportConfig.getAwsRegion();
}

return awsAuthenticationConfig.getAwsRegion();
}

}
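As a usage sketch, this mirrors how DynamoDBSource constructs the factory later in this diff (surrounding variables are assumed):

// Sketch (variables assumed): the factory receives the first table's export
// config; buildS3Client() then applies the 5-retry override and, when the
// export s3_region is set, the export bucket's region instead of aws.region.
final ClientFactory clientFactory = new ClientFactory(
        awsCredentialsSupplier,
        sourceConfig.getAwsAuthenticationConfig(),
        sourceConfig.getTableConfigs().get(0).getExportConfig());
final S3Client s3Client = clientFactory.buildS3Client();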
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
@@ -59,6 +60,8 @@ public class DynamoDBService {
private final EnhancedSourceCoordinator coordinator;

private final DynamoDbClient dynamoDbClient;

private final DynamoDBSourceConfig dynamoDBSourceConfig;
//
private final DynamoDbStreamsClient dynamoDbStreamsClient;

@@ -70,13 +73,21 @@ public class DynamoDBService {

private final PluginMetrics pluginMetrics;

private final AcknowledgementSetManager acknowledgementSetManager;

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;


public DynamoDBService(EnhancedSourceCoordinator coordinator, ClientFactory clientFactory, DynamoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics) {
public DynamoDBService(final EnhancedSourceCoordinator coordinator,
final ClientFactory clientFactory,
final DynamoDBSourceConfig sourceConfig,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.coordinator = coordinator;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.dynamoDBSourceConfig = sourceConfig;

// Initialize AWS clients
dynamoDbClient = clientFactory.buildDynamoDBClient();
@@ -103,10 +114,10 @@ public void start(Buffer<Record<Event>> buffer) {
Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics);

DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, shardManager, buffer);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

// May consider starting or shutting down the scheduler on demand
// Currently, even after the exports are done, the related scheduler will not be shut down
@@ -7,6 +7,7 @@

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -39,19 +40,26 @@ public class DynamoDBSource implements Source<Record<Event>>, UsesEnhancedSource

private final ClientFactory clientFactory;

private final AcknowledgementSetManager acknowledgementSetManager;

private EnhancedSourceCoordinator coordinator;

private DynamoDBService dynamoDBService;


@DataPrepperPluginConstructor
public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final AwsCredentialsSupplier awsCredentialsSupplier) {
public DynamoDBSource(final PluginMetrics pluginMetrics,
final DynamoDBSourceConfig sourceConfig,
final PluginFactory pluginFactory,
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
LOG.info("Create DynamoDB Source");
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;

clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig(), sourceConfig.getTableConfigs().get(0).getExportConfig());
}

@Override
@@ -61,7 +69,7 @@ public void start(Buffer<Record<Event>> buffer) {
coordinator.createPartition(new InitPartition());

// Create DynamoDB Service
dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics);
dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager);
dynamoDBService.init();

LOG.info("Start DynamoDB service");
@@ -7,10 +7,13 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

/**
@@ -19,14 +22,22 @@
public class DynamoDBSourceConfig {

@JsonProperty("tables")
private List<TableConfig> tableConfigs;

private List<TableConfig> tableConfigs = Collections.emptyList();

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationConfig awsAuthenticationConfig;

@JsonProperty("acknowledgments")
private boolean acknowledgments = false;

@JsonProperty("shard_acknowledgment_timeout")
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(3);

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(5);

public DynamoDBSourceConfig() {
}

@@ -38,4 +49,19 @@ public List<TableConfig> getTableConfigs() {
public AwsAuthenticationConfig getAwsAuthenticationConfig() {
return awsAuthenticationConfig;
}

public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

public Duration getShardAcknowledgmentTimeout() {
return shardAcknowledgmentTimeout;
}

public Duration getDataFileAcknowledgmentTimeout() {
return dataFileAcknowledgmentTimeout;
}

@AssertTrue(message = "Exactly one table must be configured for the DynamoDb source.")
boolean isExactlyOneTableConfigured() {
return tableConfigs.size() == 1;
}
}
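To illustrate the new options, a self-contained sketch that binds a YAML snippet onto this class and runs bean validation. It assumes plain Jackson YAML binding with ISO-8601 durations, which approximates, but is not, Data Prepper's actual config wiring:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import jakarta.validation.Validation;
import jakarta.validation.Validator;

public class SourceConfigSketch {
    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
                .registerModule(new JavaTimeModule()); // ISO-8601 durations assumed
        final String yaml = "acknowledgments: true\n"
                + "shard_acknowledgment_timeout: PT3M\n"
                + "s3_data_file_acknowledgment_timeout: PT5M\n";
        final DynamoDBSourceConfig config = mapper.readValue(yaml, DynamoDBSourceConfig.class);

        // No "tables" entry was given, so tableConfigs stays empty and the new
        // @AssertTrue constraint fails with "Exactly one table must be
        // configured for the DynamoDb source." (the missing "aws" block is
        // flagged by @NotNull as well).
        final Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
        System.out.println(validator.validate(config).size()); // at least 1 violation
    }
}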
@@ -7,20 +7,30 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotBlank;
import software.amazon.awssdk.regions.Region;

public class ExportConfig {

@JsonProperty("s3_bucket")
@NotBlank(message = "Bucket Name is required for export")
private String s3Bucket;

@JsonProperty("s3_prefix")
private String s3Prefix;

@JsonProperty("s3_region")

Contributor commented:

Not really an issue, but I want to point out that I have never tested whether this can support an S3 bucket in a different region. Even if it can, it would be strange: DDB and OpenSearch are in the same region, so what is the point of writing or reading data files in another region?

@graytaylor0 (Member, Author) replied on Nov 2, 2023:

Yeah, I don't know why anyone would do that, but users have the option to export to a different region's bucket. By default this can be ignored for the same region, so it can't really hurt to have.

private String s3Region;

public String getS3Bucket() {
return s3Bucket;
}

public String getS3Prefix() {
return s3Prefix;
}

public Region getAwsRegion() {
return s3Region != null ? Region.of(s3Region) : null;
}

}
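The region fallback that ClientFactory#getS3ClientRegion applies to this getter reduces to one line; a sketch with an illustrative helper name:

import software.amazon.awssdk.regions.Region;

final class S3RegionResolver {
    // Mirrors ClientFactory#getS3ClientRegion earlier in this diff: prefer the
    // export-level s3_region when present, otherwise use the source's aws region.
    static Region resolve(final Region exportRegion, final Region sourceRegion) {
        return exportRegion != null ? exportRegion : sourceRegion;
    }
}

For example, resolve(Region.of("us-west-2"), Region.US_EAST_1) yields us-west-2, while resolve(null, Region.US_EAST_1) falls back to us-east-1.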
@@ -10,6 +10,7 @@
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
@@ -58,13 +59,13 @@ String getEventType() {
return "EXPORT";
}

public void writeToBuffer(List<String> lines) {
public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<String> lines) {

int eventCount = 0;
for (String line : lines) {
Map data = (Map<String, Object>) convertToMap(line).get(ITEM_KEY);
try {
addToBuffer(data);
addToBuffer(acknowledgementSet, data);
eventCount++;
} catch (Exception e) {
// will this cause too many logs?
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -70,7 +71,7 @@ void flushBuffer() throws Exception {
* @param eventName Event name
* @throws Exception Exception if failed to write to buffer.
*/
public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long eventCreationTimeMillis, String eventName) throws Exception {
public void addToBuffer(final AcknowledgementSet acknowledgementSet, Map<String, Object> data, Map<String, Object> keys, long eventCreationTimeMillis, String eventName) throws Exception {
Event event = JacksonEvent.builder()
.withEventType(getEventType())
.withData(data)
Expand All @@ -90,13 +91,16 @@ public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long
} else {
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
}
if (acknowledgementSet != null) {
acknowledgementSet.add(event);

Contributor commented:

I didn't fully understand how this works, but will this have any memory issues? I only see an add of the event here; will there be a clear of the event somewhere else?

@graytaylor0 (Member, Author) replied on Nov 2, 2023:

This just adds an event handle to the acknowledgment set, which takes up a very small amount of memory. We release events when they are sent to the sink, and after the acknowledgment set is acked it will be cleaned up. If there is no ack, it will time out and be cleaned up then. @kkondaka can explain more if needed.

}
bufferAccumulator.add(new Record<>(event));
}

public void addToBuffer(Map<String, Object> data) throws Exception {
public void addToBuffer(final AcknowledgementSet acknowledgementSet, Map<String, Object> data) throws Exception {
// Export data doesn't have an event timestamp
// Default to current timestamp when the event is added to buffer
addToBuffer(data, data, System.currentTimeMillis(), null);
addToBuffer(acknowledgementSet, data, data, System.currentTimeMillis(), null);
}

private String mapStreamEventNameToBulkAction(final String streamEventName) {
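To make the acknowledgment lifecycle from the discussion above concrete, a sketch of how a scheduler might drive it (assuming the AcknowledgementSetManager.create(callback, timeout) shape; variable names are illustrative):

// Sketch (names assumed): create a set with a completion callback and timeout,
// let the converter attach each event's handle, and rely on sinks to release.
final AcknowledgementSet ackSet = acknowledgementSetManager.create(
        success -> {
            if (success) {
                // e.g. checkpoint the data file or shard as fully delivered
            }
        },
        Duration.ofMinutes(5)); // mirrors the s3_data_file_acknowledgment_timeout default

recordConverter.writeToBuffer(ackSet, lines); // each event's handle joins the set
// Sinks release each event handle on delivery; once all handles in the set are
// released, the callback fires. An un-acked set is not retained forever: it
// times out after the configured duration and is cleaned up then.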
@@ -11,6 +11,7 @@
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
@@ -53,7 +54,7 @@ String getEventType() {
}


public void writeToBuffer(List<Record> records) {
public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Record> records) {

int eventCount = 0;
for (Record record : records) {
Expand All @@ -63,7 +64,7 @@ public void writeToBuffer(List<Record> records) {
Map<String, Object> keys = convertKeys(record.dynamodb().keys());

try {
addToBuffer(data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), record.eventNameAsString());
addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), record.eventNameAsString());
eventCount++;
} catch (Exception e) {
// will this cause too many logs?
@@ -23,7 +23,7 @@ public class DataFilePartition extends EnhancedSourcePartition<DataFileProgressS
private final String bucket;
private final String key;

private final DataFileProgressState state;
private DataFileProgressState state;

public DataFilePartition(SourcePartitionStoreItem sourcePartitionStoreItem) {

Expand Down