Skip to content

Commit

Permalink
Add bug fixes and improvements to DDB source (#3534)
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba authored Oct 23, 2023
1 parent 2f8372b commit fe750f5
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 136 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/dynamodb-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'

implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')


testImplementation platform('org.junit:junit-bom:5.9.1')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -69,6 +70,9 @@ public class DynamoDBService {

private final PluginMetrics pluginMetrics;

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;


public DynamoDBService(EnhancedSourceCoordinator coordinator, ClientFactory clientFactory, DynamoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics) {
this.coordinator = coordinator;
Expand Down Expand Up @@ -156,14 +160,13 @@ public void init() {
Instant startTime = Instant.now();

if (tableInfo.getMetadata().isExportRequired()) {
// exportTime = Instant.now();
createExportPartition(tableInfo.getTableArn(), startTime, tableInfo.getMetadata().getExportBucket(), tableInfo.getMetadata().getExportPrefix());
}

if (tableInfo.getMetadata().isStreamRequired()) {
List<String> shardIds;
// start position by default is TRIM_HORIZON if not provided.
if (tableInfo.getMetadata().isExportRequired() || String.valueOf(StreamStartPosition.LATEST).equals(tableInfo.getMetadata().getStreamStartPosition())) {
if (tableInfo.getMetadata().isExportRequired() || tableInfo.getMetadata().getStreamStartPosition() == StreamStartPosition.LATEST) {
// For a continued data extraction process that involves both export and stream
// The export must be completed and loaded before stream can start.
// Moreover, there should not be any gaps between the export time and the time start reading the stream
Expand Down Expand Up @@ -274,15 +277,13 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
throw new InvalidPluginConfigurationException(errorMessage);
}
// Validate view type of DynamoDB stream
if (describeTableResult.table().streamSpecification() != null) {
String viewType = describeTableResult.table().streamSpecification().streamViewTypeAsString();
LOG.debug("The stream view type for table " + tableName + " is " + viewType);
List<String> supportedType = List.of("NEW_IMAGE", "NEW_AND_OLD_IMAGES");
if (!supportedType.contains(viewType)) {
String errorMessage = "Stream " + tableConfig.getTableArn() + " is enabled with " + viewType + ". Supported types are " + supportedType;
LOG.error(errorMessage);
throw new InvalidPluginConfigurationException(errorMessage);
}
String viewType = describeTableResult.table().streamSpecification().streamViewTypeAsString();
LOG.debug("The stream view type for table " + tableName + " is " + viewType);
List<String> supportedType = List.of("NEW_IMAGE", "NEW_AND_OLD_IMAGES");
if (!supportedType.contains(viewType)) {
String errorMessage = "Stream " + tableConfig.getTableArn() + " is enabled with " + viewType + ". Supported types are " + supportedType;
LOG.error(errorMessage);
throw new InvalidPluginConfigurationException(errorMessage);
}
streamStartPosition = tableConfig.getStreamConfig().getStartPosition();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -46,7 +45,7 @@ public class DynamoDBSource implements Source<Record<Event>>, UsesEnhancedSource


@DataPrepperPluginConstructor
public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final PluginSetting pluginSetting, final AwsCredentialsSupplier awsCredentialsSupplier) {
public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final AwsCredentialsSupplier awsCredentialsSupplier) {
LOG.info("Create DynamoDB Source");
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
Expand All @@ -18,7 +18,6 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ExportRecordConverter extends RecordConverter {

Expand All @@ -37,8 +36,8 @@ public class ExportRecordConverter extends RecordConverter {
private final Counter exportRecordSuccessCounter;
private final Counter exportRecordErrorCounter;

public ExportRecordConverter(Buffer<Record<Event>> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) {
super(buffer, tableInfo);
public ExportRecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator, TableInfo tableInfo, PluginMetrics pluginMetrics) {
super(bufferAccumulator, tableInfo);
this.pluginMetrics = pluginMetrics;
this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT);
Expand All @@ -60,21 +59,26 @@ String getEventType() {
}

public void writeToBuffer(List<String> lines) {
List<Map<String, Object>> data = lines.stream()
.map(this::convertToMap)
.map(d -> (Map<String, Object>) d.get(ITEM_KEY))
.collect(Collectors.toList());

List<Record<Event>> events = data.stream().map(this::convertToEvent).collect(Collectors.toList());
int eventCount = 0;
for (String line : lines) {
Map data = (Map<String, Object>) convertToMap(line).get(ITEM_KEY);
try {
addToBuffer(data);
eventCount++;
} catch (Exception e) {
// will this cause too many logs?
LOG.error("Failed to add event to buffer due to {}", e.getMessage());
}
}

try {
writeEventsToBuffer(events);
exportRecordSuccessCounter.increment(events.size());
flushBuffer();
exportRecordSuccessCounter.increment(eventCount);
} catch (Exception e) {
LOG.error("Failed to write {} events to buffer due to {}", events.size(), e.getMessage());
exportRecordErrorCounter.increment(events.size());
LOG.error("Failed to write {} events to buffer due to {}", eventCount, e.getMessage());
exportRecordErrorCounter.increment(eventCount);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

public class MetadataKeyAttributes {
static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key";
static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key";

static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key";
static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key";

static final String SORT_KEY_METADATA_ATTRIBUTE = "sort_key";
static final String SORT_KEY_METADATA_ATTRIBUTE = "sort_key";

static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "ts";
static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "ts";

static final String STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "op";
static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "op";

static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,22 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;

/**
* Base Record Processor definition.
Expand All @@ -33,68 +31,72 @@ public abstract class RecordConverter {

private static final String DEFAULT_ACTION = OpenSearchBulkActions.INDEX.toString();

private static final int DEFAULT_WRITE_TIMEOUT_MILLIS = 60_000;

private final Buffer<Record<Event>> buffer;
private final BufferAccumulator<Record<Event>> bufferAccumulator;

private final TableInfo tableInfo;

public RecordConverter(Buffer<Record<Event>> buffer, TableInfo tableInfo) {
this.buffer = buffer;
public RecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator, TableInfo tableInfo) {
this.bufferAccumulator = bufferAccumulator;
this.tableInfo = tableInfo;
}


abstract String getEventType();

/**
* Default method to conduct the document ID value,
* Using partition key plus sort key (if any)
* Extract the value based on attribute map
*
* @param data A map of attribute name and value
* @param attributeName Attribute name
* @return the related attribute value, return null if the attribute name doesn't exist.
*/
String getId(Map<String, Object> data) {
String partitionKey = String.valueOf(data.get(tableInfo.getMetadata().getPartitionKeyAttributeName()));
if (tableInfo.getMetadata().getSortKeyAttributeName() == null) {
return partitionKey;
private String getAttributeValue(final Map<String, Object> data, String attributeName) {
if (data.containsKey(attributeName)) {
return String.valueOf(data.get(attributeName));
}
String sortKey = String.valueOf(data.get(tableInfo.getMetadata().getSortKeyAttributeName()));
return partitionKey + "_" + sortKey;
return null;
}

String getPartitionKey(final Map<String, Object> data) {
return String.valueOf(data.get(tableInfo.getMetadata().getPartitionKeyAttributeName()));
void flushBuffer() throws Exception {
bufferAccumulator.flush();
}

String getSortKey(final Map<String, Object> data) {
return String.valueOf(data.get(tableInfo.getMetadata().getSortKeyAttributeName()));
}

void writeEventsToBuffer(List<Record<Event>> events) throws Exception {
buffer.writeAll(events, DEFAULT_WRITE_TIMEOUT_MILLIS);
}

public Record<Event> convertToEvent(Map<String, Object> data, Instant eventCreationTime, String streamEventName) {
Event event;
event = JacksonEvent.builder()
/**
* Add event record to buffer
*
* @param data A map to hold event data, note that it may be empty.
* @param keys A map to hold the keys (partition key and sort key)
* @param eventCreationTimeMillis Creation timestamp of the event
* @param eventName Event name
* @throws Exception Exception if failed to write to buffer.
*/
public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long eventCreationTimeMillis, String eventName) throws Exception {
Event event = JacksonEvent.builder()
.withEventType(getEventType())
.withData(data)
.build();
EventMetadata eventMetadata = event.getMetadata();

eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableInfo.getTableName());
if (eventCreationTime != null) {
eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTime.toEpochMilli());
eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName));
String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName());
eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey);

String sortKey = getAttributeValue(keys, tableInfo.getMetadata().getSortKeyAttributeName());
if (sortKey != null) {
eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, sortKey);
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey + "_" + sortKey);
} else {
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
}

eventMetadata.setAttribute(STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(streamEventName));
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, getId(data));
eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, getPartitionKey(data));
eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, getSortKey(data));

return new Record<>(event);
bufferAccumulator.add(new Record<>(event));
}

public Record<Event> convertToEvent(Map<String, Object> data) {
return convertToEvent(data, null, null);
public void addToBuffer(Map<String, Object> data) throws Exception {
// Export data doesn't have an event timestamp
// Default to current timestamp when the event is added to buffer
addToBuffer(data, data, System.currentTimeMillis(), null);
}

private String mapStreamEventNameToBulkAction(final String streamEventName) {
Expand Down
Loading

0 comments on commit fe750f5

Please sign in to comment.