Swap out processors dynamically without interrupting the data flow #5327

Open
dinujoh opened this issue Jan 13, 2025 · 2 comments
Labels
enhancement New feature or request

Comments

@dinujoh
Member

dinujoh commented Jan 13, 2025

Is your feature request related to a problem? Please describe.
Currently, updating processors in a Data Prepper pipeline requires stopping and restarting the pipeline, which can lead to data loss or processing delays. We need a mechanism to update processor configurations or swap out processors dynamically without interrupting the data flow.

Describe the solution you'd like

Implement a system for hot-swapping processors and updating their configurations on-the-fly, ensuring continuous data processing without pipeline restarts.

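  1. ProcessorRegistry
import java.util.List;
import java.util.Objects;

public class ProcessorRegistry {
    // volatile: a swap is published to all worker threads as a single reference write
    private volatile List<Processor> processors;

    public ProcessorRegistry(List<Processor> initialProcessors) {
        Objects.requireNonNull(initialProcessors, "Initial processors list cannot be null");
        // Immutable copy, so callers cannot modify the live list in place
        this.processors = List.copyOf(initialProcessors);
    }

    // Atomic swap of entire processor list
    public void swapProcessors(List<Processor> newProcessors) {
        Objects.requireNonNull(newProcessors, "New processors list cannot be null");
        this.processors = List.copyOf(newProcessors);
    }

    // Get current processors for execution (an immutable snapshot)
    public List<Processor> getProcessors() {
        return processors;
    }
}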
  2. ProcessWorker class that uses the registry
public class ProcessWorker {
    private final ProcessorRegistry processorRegistry;
    private final Buffer readBuffer;
    // ... other fields

    private void doRun() {
        final Map.Entry<Collection, CheckpointState> readResult = readBuffer.read(pipeline.getReadBatchTimeoutInMillis());
        Collection records = readResult.getKey();
        final CheckpointState checkpointState = readResult.getValue();

        // Get current processor list from registry
        List<Processor> currentProcessors = processorRegistry.getProcessors();
        
        for (final Processor processor : currentProcessors) {
            List<Event> inputEvents = null;
            if (acknowledgementsEnabled) {
                inputEvents = ((List<Record<Event>>) records).stream()
                    .map(Record::getData)
                    .collect(Collectors.toList());
            }

            try {
                records = processor.execute(records);
                if (inputEvents != null) {
                    processAcknowledgements(inputEvents, records);
                }
            } catch (final Exception e) {
                LOG.error("Processor threw an exception. This batch of Events will be dropped.", e);
                if (inputEvents != null) {
                    processAcknowledgements(inputEvents, Collections.emptyList());
                }
                records = Collections.emptyList();
                break;
            }
        }

        postToSink(records);
        readBuffer.checkpoint(checkpointState);
    }
}
  3. Manager class to handle processor updates
public class PipelineManager {
    private final ProcessorRegistry processorRegistry;
    
    public void updateProcessors(List<Processor> newProcessors) {
        try {
            validateProcessors(newProcessors);
            processorRegistry.swapProcessors(newProcessors);
            LOG.info("Successfully updated processors");
        } catch (Exception e) {
            LOG.error("Failed to update processors", e);
            throw new ProcessorUpdateException("Failed to update processors", e);
        }
    }

    private void validateProcessors(List<Processor> processors) {
        if (processors == null || processors.isEmpty()) {
            throw new IllegalArgumentException("Processors list cannot be null or empty");
        }
        // Add any additional validation logic
    }
}
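
To illustrate the swap semantics of the three classes above, here is a minimal, self-contained sketch. It is not Data Prepper code: `SketchRegistry` and `SketchWorker` are hypothetical names, and each processor is modelled as a `UnaryOperator<String>` rather than the real `Processor` interface. The `betweenStages` hook exists only to simulate a swap arriving while a batch is in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the proposed ProcessorRegistry.
class SketchRegistry {
    private volatile List<UnaryOperator<String>> stages;

    SketchRegistry(List<UnaryOperator<String>> initial) {
        this.stages = List.copyOf(initial);
    }

    // Publishes a new immutable list with a single volatile write.
    void swap(List<UnaryOperator<String>> replacement) {
        this.stages = List.copyOf(replacement);
    }

    List<UnaryOperator<String>> snapshot() {
        return stages;
    }
}

// Hypothetical stand-in for the proposed ProcessWorker.
class SketchWorker {
    private final SketchRegistry registry;

    SketchWorker(SketchRegistry registry) {
        this.registry = registry;
    }

    // Reads the registry once per batch, so a swap that happens during
    // the batch only takes effect for the next batch.
    List<String> processBatch(List<String> batch, Runnable betweenStages) {
        List<UnaryOperator<String>> current = registry.snapshot();
        List<String> records = new ArrayList<>(batch);
        for (UnaryOperator<String> stage : current) {
            records.replaceAll(stage);
            betweenStages.run(); // assumption: test hook simulating a concurrent swap
        }
        return records;
    }
}
```

Under these assumptions, a batch that starts before a swap runs to completion on the old chain, and the next batch picks up the new one. Whether that is acceptable for processors that hold state or need an explicit shutdown is a separate question that this sketch does not address.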

Describe alternatives you've considered (Optional)

  • Implementing a copy-on-write approach for the entire processor chain.
  • Using a message queue/buffering between processors to allow for more flexible updates.
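
For comparison, the copy-on-write alternative could be sketched with the JDK's `java.util.concurrent.CopyOnWriteArrayList`, whose iterators work on a snapshot of the list taken when the iterator is created. `CopyOnWriteChain` and its methods are hypothetical names, and processors are again modelled as `UnaryOperator<String>` for brevity.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.UnaryOperator;

// Hypothetical copy-on-write processor chain.
class CopyOnWriteChain {
    private final CopyOnWriteArrayList<UnaryOperator<String>> stages;

    CopyOnWriteChain(List<UnaryOperator<String>> initial) {
        this.stages = new CopyOnWriteArrayList<>(initial);
    }

    // Replaces a single stage. The backing array is copied on write,
    // so iterators created earlier keep seeing the old contents.
    void replaceStage(int index, UnaryOperator<String> replacement) {
        stages.set(index, replacement);
    }

    // Snapshot iterator over the chain as it is right now.
    Iterator<UnaryOperator<String>> beginBatch() {
        return stages.iterator();
    }

    // Applies every remaining stage of the given iterator to one record.
    static String run(Iterator<UnaryOperator<String>> chain, String record) {
        String current = record;
        while (chain.hasNext()) {
            current = chain.next().apply(current);
        }
        return current;
    }
}
```

One trade-off: this allows replacing a single stage cheaply, but several `replaceStage` calls are not atomic as a group, so a batch could start between them and see a partially updated chain. The whole-list swap in the proposal avoids that.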
@dlvenable
Member

This looks like a good approach to me. We should also consider how updates will be received and how to determine whether a change affects only processors. Also, do you have any thoughts on whether we could reuse the existing Processor instances when nothing has changed for them? That may reduce the memory overhead.
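
One possible way to reuse unchanged instances, sketched under assumptions: each processor is described by a comparable configuration object, and an instance is kept when the configuration at the same position in the chain is equal. `StageConfig`, `ProcessorReuse`, and `reconcile` are hypothetical names, not existing Data Prepper APIs, and the sketch assumes the old configuration and instance lists have the same length.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical description of one processor: plugin name plus settings.
final class StageConfig {
    final String pluginName;
    final Map<String, Object> settings;

    StageConfig(String pluginName, Map<String, Object> settings) {
        this.pluginName = Objects.requireNonNull(pluginName);
        this.settings = Map.copyOf(settings);
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof StageConfig)) {
            return false;
        }
        StageConfig that = (StageConfig) other;
        return pluginName.equals(that.pluginName) && settings.equals(that.settings);
    }

    @Override
    public int hashCode() {
        return Objects.hash(pluginName, settings);
    }
}

final class ProcessorReuse {
    private ProcessorReuse() { }

    // Builds the new chain, keeping the old instance wherever the configuration
    // at the same position is unchanged and calling the factory otherwise.
    static <P> List<P> reconcile(List<StageConfig> oldConfigs, List<P> oldInstances,
                                 List<StageConfig> newConfigs, Function<StageConfig, P> factory) {
        List<P> result = new ArrayList<>(newConfigs.size());
        for (int i = 0; i < newConfigs.size(); i++) {
            StageConfig wanted = newConfigs.get(i);
            boolean unchanged = i < oldConfigs.size() && oldConfigs.get(i).equals(wanted);
            result.add(unchanged ? oldInstances.get(i) : factory.apply(wanted));
        }
        return result;
    }
}
```

The resulting list could then be handed to the registry's swap method. Matching by position is deliberately conservative: inserting a processor at the front of the chain would recreate everything after it, which a smarter diff could avoid.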

@kkondaka
Collaborator

This issue is mentioned in #5261, which covers all updates to the Data Prepper configuration that can be applied without a restart.

@kkondaka kkondaka added enhancement New feature or request and removed untriaged labels Jan 14, 2025

3 participants