Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dlq pipeline qchea #5305

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model;

import org.opensearch.dataprepper.model.source.Source;

/**
 * Minimal read-only view of a pipeline that is exposed outside the core
 * pipeline implementation. {@code Pipeline} implements this interface, and it
 * is the type handed to {@code Sink#output(Collection, PipelineIf)} so that
 * sinks can reach the failure (dlq) pipeline without depending on the core
 * {@code Pipeline} class.
 */
public interface PipelineIf {
/**
 * @return the {@link Source} at the head of this pipeline; for the failure
 * pipeline this is the {@code FailurePipelineSource} that accepts failed events
 */
Source getSource();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.failures;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;
/**
 * Receiver for events that a sink failed to deliver. Implemented by the
 * source of the designated failure (dlq) pipeline, which re-enqueues the
 * failed records into that pipeline's buffer for re-processing.
 */
public interface FailurePipeline {
/**
 * Hands off records that could not be delivered downstream.
 *
 * @param events the failed records to route into the failure pipeline
 */
void sendFailedEvents(Collection<Record<Event>> events);
}


Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -64,16 +65,16 @@ public void initialize() {
* @param records the records to write to the sink.
*/
@Override
public void output(Collection<T> records) {
public void output(Collection<T> records, final PipelineIf failurePipeline) {
recordsInCounter.increment(records.size()*1.0);
timeElapsedTimer.record(() -> doOutput(records));
timeElapsedTimer.record(() -> doOutput(records, failurePipeline));
}

/**
* This method should implement the output logic
* @param records Records to be output
*/
public abstract void doOutput(Collection<T> records);
public abstract void doOutput(Collection<T> records, final PipelineIf failurePipeline);

@Override
public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.PipelineIf;

import java.util.Collection;

Expand All @@ -20,7 +21,11 @@ public interface Sink<T extends Record<?>> {
*
* @param records the records to write to the sink.
*/
void output(Collection<T> records);
default void output(Collection<T> records) {
output(records, null);
}

void output(Collection<T> records, PipelineIf failurePipeline);

/**
* Prepare sink for shutdown, by cleaning up resources and threads.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
Expand Down Expand Up @@ -149,7 +150,7 @@ private static class AbstractEventSinkImpl extends AbstractSink<Record<Event>> {
}

@Override
public void doOutput(Collection<Record<Event>> records) {
public void doOutput(Collection<Record<Event>> records, PipelineIf failurePipeline) {
for (final Record<Event> record: records) {
Event event = record.getData();
event.getEventHandle().release(true);
Expand Down Expand Up @@ -179,7 +180,7 @@ private static class AbstractSinkImpl extends AbstractSink<Record<String>> {
}

@Override
public void doOutput(Collection<Record<String>> records) {
public void doOutput(Collection<Record<String>> records, PipelineIf failurePipeline) {
try {
Thread.sleep(150);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -213,7 +214,7 @@ private static class AbstractSinkNotReadyImpl extends AbstractSink<Record<String
}

@Override
public void doOutput(Collection<Record<String>> records) {
public void doOutput(Collection<Record<String>> records, PipelineIf failurePipeline) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.PipelineIf;
import org.junit.jupiter.api.Test;

import java.util.Collection;
Expand All @@ -28,7 +29,7 @@ public void initialize() {
}

@Override
public void output(Collection<Record<?>> records) {
public void output(Collection<Record<?>> records, PipelineIf failurePipeline) {
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -34,7 +35,7 @@ public InMemorySink(final InMemoryConfig inMemoryConfig,
}

@Override
public void output(final Collection<Record<Event>> records) {
public void output(final Collection<Record<Event>> records, final PipelineIf failurePipeline) {
inMemorySinkAccessor.addEvents(testingKey, records);
boolean result = inMemorySinkAccessor.getResult();
records.stream().forEach((record) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator;
import org.opensearch.dataprepper.core.pipeline.Pipeline;
import org.opensearch.dataprepper.core.pipeline.FailurePipelineSource;
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
import org.opensearch.dataprepper.core.pipeline.router.Router;
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class PipelineTransformer {
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.";
private static final String PIPELINE_TYPE = "pipeline";
private static final String ATTRIBUTE_NAME = "name";
//private static final String FAILURE_PIPELINE_NAME = "dlq";
private final PipelinesDataFlowModel pipelinesDataFlowModel;
private final RouterFactory routerFactory;
private final DataPrepperConfiguration dataPrepperConfiguration;
Expand Down Expand Up @@ -128,24 +130,30 @@ private void buildPipelineFromConfiguration(
final Map<String, Pipeline> pipelineMap) {
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName);
LOG.info("Building pipeline [{}] from provided configuration", pipelineName);
final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName();
try {
Source source;
final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting();
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
pipelineMap, pipelineConfigurationMap);
final Source source = pipelineSource.orElseGet(() -> {
try {
return pluginFactory.loadPlugin(Source.class, sourceSetting);
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.SOURCE_PLUGIN_TYPE)
.pipelineName(pipelineName)
.pluginName(sourceSetting.getName())
.exception(e)
.build();
pluginErrorCollector.collectPluginError(pluginError);
return null;
}
});
if (!pipelineName.equals(failurePipelineName)) {
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
pipelineMap, pipelineConfigurationMap);
source = pipelineSource.orElseGet(() -> {
try {
return pluginFactory.loadPlugin(Source.class, sourceSetting);
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.SOURCE_PLUGIN_TYPE)
.pipelineName(pipelineName)
.pluginName(sourceSetting.getName())
.exception(e)
.build();
pluginErrorCollector.collectPluginError(pluginError);
return null;
}
});
} else {
source = new FailurePipelineSource();
}

LOG.info("Building buffer for the pipeline [{}]", pipelineName);
Buffer pipelineDefinedBuffer = null;
Expand Down Expand Up @@ -234,7 +242,14 @@ private void buildPipelineFromConfiguration(
"pipelines", pipelineName, ex);
processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap);
}

final Pipeline failurePipeline = pipelineMap.get(failurePipelineName);
if (failurePipeline != null) {
for (Map.Entry<String, Pipeline> pipelineEntry : pipelineMap.entrySet()) {
if (!(pipelineEntry.getKey().equals(failurePipelineName))) {
pipelineEntry.getValue().setFailurePipeline(failurePipeline);
}
}
}
}

private List<IdentifiedComponent<Processor>> newProcessor(final PluginSetting pluginSetting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);

static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq";
private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory";

static final int MAX_TAGS_NUMBER = 3;
Expand Down Expand Up @@ -213,6 +214,8 @@ public void setMetricTagFilters(final List<MetricTagFilter> metricTagFilters) {
}
}

public String getFailurePipelineName() { return DEFAULT_FAILURE_PIPELINE_NAME; }

public Duration getProcessorShutdownTimeout() {
return processorShutdownTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.pipeline;

import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.failures.FailurePipeline;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Source that backs the designated failure ("dlq") pipeline. Unlike ordinary
 * sources it pulls nothing from an external system: it receives records that
 * other pipelines' sinks failed to deliver (via
 * {@link FailurePipeline#sendFailedEvents}) and writes them into this
 * pipeline's buffer for normal processing.
 */
public class FailurePipelineSource implements Source<Record<Event>>, FailurePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(FailurePipelineSource.class);
    // Effectively "block until the buffer accepts the records" — failed events
    // should not be dropped just because the buffer is momentarily full.
    private static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE;
    private Buffer buffer;
    private final AtomicBoolean isStopRequested;

    public FailurePipelineSource() {
        isStopRequested = new AtomicBoolean(false);
    }

    /** Captures the pipeline's buffer; no background work is started. */
    @Override
    public void start(Buffer buffer) {
        this.buffer = buffer;
    }

    // NOTE(review): the stop flag is recorded but never consulted, so stop()
    // does not currently prevent further writes — confirm whether in-flight
    // failed events are expected to keep draining after shutdown begins.
    @Override
    public void stop() {
        isStopRequested.set(true);
    }

    /**
     * Writes records that a sink failed to deliver into this pipeline's
     * buffer. A failure to enqueue is logged (with its cause) and otherwise
     * swallowed, matching the best-effort contract of the failure path.
     *
     * @param records the failed records to re-process
     */
    @Override
    public void sendFailedEvents(Collection<Record<Event>> records) {
        if (buffer == null) {
            // start() has not run yet; avoid an NPE being mis-reported as a write failure.
            LOG.error("Failed to write to failure pipeline: source has not been started");
            return;
        }
        try {
            buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT);
        } catch (Exception e) {
            // Preserve the cause — the original log call dropped the exception entirely.
            LOG.error("Failed to write to failure pipeline", e);
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.core.pipeline.router.RouterGetRecordStrategy;
import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -52,7 +53,7 @@
* {@link Processor} and outputs the transformed (or original) data to {@link Sink}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class Pipeline {
public class Pipeline implements PipelineIf {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis();
private final PipelineShutdown pipelineShutdown;
Expand All @@ -73,6 +74,7 @@ public class Pipeline {
private final ExecutorService processorExecutorService;
private final ExecutorService sinkExecutorService;
private final EventFactory eventFactory;
private Pipeline failurePipeline;
private final AcknowledgementSetManager acknowledgementSetManager;
private final List<PipelineObserver> observers = Collections.synchronizedList(new LinkedList<>());

Expand Down Expand Up @@ -119,6 +121,7 @@ public Pipeline(
this.name = name;
this.source = source;
this.buffer = buffer;
this.failurePipeline = null;
this.processorSets = processorSets;
this.sinks = sinks;
this.router = router;
Expand Down Expand Up @@ -165,6 +168,14 @@ public Buffer getBuffer() {
return this.buffer;
}

/**
 * Wires the shared failure (dlq) pipeline into this pipeline so its sinks
 * can route undeliverable events there.
 *
 * @param failurePipeline the failure pipeline, or null to clear it
 */
public void setFailurePipeline(final Pipeline failurePipeline) {
    this.failurePipeline = failurePipeline;
}

/**
 * @return the failure (dlq) pipeline wired in via {@code setFailurePipeline},
 * or null when no failure pipeline is configured
 */
public Pipeline getFailurePipeline() {
    return this.failurePipeline;
}

/**
* @return {@link Sink} of this pipeline.
*/
Expand Down Expand Up @@ -360,7 +371,7 @@ List<Future<Void>> publishToSinks(final Collection<Record> records) {
router.route(records, sinks, getRecordStrategy, (sink, events) ->
sinkFutures.add(sinkExecutorService.submit(() -> {
sink.updateLatencyMetrics(events);
sink.output(events);
sink.output(events, failurePipeline);
}, null))
);
return sinkFutures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.PipelineIf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,7 +74,7 @@ public boolean isReady() {
}

@Override
public void output(final Collection<T> records) {
public void output(final Collection<T> records, final PipelineIf failurePipeline) {
if (buffer != null && !isStopRequested.get()) {
for (T record : records) {
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ private void doRun() {
if (inputEvents != null) {
processAcknowledgements(inputEvents, Collections.emptyList());
}
if (pipeline.getFailurePipeline() != null) {
((FailurePipelineSource)(pipeline.getFailurePipeline().getSource())).sendFailedEvents(records);
}

records = Collections.emptyList();
break;
Expand Down
Loading
Loading