Skip to content

Commit

Permalink
Correct single quote escape character in DynamoDB [#3664] (#3667)
Browse files Browse the repository at this point in the history
Resolves a bug with escaped single quotes in the DynamoDB source by updating the AWS SDK to 2.21.23. Also entirely skips data that cannot be parsed, rather than silently sending empty data. Resolves #3664.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Nov 15, 2023
1 parent e848f63 commit 1ffb572
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ subprojects {

configure(subprojects.findAll {it.name != 'data-prepper-api'}) {
dependencies {
implementation platform('software.amazon.awssdk:bom:2.20.67')
implementation platform('software.amazon.awssdk:bom:2.21.23')
implementation 'jakarta.validation:jakarta.validation-api:3.0.2'
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,18 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
int eventCount = 0;
for (Record record : records) {
final long bytes = record.dynamodb().sizeBytes();
// NewImage may be empty
Map<String, Object> data = convertData(record.dynamodb().newImage());
// Always get keys from dynamodb().keys()
Map<String, Object> keys = convertKeys(record.dynamodb().keys());
Map<String, Object> data;
Map<String, Object> keys;
try {
// NewImage may be empty
data = convertData(record.dynamodb().newImage());
// Always get keys from dynamodb().keys()
keys = convertKeys(record.dynamodb().keys());
} catch (final Exception e) {
LOG.error("Failed to parse and convert data from stream due to {}", e.getMessage());
changeEventErrorCounter.increment();
continue;
}

try {
bytesReceivedSummary.record(bytes);
Expand Down Expand Up @@ -101,13 +109,9 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
/**
* Convert the DynamoDB attribute map to a normal map for data
*/
private Map<String, Object> convertData(Map<String, AttributeValue> data) {
try {
String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson();
return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE);
} catch (JsonProcessingException e) {
return null;
}
private Map<String, Object> convertData(Map<String, AttributeValue> data) throws JsonProcessingException {
String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson();
return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -27,6 +29,8 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -39,6 +43,7 @@
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand Down Expand Up @@ -155,11 +160,92 @@ void test_writeSingleRecordToBuffer() throws Exception {
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));

assertThat(event.get(partitionKeyAttrName, String.class), notNullValue());
assertThat(event.get(sortKeyAttrName, String.class), notNullValue());

verifyNoInteractions(changeEventErrorCounter);
verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes());
verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes());
}

/**
 * Writes a single stream record whose new image carries an extra {@code otherData} string attribute
 * and checks that the buffered event keeps that string unchanged, alongside the expected metadata.
 * <p>
 * The inputs include apostrophes, colons, slashes, backslashes, newlines and double quotes.
 */
@ParameterizedTest
@ValueSource(strings = {
        "Hello world.",
        "I'm sorry.",
        "I'm sorry, but I don't have access to that.",
        "Re: colons",
        "and/or",
        "c:\\Home",
        "I take\nup multiple\nlines",
        "String with some \"backquotes\"."
})
void test_writeSingleRecordToBuffer_with_other_data(final String additionalString) throws Exception {
    // Arrange: one stream record with the parameterized string stored under "otherData".
    final List<software.amazon.awssdk.services.dynamodb.model.Record> streamRecords = buildRecords(
            1,
            Instant.now(),
            Map.of("otherData", AttributeValue.builder().s(additionalString).build()));
    final software.amazon.awssdk.services.dynamodb.model.Record streamRecord = streamRecords.get(0);
    final ArgumentCaptor<Record> bufferedRecordCaptor = ArgumentCaptor.forClass(Record.class);
    final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
    doNothing().when(bufferAccumulator).add(bufferedRecordCaptor.capture());

    // Act
    objectUnderTest.writeToBuffer(null, streamRecords);

    // Assert: exactly one record was buffered and flushed, and it counted as a success.
    verify(bufferAccumulator).add(any(Record.class));
    verify(bufferAccumulator).flush();
    verify(changeEventSuccessCounter).increment(anyDouble());
    assertThat(bufferedRecordCaptor.getValue().getData(), notNullValue());
    final JacksonEvent bufferedEvent = (JacksonEvent) bufferedRecordCaptor.getValue().getData();

    // Assert: metadata is derived from the record keys and the stream record itself.
    assertThat(bufferedEvent.getMetadata(), notNullValue());
    final Map<String, AttributeValue> expectedKeys = streamRecord.dynamodb().keys();
    final String expectedPartitionKey = expectedKeys.get(partitionKeyAttrName).s();
    final String expectedSortKey = expectedKeys.get(sortKeyAttrName).s();
    assertThat(bufferedEvent.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(expectedPartitionKey));
    assertThat(bufferedEvent.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(expectedSortKey));
    assertThat(bufferedEvent.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE),
            equalTo(expectedPartitionKey + "|" + expectedSortKey));
    assertThat(bufferedEvent.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE),
            equalTo(OpenSearchBulkActions.INDEX.toString()));
    assertThat(bufferedEvent.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
    assertThat(bufferedEvent.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE),
            equalTo(streamRecord.dynamodb().approximateCreationDateTime().toEpochMilli()));

    // Assert: the event body holds both keys and the additional string exactly as supplied.
    assertThat(bufferedEvent.get(partitionKeyAttrName, String.class), notNullValue());
    assertThat(bufferedEvent.get(sortKeyAttrName, String.class), notNullValue());
    assertThat(bufferedEvent.get("otherData", String.class), equalTo(additionalString));

    // Assert: no errors were counted and both byte summaries saw the record size.
    verifyNoInteractions(changeEventErrorCounter);
    verify(bytesReceivedSummary).record(streamRecord.dynamodb().sizeBytes());
    verify(bytesProcessedSummary).record(streamRecord.dynamodb().sizeBytes());
}

/**
 * Verifies that stream records which cannot be converted are skipped rather than written to the buffer.
 * <p>
 * Every record carries an {@code otherData} attribute built from an empty {@link AttributeValue}
 * builder (no value type set). None of them may reach the buffer, and each one must be counted
 * on the change-event error counter.
 */
@Test
void test_writeSingleRecordToBuffer_with_bad_input_does_not_write() throws Exception {

    final Map<String, AttributeValue> badData = Map.of("otherData", AttributeValue.builder().build());
    final List<software.amazon.awssdk.services.dynamodb.model.Record> badRecords = buildRecords(2, Instant.now(), badData);

    final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);

    objectUnderTest.writeToBuffer(null, badRecords);

    verify(bufferAccumulator, never()).add(any(Record.class));
    // Skipped records must still be visible in metrics: one error increment per unparseable record.
    verify(changeEventErrorCounter, times(badRecords.size())).increment();
}

/**
 * Verifies that unparseable records do not stop the records after them from being written.
 * <p>
 * The batch places two records with an empty {@link AttributeValue} (no value type set) ahead of
 * five ordinary records. Only the ordinary records may reach the buffer, and each unparseable
 * record must be counted on the change-event error counter.
 */
@Test
void test_writeSingleRecordToBuffer_with_mixed_input_writes_good_records() throws Exception {

    final Map<String, AttributeValue> badData = Map.of("otherData", AttributeValue.builder().build());
    final List<software.amazon.awssdk.services.dynamodb.model.Record> badRecords = buildRecords(2, Instant.now(), badData);
    final List<software.amazon.awssdk.services.dynamodb.model.Record> goodRecords = buildRecords(5, Instant.now());

    final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);

    // Bad records go first so the test proves processing continues past a failure.
    final List<software.amazon.awssdk.services.dynamodb.model.Record> mixedRecords = new ArrayList<>();
    mixedRecords.addAll(badRecords);
    mixedRecords.addAll(goodRecords);

    objectUnderTest.writeToBuffer(null, mixedRecords);

    verify(bufferAccumulator, times(goodRecords.size())).add(any(Record.class));
    // Each skipped record must be reported as an error, not silently dropped.
    verify(changeEventErrorCounter, times(badRecords.size())).increment();
}

@Test
void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timestamp() throws Exception {
final long currentSecond = 1699336310;
Expand Down Expand Up @@ -252,22 +338,39 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta
}

private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(int count, final Instant creationTime) {
return buildRecords(count, creationTime, Collections.emptyMap());
}

private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(
int count,
final Instant creationTime,
final Map<String, AttributeValue> additionalData) {
List<software.amazon.awssdk.services.dynamodb.model.Record> records = new ArrayList<>();
for (int i = 0; i < count; i++) {
records.add(buildRecord(creationTime));
records.add(buildRecord(creationTime, additionalData));
}

return records;
}

private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime) {
Map<String, AttributeValue> data = Map.of(
return buildRecord(creationTime, Collections.emptyMap());
}

private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime,
Map<String, AttributeValue> additionalData) {
Map<String, AttributeValue> keysData = Map.of(
partitionKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build(),
sortKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build());

final Map<String, AttributeValue> data = new HashMap<>();
data.putAll(keysData);
data.putAll(additionalData);

StreamRecord streamRecord = StreamRecord.builder()
.sizeBytes(RANDOM.nextLong())
.newImage(data)
.keys(data)
.keys(keysData)
.sequenceNumber(UUID.randomUUID().toString())
.approximateCreationDateTime(creationTime)
.build();
Expand Down

0 comments on commit 1ffb572

Please sign in to comment.