
Adding support for storing raw bytes in Kafka Buffer #3519

Merged
merged 8 commits into opensearch-project:main on Oct 20, 2023

Conversation

kkondaka (Collaborator)

Description

Adding support for storing raw bytes in Kafka Buffer.

  • Added a writeBytes API to Buffer
  • Added a ByteDecoder that is set in the buffer, based on the source, during pipeline creation
  • Modified the HTTP source to call the writeBytes API
  • The Kafka buffer producer writes raw bytes to Kafka
  • The Kafka buffer consumer reads the bytes and invokes the decoder to create records
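The byte path described above can be sketched end to end with a toy in-memory buffer. All names here are hypothetical stand-ins for illustration, not the actual Data Prepper classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Toy sketch of the raw-bytes path: the producer side stores bytes untouched,
// and the consumer side applies a decoder to rebuild records. All names are
// hypothetical stand-ins, not the real Data Prepper interfaces.
public class RawBytesFlowSketch {
    /** Stand-in for ByteDecoder: turns raw bytes into a record (here a String). */
    interface ByteDecoder extends Function<byte[], String> {}

    static class RawBytesBuffer {
        private final Queue<byte[]> queue = new ArrayDeque<>();
        private final ByteDecoder decoder;

        RawBytesBuffer(final ByteDecoder decoder) {
            this.decoder = decoder;
        }

        /** Producer path: the source hands raw bytes straight to the buffer. */
        public void writeBytes(final byte[] bytes) {
            queue.add(bytes);
        }

        /** Consumer path: bytes are read back and decoded into a record. */
        public String readRecord() {
            return decoder.apply(queue.remove());
        }
    }

    public static void main(final String[] args) {
        final RawBytesBuffer buffer =
                new RawBytesBuffer(bytes -> new String(bytes, StandardCharsets.UTF_8));
        buffer.writeBytes("{\"message\":\"hello\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(buffer.readRecord());
    }
}
```

The point of the sketch is that the buffer itself never interprets the bytes; only the consumer side runs the decoder, which is the behavior the review comments below probe.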

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [x] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added.
  • [x] Commits are signed with a real name per the DCO.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

*/
void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;

// ByteDecoder getDecoder();
Member:

Let's remove this.

@@ -35,4 +36,8 @@ public interface Source<T extends Record<?>> {
default boolean areAcknowledgementsEnabled() {
return false;
}

default ByteDecoder getDecoder() {
Member:

I think we should make this a new interface instead of adding to Source. These could potentially be decoupled in the future if we saw the need.

public interface HasByteDecoder {
  ByteDecoder getDecoder();
}
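As a sketch of how a source would opt in to the proposed interface (the interface names come from the snippet above; JsonDecoder and MyHttpSource are hypothetical placeholders):

```java
// Sketch of the suggestion above: decoding capability lives in its own
// interface, so only sources that produce decodable bytes implement it.
// JsonDecoder and MyHttpSource are hypothetical placeholders.
public class HasByteDecoderSketch {
    interface ByteDecoder {
        String decode(byte[] bytes);
    }

    interface HasByteDecoder {
        ByteDecoder getDecoder();
    }

    static class JsonDecoder implements ByteDecoder {
        @Override
        public String decode(final byte[] bytes) {
            return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    // A source implements HasByteDecoder only when it has bytes worth decoding.
    static class MyHttpSource implements HasByteDecoder {
        @Override
        public ByteDecoder getDecoder() {
            return new JsonDecoder();
        }
    }

    public static void main(final String[] args) {
        final ByteDecoder decoder = new MyHttpSource().getDecoder();
        System.out.println(decoder.decode("ok".getBytes()));
    }
}
```

Decoupling the interface from Source means sources without a meaningful byte representation never have to stub out getDecoder().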

* @return A new instance of your plugin, configured
* @since 2.6
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, Object argument);
Member:

We can avoid a new interface and prepare for future extensions by using varargs here. We may well have more arguments in the future.

You can modify the existing loadPlugin(Class<T>, PluginSetting) by adding Object...

 <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, Object... arguments);

Existing calls will remain compatible because zero arguments are allowed.
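A minimal demonstration of that compatibility point, using a stand-in PluginSetting class rather than the real one:

```java
// Minimal demonstration that widening a method to varargs keeps existing
// call sites source-compatible. PluginSetting is a hypothetical stand-in.
public class VarargsCompatSketch {
    static class PluginSetting {}

    // Formerly loadPlugin(Class<T>, PluginSetting); adding Object... keeps
    // zero-extra-argument calls compiling unchanged.
    static <T> int loadPlugin(final Class<T> baseClass,
                              final PluginSetting pluginSetting,
                              final Object... arguments) {
        return arguments.length; // just report how many extra args arrived
    }

    public static void main(final String[] args) {
        final PluginSetting setting = new PluginSetting();
        System.out.println(loadPlugin(String.class, setting));            // zero extras
        System.out.println(loadPlugin(String.class, setting, "decoder")); // one extra
    }
}
```

Note this is source compatibility: existing callers compile unchanged, though any precompiled callers would need a recompile since the underlying method signature changes.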

@@ -77,7 +81,13 @@ private ComponentPluginArgumentsContext(final Builder builder) {
}

@Override
public Object[] createArguments(final Class<?>[] parameterTypes) {
public Object[] createArguments(final Class<?>[] parameterTypes, Optional<Object> optionalArgument) {
Member:

Do not create Optional<> parameters. This is an anti-pattern: there is no guarantee that the Optional itself is not null, so you would need to write code like:

if(optionalSomething != null && optionalSomething.isPresent())

Also, make this varargs as well (Object...).

interface PluginArgumentsContext {
Object[] createArguments(final Class<?>[] parameterTypes);
Object[] createArguments(final Class<?>[] parameterTypes, final Optional<Object> optionalArgument);
Member:

Do not create Optional<> parameters. This is an anti-pattern: there is no guarantee that the Optional itself is not null, so you would need to write code like:

if(optionalSomething != null && optionalSomething.isPresent())

Also, make this varargs as well (Object...).


@DataPrepperPluginConstructor
public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription) {
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
this.pipelineName = pipelineDescription.getPipelineName();
this.byteDecoder = new JsonDecoder();
Member:

Right now, there is a slightly different decoder for the byte buffer versus the codec. This could lead to hard-to-find bugs.

We should use the exact same code path for decoding regardless of the buffer type. Please refactor this to connect the ByteDecoder directly to the codec.

try {
buffer.writeAll(records, bufferWriteTimeoutInMillis);
} catch (Exception e) {
LOG.error("Failed to write the request of size {} due to: {}", content.length(), e.getMessage());
Member:

I think we should perform the writeBytes and writeAll in the same try-catch. This way, we have the same error handling on both. This split can lead to programming errors quite easily.
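A sketch of that shape, with a stand-in Buffer interface rather than the real one, showing both write paths sharing one try/catch:

```java
import java.util.List;

// Sketch of the suggestion: route both write paths through one try/catch so
// their error handling cannot drift apart. Buffer here is a stand-in.
public class SingleTryCatchSketch {
    interface Buffer {
        void writeAll(List<String> records, int timeoutMillis) throws Exception;
        void writeBytes(byte[] bytes, String key, int timeoutMillis) throws Exception;
    }

    static boolean writeToBuffer(final Buffer buffer, final boolean useBytes,
                                 final List<String> records, final byte[] content) {
        try {
            // Both paths live in the same try block...
            if (useBytes) {
                buffer.writeBytes(content, null, 1000);
            } else {
                buffer.writeAll(records, 1000);
            }
            return true;
        } catch (final Exception e) {
            // ...so a failure in either path gets identical handling here.
            return false;
        }
    }
}
```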


try {
jsonList = jsonCodec.parse(content);
} catch (IOException e) {
Member:

I'm a bit concerned about not doing any parsing even with the byte buffer. With this change, callers will not get a Bad Request when sending bad data. It will only become apparent downstream. This means that a different buffer results in different fundamental behavior between pipelines. This doesn't seem quite right.

Collaborator (Author):

Do you suggest doing the parse and ignoring the parse result in the case of the byte buffer?

topicMetrics.getNumberOfBufferSizeOverflows().increment();
Thread.sleep(100);
}
if (schema == MessageFormat.BYTES && byteDecoder != null) {
Member:

Can we make use of the new SerializationFactory or add a similar concept to avoid having these conditionals in the consumer? Maybe a future improvement.

Collaborator (Author):

Yeah, we need to organize this better. I will open an issue for this.

try {
send(topicName, key, bytes).get();
} catch (Exception e) {
LOG.error("Error occurred while publishing " + e.getMessage());
Member:

Use parameterized logging in log lines:

LOG.error("Error occurred while publishing {}", e.getMessage());

Krishna Kondaka added 4 commits October 18, 2023 22:36
Signed-off-by: Krishna Kondaka <[email protected]>
@dlvenable (Member) left a comment:

Overall looks good. I have a few comments, and I think we can also expand some test cases.

ByteDecoder byteDecoder = (ByteDecoder) optionalArgument.get();
public Object[] createArguments(final Class<?>[] parameterTypes, final Object ... args) {
Map<Class<?>, Supplier<Object>> optionalArgumentsSuppliers = new HashMap<>();
Boolean byteDecoderInitialized = false;
Member:

The plugin framework shouldn't know or care about ByteDecoder at all. Any special code for that should be in PipelineParser.

Also, I'm not sure what this condition is really doing other than possibly avoiding passing the parameter in twice.

I think this code can be:

Map<Class<?>, Supplier<Object>> optionalArgumentsSuppliers = new HashMap<>();
for(final Object arg: args) {
  optionalArgumentsSuppliers.put(arg.getClass(), () -> arg);
}

optionalArgumentsSuppliers.put(ByteDecoder.class, () -> byteDecoder);
}
}
return Arrays.stream(parameterTypes)
.map(this::getRequiredArgumentSupplier)
.map((parameterType) -> getRequiredArgumentSupplier(parameterType, optionalArgumentsSuppliers))
Member:

nit: You don't need the parenthesis around parameterType.

.map(parameterType -> getRequiredArgumentSupplier(parameterType, optionalArgumentsSuppliers))

@dlvenable (Member) left a comment:

In addition to my previous comments, I've added some specific unit test cases we can cover.

Also, we should cover this change in DefaultPluginFactoryIT.

@@ -256,7 +256,7 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_
verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName));
final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue();
final List<Class> classes = List.of(PipelineDescription.class);
final Object[] pipelineDescriptionObj = actualPluginArgumentsContext.createArguments(classes.toArray(new Class[1]));
final Object[] pipelineDescriptionObj = actualPluginArgumentsContext.createArguments(classes.toArray(new Class[1]), Optional.empty());
Member:

Can you remove this Optional.empty()?

@@ -292,7 +292,7 @@ void loadPlugins_should_return_an_instance_for_the_total_count() {
assertThat(actualPluginArgumentsContextList.size(), equalTo(3));
actualPluginArgumentsContextList.forEach(pluginArgumentsContext -> {
final List<Class> classes = List.of(PipelineDescription.class);
final Object[] pipelineDescriptionObj = pluginArgumentsContext.createArguments(classes.toArray(new Class[1]));
final Object[] pipelineDescriptionObj = pluginArgumentsContext.createArguments(classes.toArray(new Class[1]), Optional.empty());
Member:

Can you remove this Optional.empty()?

@@ -174,7 +175,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_not_su
final PluginArgumentsContext actualPluginArgumentsContext = contextArgumentCaptor.getValue();

final Class[] inputClasses = {String.class};
assertThrows(InvalidPluginDefinitionException.class, () -> actualPluginArgumentsContext.createArguments(inputClasses));
assertThrows(InvalidPluginDefinitionException.class, () -> actualPluginArgumentsContext.createArguments(inputClasses, Optional.empty()));
Member:

Can you remove this Optional.empty()?

Member:

Also, we might want a couple variations on the test.

createArguments(inputClasses)

and

final Object someArgument = UUID.randomUUID().toString();
createArguments(inputClasses, someArgument)


final PluginArgumentsContext actualPluginArgumentsContext = contextArgumentCaptor.getValue();

final Object[] arguments = actualPluginArgumentsContext.createArguments(new Class[]{});
final Object[] arguments = actualPluginArgumentsContext.createArguments(new Class[]{}, Optional.empty());
Member:

Remove the Optional.empty(). Also, we might want two test cases here.

final String pluginName = pluginSetting.getName();
final Class<? extends T> pluginClass = getPluginClass(baseClass, pluginName);

final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null);

return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName);
return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName, args);
Member:

Let's have some test cases to verify that this works with args and without args. And that these args are passed into the correct mocks.

@@ -77,21 +78,30 @@ private ComponentPluginArgumentsContext(final Builder builder) {
}

@Override
public Object[] createArguments(final Class<?>[] parameterTypes) {
public Object[] createArguments(final Class<?>[] parameterTypes, final Object ... args) {
Member:

Also, let's add unit tests for this method to ensure that args are used correctly and as expected.

Krishna Kondaka added 2 commits October 19, 2023 19:47
@@ -112,6 +112,11 @@ public void writeAll(Collection<T> records, int timeoutInMillis) throws Exceptio
}
}

@Override
public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception {
throw new RuntimeException("not supported");
Member:

There is an UnsupportedOperationException in the JDK; use that instead of a plain RuntimeException.
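A sketch of the suggested change, keeping the method signature from the diff above (the class name is a placeholder):

```java
// UnsupportedOperationException is the JDK's idiomatic signal that an
// implementation deliberately does not support an operation. The class
// name here is a hypothetical placeholder.
public class UnsupportedWriteBytesSketch {
    public void writeBytes(final byte[] bytes, final String key,
                           final int timeoutInMillis) {
        throw new UnsupportedOperationException(
                "writeBytes is not supported by this buffer");
    }

    public static void main(final String[] args) {
        try {
            new UnsupportedWriteBytesSketch().writeBytes(new byte[0], null, 100);
        } catch (final UnsupportedOperationException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Callers can then catch the specific exception type rather than a generic RuntimeException.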

Collaborator (Author):

Will do this in the next PR

@kkondaka kkondaka merged commit aa81607 into opensearch-project:main Oct 20, 2023
26 of 27 checks passed
@kkondaka kkondaka deleted the kafka-bytes branch May 13, 2024 05:52

3 participants