Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqs source: json codec support to split sqs message into multiple events #5330

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

jmsusanto
Copy link
Contributor

Description

adds json codec support and functionality to split message into multiple events

#5054

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@jmsusanto jmsusanto changed the title json codec support and functionality to split message into multiple events sqs source: json codec support to split sqs message into multiple events Jan 14, 2025
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue());
}

for (Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
Copy link
Member

@dlvenable dlvenable Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this code is similar to the RawSqsMessageHandler. We should not duplicate this code.

I think the best option is to update the existing RawSqsMessageHandler to support an injectable message strategy.

It might look like:

interface MessageFieldStrategy {
  List<Event> parseEvents(String messageBody);
}

This has some advantages of being able to use buffer.writeAll for the whole batch and the attribute code is shared for all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the BulkMessageHandler and added strategies instead. buffer.writeAll is also used instead of buffer.write now

for (Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
final String originalKey = entry.getKey();
final String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);;
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this processing can be shared, which has the added benefit of avoiding extra memory. You can do this in the refactoring that I suggest above.

That is, before the loop over the List<Event>, create a Map<String, String> for all the attributes. Then re-use those values. This should reduce both compute and memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great idea, the metadata/attributes of a bulk message would remain the same for every event in that message

@@ -0,0 +1,73 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the longer copyright header.

https://github.com/opensearch-project/data-prepper/blob/90575b1de56f82f44d1af36f31ff4b077a627bd7/CONTRIBUTING.md#license-headers

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
*/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to every file

Jeremy Michael added 2 commits January 14, 2025 14:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants