diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/PipelineIf.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/PipelineIf.java new file mode 100644 index 0000000000..8c90be7913 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/PipelineIf.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model; + +import org.opensearch.dataprepper.model.source.Source; + +public interface PipelineIf { + Source getSource(); +} + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java new file mode 100644 index 0000000000..b3f0af9e58 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.failures; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Collection; +public interface FailurePipeline { + void sendFailedEvents(Collection> events); +} + + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index 26dd7e98a6..0349f660eb 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -9,6 +9,7 @@ import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; @@ -64,16 +65,16 @@ public void initialize() { * @param records the records to write to the sink. */ @Override - public void output(Collection records) { + public void output(Collection records, final PipelineIf failurePipeline) { recordsInCounter.increment(records.size()*1.0); - timeElapsedTimer.record(() -> doOutput(records)); + timeElapsedTimer.record(() -> doOutput(records, failurePipeline)); } /** * This method should implement the output logic * @param records Records to be output */ - public abstract void doOutput(Collection records); + public abstract void doOutput(Collection records, final PipelineIf failurePipeline); @Override public void shutdown() { diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java index 178566ba5b..2c95e21d8b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import java.util.Collection; @@ -20,7 +21,11 @@ public interface Sink> { * * @param records the records to write to the sink. */ - void output(Collection records); + default void output(Collection records) { + output(records, null); + } + + void output(Collection records, PipelineIf failurePipeline); /** * Prepare sink for shutdown, by cleaning up resources and threads. diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java index 1026cb6083..9fc374d4da 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; @@ -149,7 +150,7 @@ private static class AbstractEventSinkImpl extends AbstractSink> { } @Override - public void doOutput(Collection> records) { + public void doOutput(Collection> records, PipelineIf failurePipeline) { for (final Record record: records) { Event event = record.getData(); event.getEventHandle().release(true); @@ -179,7 +180,7 @@ private static class AbstractSinkImpl extends AbstractSink> { } @Override - public void doOutput(Collection> records) { + public void doOutput(Collection> records, PipelineIf failurePipeline) { try { Thread.sleep(150); } catch (InterruptedException e) { @@ -213,7 +214,7 @@ private static class AbstractSinkNotReadyImpl extends AbstractSink> records) { + public void doOutput(Collection> records, PipelineIf failurePipeline) { } @Override diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java index 5f66a623aa..717c26a056 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.junit.jupiter.api.Test; import java.util.Collection; @@ -28,7 +29,7 @@ public void initialize() { } @Override - public void output(Collection> records) { + public void output(Collection> records, PipelineIf failurePipeline) { } }; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java index a98b1c56b8..6727711ea3 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -34,7 +35,7 @@ public InMemorySink(final InMemoryConfig inMemoryConfig, } @Override - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { inMemorySinkAccessor.addEvents(testingKey, records); boolean result = inMemorySinkAccessor.getResult(); records.stream().forEach((record) -> { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index 77cf649535..c02b4c4c64 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator; import org.opensearch.dataprepper.core.pipeline.Pipeline; +import org.opensearch.dataprepper.core.pipeline.FailurePipelineSource; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; import org.opensearch.dataprepper.core.pipeline.router.Router; import org.opensearch.dataprepper.core.pipeline.router.RouterFactory; @@ -59,6 +60,7 @@ public class PipelineTransformer { "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax."; private static final String PIPELINE_TYPE = "pipeline"; private static final String ATTRIBUTE_NAME = "name"; + //private static final String FAILURE_PIPELINE_NAME = "dlq"; private final PipelinesDataFlowModel pipelinesDataFlowModel; private final RouterFactory routerFactory; private final DataPrepperConfiguration dataPrepperConfiguration; @@ -128,24 +130,30 @@ private void buildPipelineFromConfiguration( final Map pipelineMap) { final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName); LOG.info("Building pipeline [{}] from provided configuration", pipelineName); + final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName(); try { + Source source; final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); - final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, - pipelineMap, pipelineConfigurationMap); - final Source source = pipelineSource.orElseGet(() -> { - try { - return pluginFactory.loadPlugin(Source.class, sourceSetting); - } catch (Exception e) { - final PluginError pluginError = PluginError.builder() - .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) - .pipelineName(pipelineName) - .pluginName(sourceSetting.getName()) - .exception(e) - .build(); - pluginErrorCollector.collectPluginError(pluginError); - return null; - } - }); + if (!pipelineName.equals(failurePipelineName)) { + final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, + pipelineMap, pipelineConfigurationMap); + source = pipelineSource.orElseGet(() -> { + try { + return pluginFactory.loadPlugin(Source.class, sourceSetting); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) + .pipelineName(pipelineName) + .pluginName(sourceSetting.getName()) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return null; + } + }); + } else { + source = new FailurePipelineSource(); + } LOG.info("Building buffer for the pipeline [{}]", pipelineName); Buffer pipelineDefinedBuffer = null; @@ -234,7 +242,14 @@ private void buildPipelineFromConfiguration( "pipelines", pipelineName, ex); processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap); } - + final Pipeline failurePipeline = pipelineMap.get(failurePipelineName); + if (failurePipeline != null) { + for (Map.Entry pipelineEntry : pipelineMap.entrySet()) { + if (!(pipelineEntry.getKey().equals(failurePipelineName))) { + pipelineEntry.getValue().setFailurePipeline(failurePipeline); + } + } + } } private List> newProcessor(final PluginSetting pluginSetting) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java index 9212a9943b..e7b72cf22b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java @@ -34,6 +34,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer { static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L); + static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq"; private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory"; static final int MAX_TAGS_NUMBER = 3; @@ -213,6 +214,8 @@ public void setMetricTagFilters(final List metricTagFilters) { } } + public String getFailurePipelineName() { return DEFAULT_FAILURE_PIPELINE_NAME; } + public Duration getProcessorShutdownTimeout() { return processorShutdownTimeout; } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java new file mode 100644 index 0000000000..548e4da0cb --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.core.pipeline; + +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +public class FailurePipelineSource implements Source>, FailurePipeline { + private static final Logger LOG = LoggerFactory.getLogger(FailurePipelineSource.class); + private static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE; + private Buffer buffer; + private AtomicBoolean isStopRequested; + + public FailurePipelineSource() { + isStopRequested = new AtomicBoolean(false); + } + + @Override + public void start(Buffer buffer) { + this.buffer = buffer; + } + + @Override + public void stop() { + isStopRequested.set(true); + } + + @Override + public void sendFailedEvents(Collection> records) { + try { + buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT); + } catch (Exception e) { + LOG.error("Failed to write to failure pipeline"); + } + } + +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index b5d3f812cf..b53f6ffd43 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.core.pipeline.router.RouterGetRecordStrategy; import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.processor.Processor; @@ -52,7 +53,7 @@ * {@link Processor} and outputs the transformed (or original) data to {@link Sink}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class Pipeline { +public class Pipeline implements PipelineIf { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); private final PipelineShutdown pipelineShutdown; @@ -73,6 +74,7 @@ public class Pipeline { private final ExecutorService processorExecutorService; private final ExecutorService sinkExecutorService; private final EventFactory eventFactory; + private Pipeline failurePipeline; private final AcknowledgementSetManager acknowledgementSetManager; private final List observers = Collections.synchronizedList(new LinkedList<>()); @@ -119,6 +121,7 @@ public Pipeline( this.name = name; this.source = source; this.buffer = buffer; + this.failurePipeline = null; this.processorSets = processorSets; this.sinks = sinks; this.router = router; @@ -165,6 +168,14 @@ public Buffer getBuffer() { return this.buffer; } + public void setFailurePipeline(Pipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public Pipeline getFailurePipeline() { + return failurePipeline; + } + /** * @return {@link Sink} of this pipeline. */ @@ -360,7 +371,7 @@ List> publishToSinks(final Collection records) { router.route(records, sinks, getRecordStrategy, (sink, events) -> sinkFutures.add(sinkExecutorService.submit(() -> { sink.updateLatencyMetrics(events); - sink.output(events); + sink.output(events, failurePipeline); }, null)) ); return sinkFutures; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java index 08bcfab642..5981c6dfd3 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.PipelineIf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,7 @@ public boolean isReady() { } @Override - public void output(final Collection records) { + public void output(final Collection records, final PipelineIf failurePipeline) { if (buffer != null && !isStopRequested.get()) { for (T record : records) { while (true) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java index e313430b49..c9cedf29ef 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java @@ -145,6 +145,9 @@ private void doRun() { if (inputEvents != null) { processAcknowledgements(inputEvents, Collections.emptyList()); } + if (pipeline.getFailurePipeline() != null) { + ((FailurePipelineSource)(pipeline.getFailurePipeline().getSource())).sendFailedEvents(records); + } records = Collections.emptyList(); break; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java index 9caa18820f..7fa9324246 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java @@ -162,6 +162,7 @@ void parseConfiguration_with_multiple_valid_pipelines_creates_the_correct_pipeli final Map actualPipelineMap = pipelineTransformer.transformConfiguration(); assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size()); + verify(dataPrepperConfiguration, times(3)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -173,6 +174,7 @@ void parseConfiguration_with_multiple_valid_pipelines_creates_the_correct_pipeli final Map actualPipelineMap = pipelineTransformer.transformConfiguration(); assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size()); + verify(dataPrepperConfiguration, times(3)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); assertThat(actualPipelineMap, hasKey("test-pipeline-1")); @@ -198,6 +200,7 @@ void parseConfiguration_with_multiple_disconnected_valid_pipelines_creates_the_c final Map actualPipelineMap = pipelineTransformer.transformConfiguration(); assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size()); + verify(dataPrepperConfiguration, times(3)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); assertThat(actualPipelineMap, hasKey("test-pipeline-1")); @@ -221,6 +224,7 @@ void parseConfiguration_with_invalid_root_source_pipeline_creates_empty_pipeline createObjectUnderTest(TestDataProvider.CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT); final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); final PluginError sourcePluginError = pluginErrorCollector.getPluginErrors().get(0); @@ -236,6 +240,7 @@ void parseConfiguration_with_invalid_root_pipeline_creates_empty_pipelinesMap( final PipelineTransformer pipelineTransformer = createObjectUnderTest(pipelineResourcePath); final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(0); @@ -262,6 +267,7 @@ void parseConfiguration_with_incorrect_child_pipeline_returns_empty_pipelinesMap final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); verifyDataPrepperConfigurationAccesses(); + verify(dataPrepperConfiguration, times(2)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(2)); final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(1); @@ -284,6 +290,7 @@ void parseConfiguration_with_a_single_pipeline_with_empty_source_settings_return final Map actualPipelineMap = pipelineTransformer.transformConfiguration(); assertThat(actualPipelineMap.keySet().size(), equalTo(1)); verifyDataPrepperConfigurationAccesses(); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -318,6 +325,7 @@ void parseConfiguration_with_compatible_version() { verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration).getSinkShutdownTimeout(); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -350,6 +358,7 @@ void testMultipleSinks() { final Map pipelineMap = pipelineTransformer.transformConfiguration(); assertThat(pipelineMap.keySet().size(), equalTo(3)); verifyDataPrepperConfigurationAccesses(pipelineMap.keySet().size()); + verify(dataPrepperConfiguration, times(3)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -361,6 +370,7 @@ void testMultipleProcessors() { final Map pipelineMap = pipelineTransformer.transformConfiguration(); assertThat(pipelineMap.keySet().size(), equalTo(3)); verifyDataPrepperConfigurationAccesses(pipelineMap.keySet().size()); + verify(dataPrepperConfiguration, times(3)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -378,6 +388,7 @@ void parseConfiguration_with_routes_creates_correct_pipeline() { assertThat(entryPipeline, notNullValue()); assertThat(entryPipeline.getSinks(), notNullValue()); assertThat(entryPipeline.getSinks().size(), equalTo(2)); + verify(dataPrepperConfiguration, times(3)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -406,6 +417,7 @@ void parseConfiguration_with_invalid_route_expressions_handles_errors_and_return assertThat(pluginError.getException() instanceof InvalidPluginConfigurationException, equalTo(true)); assertThat(pluginError.getException().getMessage(), equalTo(expectedErrorMessage)); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -455,6 +467,7 @@ void parseConfiguration_uses_CircuitBreaking_buffer_when_circuit_breakers_applie verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration).getSinkShutdownTimeout(); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -476,6 +489,7 @@ void parseConfiguration_uses_unwrapped_buffer_when_circuit_breakers_applied_but_ verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration).getSinkShutdownTimeout(); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } @@ -495,6 +509,7 @@ void parseConfiguration_uses_unwrapped_buffer_when_no_circuit_breakers_are_appli assertThat(pipeline.getBuffer(), notNullValue()); assertThat(pipeline.getBuffer(), CoreMatchers.not(instanceOf(CircuitBreakingBuffer.class))); + verify(dataPrepperConfiguration).getFailurePipelineName(); verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration).getSinkShutdownTimeout(); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); @@ -525,6 +540,7 @@ void parseConfiguration_uses_unwrapped_buffer_for_pipeline_connectors() { verify(dataPrepperConfiguration, times(3)).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration, times(3)).getSinkShutdownTimeout(); verify(dataPrepperConfiguration, times(3)).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration, times(3)).getFailurePipelineName(); verify(dataPrepperConfiguration).getPipelineExtensions(); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java index 6faeed0f0f..64150bfd84 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineTests.java @@ -516,7 +516,7 @@ void publishToSinks_writes_Events_to_Sinks() { FutureHelper.awaitFuturesIndefinitely(futures); for (Sink sink : sinks) { - verify(sink).output(records); + verify(sink).output(records, null); } } @@ -568,7 +568,7 @@ void publishToSinks_writes_Events_to_Sinks_which_have_the_route() { FutureHelper.awaitFuturesIndefinitely(futures); - verify(routedSink).output(records); + verify(routedSink).output(records, null); for (Sink sink : unroutedSinks) { verify(sink, never()).output(records); diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java index 1e2742a0ba..b215432f34 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.sink.Sink; import java.time.Duration; @@ -44,7 +45,7 @@ public TestSink(boolean failSinkForTest) { } @Override - public void output(Collection> records) { + public void output(Collection> records, PipelineIf failurePipeline) { if(failSinkForTest) { throw new RuntimeException("Sink is expected to fail"); } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index d981cf67ca..8da37d892c 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.sink.Sink; @@ -145,7 +146,7 @@ private void doInitializeInternal() { * @param records Records to be output */ @Override - public void doOutput(final Collection> records) { + public void doOutput(final Collection> records, final PipelineIf failurePipeline) { if (records.isEmpty()) { return; } diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java index cf45281cfd..c0c3a68075 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java @@ -207,7 +207,7 @@ public void testOutput_SuccessfulProcessing() throws Exception { lambdaCommonHandler.when(() -> LambdaCommonHandler.isSuccess(any(InvokeResponse.class))).thenReturn(true); - lambdaSink.doOutput(records); + lambdaSink.doOutput(records, null); verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); } @@ -281,7 +281,7 @@ public void testOutput_ExceptionDuringProcessing() throws Exception { lambdaCommonHandler.when(() -> LambdaCommonHandler.isSuccess(any(InvokeResponse.class))).thenReturn(true); - lambdaSink.doOutput(records); + lambdaSink.doOutput(records, null); verify(numberOfRecordsFailedCounter, times(1)).increment(1); } diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java index ae9f52bbf9..98c12aae8d 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -64,7 +65,7 @@ public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkConte } @Override - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { lock.lock(); try { if (isStopRequested) diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java index 0ce0194b31..3e7c638d9a 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.sink; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; @@ -14,7 +15,7 @@ public class NoopSink implements Sink> { private static final Logger LOG = LoggerFactory.getLogger(NoopSink.class); @Override - public void output(Collection> records) { + public void output(Collection> records, final PipelineIf failurePipeline) { LOG.info("Releasing events for NOOP sink"); for (Record record : records) { Event event = (Event)record.getData(); diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java index 0396af3176..1590a627ae 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -43,7 +44,7 @@ public StdOutSink() { this.tagsTargetKey = null; } @Override - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { for (final Record record : records) { checkTypeAndPrintObject(record.getData()); } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java index c392bf4ae0..69e7dfb75d 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -162,7 +163,7 @@ public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration, * This method process buffer records and send to Http End points based on configured codec * @param records Collection of Event */ - public void output(Collection> records) { + public void output(Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); if (currentBuffer == null) { this.currentBuffer = bufferFactory.getBuffer(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java index 97c93d22de..c3e68fd34c 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; @@ -115,7 +116,7 @@ private void doInitializeInternal() { } @Override - public void doOutput(Collection> records) { + public void doOutput(Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); if (records.isEmpty()) { return; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java index 6d57c7803a..8eda8056ab 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java @@ -150,9 +150,9 @@ public void doOutputTest() { final Collection records = Arrays.asList(new Record(event)); final KafkaSink objectUnderTest = createObjectUnderTest(); - objectUnderTest.doOutput(records); + objectUnderTest.doOutput(records, null); - verify(objectUnderTest).doOutput(records); + verify(objectUnderTest).doOutput(records, null); } @@ -161,15 +161,15 @@ public void doOutputExceptionTest() { final Collection records = Arrays.asList(new Record(event)); when(executorService.submit(any(ProducerWorker.class))).thenThrow(new RuntimeException()); final KafkaSink objectUnderTest = createObjectUnderTest(); - assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records)); + assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records, null)); } @Test public void doOutputEmptyRecordsTest() { final Collection records = Arrays.asList(); final KafkaSink objectUnderTest = createObjectUnderTest(); - objectUnderTest.doOutput(records); - verify(objectUnderTest).doOutput(records); + objectUnderTest.doOutput(records, null); + verify(objectUnderTest).doOutput(records, null); } @@ -232,6 +232,6 @@ public void doOutputTestForAutoTopicCreate() { final Collection records = Arrays.asList(new Record(event)); final KafkaSink objectUnderTest = createObjectUnderTest(); - assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records)); + assertThrows(RuntimeException.class, () -> objectUnderTest.doOutput(records, null)); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 2248ba669a..cc5a43041c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -26,6 +26,7 @@ import org.opensearch.dataprepper.expression.ExpressionEvaluationException; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -380,7 +381,7 @@ private BulkOperation getBulkOperationForAction(final String action, } @Override - public void doOutput(final Collection> records) { + public void doOutput(final Collection> records, final PipelineIf failurePipelineObj) { final long threadId = Thread.currentThread().getId(); if (!bulkRequestMap.containsKey(threadId)) { bulkRequestMap.put(threadId, bulkRequestSupplier.get()); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 31b77e0bf3..4e7214f16a 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -246,7 +246,7 @@ void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_ .thenReturn(UUID.randomUUID().toString()); dlqObjectMockedStatic.when(DlqObject::builder).thenReturn(dlqObjectBuilder); - objectUnderTest.doOutput(List.of(eventRecord)); + objectUnderTest.doOutput(List.of(eventRecord), null); } final FailedDlqData failedDlqDataResult = failedDlqData.getValue(); @@ -350,7 +350,7 @@ void doOutput_with_invalid_version_expression_result_catches_RuntimeException_an .thenReturn(UUID.randomUUID().toString()); dlqObjectMockedStatic.when(DlqObject::builder).thenReturn(dlqObjectBuilder); - objectUnderTest.doOutput(List.of(eventRecord)); + objectUnderTest.doOutput(List.of(eventRecord), null); } final FailedDlqData failedDlqDataResult = failedDlqData.getValue(); diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java index a93e58875c..e667f5d161 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; @@ -74,7 +75,7 @@ public void doInitialize() { * @param records Records to be output */ @Override - public void doOutput(final Collection> records) { - personalizeSinkService.output(records); + public void doOutput(final Collection> records, final PipelineIf failurePipeline) { + personalizeSinkService.output(records, failurePipeline); } } \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java index 80ea94bcf1..96279ca7e7 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java @@ -8,6 +8,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -61,8 +62,8 @@ public PersonalizeSinkService(final PersonalizeSinkConfiguration personalizeSink /** * @param records received records and add into buffer. */ - void output(Collection> records) { + void output(Collection> records, final PipelineIf failurePipeline) { LOG.trace("{} records received", records.size()); return; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java b/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java index 10251041a3..09faac0a14 100644 --- a/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java +++ b/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java @@ -20,6 +20,7 @@ import org.apache.hc.core5.util.Timeout; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram; @@ -149,7 +150,7 @@ public PrometheusSinkService(final PrometheusSinkConfiguration prometheusSinkCon * This method process buffer records and send to Http End points based on configured codec * @param records Collection of Event */ - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); try { records.forEach(record -> { @@ -451,4 +452,4 @@ private static void setMetricName(final String metricName, final List> records) { - s3SinkService.output(records); + public void doOutput(final Collection> records, final PipelineIf failurePipeline) { + s3SinkService.output(records, failurePipeline); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 571a952f01..53d263e0f7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; @@ -100,7 +101,7 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, /** * @param records received records and add into buffer. */ - void output(Collection> records) { + void output(Collection> records, final PipelineIf failurePipeline) { // Don't acquire the lock if there's no work to be done if (records.isEmpty() && s3GroupManager.hasNoGroups()) { return; diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java index da29d71e17..d7ebb40568 100644 --- a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler; import org.opensearch.dataprepper.plugins.sink.sns.dlq.SnsSinkFailedDlqData; import org.slf4j.Logger; @@ -115,7 +116,7 @@ public SnsSinkService(final SnsSinkConfig snsSinkConfig, /** * @param records received records and add into buffer. */ - void output(Collection> records) { + void output(Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); try { for (Record record : records) {