Add acknowledgments for the ddb source #3575

Merged 2 commits into opensearch-project:main on Nov 2, 2023

Conversation

graytaylor0
Member

@graytaylor0 graytaylor0 commented Nov 1, 2023

Description

This change adds end-to-end acknowledgments to the DynamoDB source, including configurable acknowledgment timeouts for both shards and export data files.

The new parameters are:

  • acknowledgments: true enables end-to-end acknowledgments (Boolean).
  • s3_data_file_acknowledgment_timeout: "30s" sets the duration before a data file is reprocessed, measured from the time the last line of the data file is written to the buffer until it is received by the sink (Data Prepper duration format). The default is 5 minutes (is this too high?).
  • shard_acknowledgment_timeout: "30s" sets the duration before a shard is reprocessed, measured from the time the last record in the shard is written to the buffer until it is received by the sink (Data Prepper duration format). The default is 3 minutes (is this too high?).
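The timeout values above use Data Prepper's duration format (e.g. "30s"). As an illustration only, not Data Prepper's actual parser, a simple number-plus-unit-suffix string can be mapped to java.time.Duration like this:

```java
import java.time.Duration;

// Illustrative parser for simple duration strings like "30s" or "5m".
// This is a sketch of the format, NOT Data Prepper's real implementation.
public class DurationExample {
    static Duration parseSimpleDuration(String text) {
        String trimmed = text.trim().toLowerCase();
        long value = Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
        char unit = trimmed.charAt(trimmed.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(value);
            case 'm': return Duration.ofMinutes(value);
            case 'h': return Duration.ofHours(value);
            default: throw new IllegalArgumentException("Unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseSimpleDuration("30s").getSeconds()); // 30
        System.out.println(parseSimpleDuration("5m").getSeconds());  // 300
    }
}
```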

Additionally, this PR adds an s3_region parameter for the export. By default, the S3 client for the bucket uses the same region as aws.region (used for STS), but it can now be set to a different region with s3_region. Setting this region on the client fixes an issue where GetObject requests would hang indefinitely in some scenarios (for reasons that are still unknown).
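The region-fallback behavior described above can be sketched as follows. This is a hypothetical helper for illustration, not the plugin's actual code:

```java
// Sketch of the s3_region fallback: use s3_region when set,
// otherwise fall back to aws.region. Hypothetical helper, not plugin code.
public class RegionResolver {
    static String resolveS3Region(String s3Region, String awsRegion) {
        return (s3Region != null && !s3Region.isEmpty()) ? s3Region : awsRegion;
    }

    public static void main(String[] args) {
        System.out.println(resolveS3Region(null, "us-east-1"));        // falls back to aws.region
        System.out.println(resolveS3Region("eu-west-1", "us-east-1")); // explicit s3_region wins
    }
}
```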

Issues Resolved

Resolves #3538

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Comment on lines 35 to 39
@JsonProperty("shard_acknowledgment_timeout")
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10);

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(15);
Member Author


These are safe defaults, but since the timeout is measured from the last record written to the buffer until it (along with all previous records) reaches the sink, I'm thinking it could likely be 30 seconds in many cases. I'm hesitant to default to that without thorough testing, though. These high timeout values may be better as defaults, and users can then lower them as much as their use cases allow. Thoughts?

@JsonProperty("s3_prefix")
private String s3Prefix;

@JsonProperty("s3_region")
Contributor


Not really an issue, but I just want to point out that I have never tested whether this can support S3 in a different region. Even if it can, it would be strange: since DDB and OpenSearch are in the same region, what is the point of writing or reading data files from another region?

Member Author

@graytaylor0 graytaylor0 Nov 2, 2023


Yeah, I don't know why anyone would do that, but users do have the option to export to a bucket in a different region. By default this can be ignored for the same-region case, so it can't really hurt to have.

@@ -91,12 +92,15 @@ public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
}
bufferAccumulator.add(new Record<>(event));
if (acknowledgementSet != null) {
acknowledgementSet.add(event);
Contributor


I didn't fully understand how this works, but could it cause a memory issue? I only see events being added here; is there a corresponding clearing of events somewhere else?

Member Author

@graytaylor0 graytaylor0 Nov 2, 2023


This just adds an event handle to the acknowledgment set, which takes up a very small amount of memory. Events are released when they are sent to the sink, and once the acknowledgment set is fully acked it is cleaned up. If no ack arrives, the set times out and is cleaned up then. @kkondaka can explain more if needed.
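As a rough sketch of that lifecycle (a hypothetical minimal class for illustration, not Data Prepper's actual AcknowledgementSet): each added event increments a pending count, each release decrements it, and a set that never completes is eventually considered timed out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal acknowledgment set, sketching the lifecycle
// described above. Data Prepper's real implementation is more involved.
public class AckSetSketch {
    private final AtomicInteger pending = new AtomicInteger();
    private final Instant deadline;
    private volatile boolean completed = false;

    AckSetSketch(Duration timeout) {
        this.deadline = Instant.now().plus(timeout);
    }

    // Called per event added from the source; stores only a small counter bump.
    void add() { pending.incrementAndGet(); }

    // Called when the sink has processed an event.
    void release() {
        if (pending.decrementAndGet() == 0) {
            completed = true; // all events acked; the set can be cleaned up
        }
    }

    // A set that never completes before its deadline is cleaned up,
    // and its shard or data file is reprocessed.
    boolean isTimedOut() {
        return !completed && Instant.now().isAfter(deadline);
    }

    boolean isCompleted() { return completed; }
}
```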

Signed-off-by: Taylor Gray <[email protected]>
@graytaylor0 graytaylor0 merged commit d2007bc into opensearch-project:main Nov 2, 2023
54 checks passed
@graytaylor0 graytaylor0 deleted the AcksDdb branch November 2, 2023 16:14
Successfully merging this pull request may close these issues.

End to end acknowledgments for Dynamo source