refactor: implement graceful shutdown for IndexerExecutor
sergiupopescu199 committed Jan 17, 2025
1 parent 5dd444a commit 38bd076
Showing 7 changed files with 222 additions and 86 deletions.
120 changes: 98 additions & 22 deletions crates/iota-data-ingestion-core/src/executor.rs
@@ -11,7 +11,7 @@ use iota_types::{
full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use prometheus::Registry;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;

use crate::{
@@ -23,17 +23,31 @@ use crate::{

pub const MAX_CHECKPOINTS_IN_PROGRESS: usize = 10000;

#[derive(Debug, Clone)]
pub enum WorkerPoolMsg {
/// Sends the WorkerPool progress status to the Executor's main loop
Progress((String, u64)),
/// Signals a WorkerPool graceful shutdown to the Executor's main loop
ShutDown(String),
}

pub struct IndexerExecutor<P> {
pools: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
pool_senders: Vec<mpsc::Sender<CheckpointData>>,
progress_store: ProgressStoreWrapper<P>,
pool_progress_sender: mpsc::Sender<(String, CheckpointSequenceNumber)>,
pool_progress_receiver: mpsc::Receiver<(String, CheckpointSequenceNumber)>,
pool_progress_sender: mpsc::Sender<WorkerPoolMsg>,
pool_progress_receiver: mpsc::Receiver<WorkerPoolMsg>,
metrics: DataIngestionMetrics,
token: CancellationToken,
}

impl<P: ProgressStore> IndexerExecutor<P> {
pub fn new(progress_store: P, number_of_jobs: usize, metrics: DataIngestionMetrics) -> Self {
pub fn new(
progress_store: P,
number_of_jobs: usize,
metrics: DataIngestionMetrics,
token: CancellationToken,
) -> Self {
let (pool_progress_sender, pool_progress_receiver) =
mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
Self {
@@ -43,6 +57,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
pool_progress_sender,
pool_progress_receiver,
metrics,
token,
}
}

@@ -54,6 +69,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
checkpoint_number,
receiver,
self.pool_progress_sender.clone(),
self.token.child_token(),
)));
self.pool_senders.push(sender);
Ok(())
@@ -66,41 +82,102 @@ impl<P: ProgressStore> IndexerExecutor<P> {
remote_store_url: Option<String>,
remote_store_options: Vec<(String, String)>,
reader_options: ReaderOptions,
token: CancellationToken,
) -> Result<ExecutorProgress> {
let mut reader_checkpoint_number = self.progress_store.min_watermark()?;
let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) =
let (checkpoint_reader, mut checkpoint_recv, gc_sender, exit_sender) =
CheckpointReader::initialize(
path,
reader_checkpoint_number,
remote_store_url,
remote_store_options,
reader_options,
);
spawn_monitored_task!(checkpoint_reader.run());

for pool in std::mem::take(&mut self.pools) {
spawn_monitored_task!(pool);
}
let checkpoint_reader_handle = spawn_monitored_task!(checkpoint_reader.run());

let worker_pools = std::mem::take(&mut self.pools)
.into_iter()
.map(|pool| spawn_monitored_task!(pool))
.collect::<Vec<JoinHandle<()>>>();

let mut worker_pools_shutdown_signals = vec![];

loop {
tokio::select! {
_ = token.cancelled() => break,
Some((task_name, sequence_number)) = self.pool_progress_receiver.recv() => {
self.progress_store.save(task_name.clone(), sequence_number).await?;
let seq_number = self.progress_store.min_watermark()?;
if seq_number > reader_checkpoint_number {
gc_sender.send(seq_number).await?;
reader_checkpoint_number = seq_number;
Some(worker_pool_progress_msg) = self.pool_progress_receiver.recv() => {
match worker_pool_progress_msg {
WorkerPoolMsg::Progress((task_name, sequence_number)) => {
self.progress_store.save(task_name.clone(), sequence_number).await?;
let seq_number = self.progress_store.min_watermark()?;
if seq_number > reader_checkpoint_number {
gc_sender.send(seq_number).await?;
reader_checkpoint_number = seq_number;
}
self.metrics.data_ingestion_checkpoint.with_label_values(&[&task_name]).set(sequence_number as i64);
}
// Manages the graceful shutdown sequence of the entire indexer system.
//
// The shutdown process follows these steps:
// 1. Token cancellation triggers:
// a. Individual workers in each pool:
// - Complete current checkpoint processing
// - Send final progress updates
// - Signal completion to their pool
// b. Worker pools:
// - Stop accepting new checkpoints
// - Process remaining progress messages
// - Wait for all their workers to finish
// - Send ShutDown message to executor
//
// 2. Executor main loop:
// - Continues processing Progress messages from pools
// - Tracks pool shutdowns via ShutDown messages
// - Once all pools report shutdown:
// a. Awaits all worker pool join handles
// b. Signals checkpoint reader to stop
// c. Awaits checkpoint reader completion
// d. Exits main loop
//
// This ensures hierarchical shutdown order:
// 1. Workers (in parallel within each pool)
// 2. Worker pools (in parallel)
// 3. Checkpoint reader
// 4. Executor main loop
//
// Guarantees:
// - No work is interrupted mid-processing
// - All progress is saved to storage
// - All messages are processed in order
// - All resources are properly cleaned up
WorkerPoolMsg::ShutDown(worker_pool_name) => {
// Track worker pools that have initiated shutdown
worker_pools_shutdown_signals.push(worker_pool_name);
// Once all worker pools have signaled completion, await their handles
// This ensures all workers have finished their final tasks
if worker_pools_shutdown_signals.len() == self.pool_senders.len() {
for worker in worker_pools {
// Await the WorkerPool actor completion
worker.await?;
}
// Send shutdown signal to CheckpointReader Actor
_ = exit_sender.send(());
// Await the CheckpointReader actor completion
checkpoint_reader_handle.await??;
break;
}
}
}
self.metrics.data_ingestion_checkpoint.with_label_values(&[&task_name]).set(sequence_number as i64);
}
Some(checkpoint) = checkpoint_recv.recv() => {
// Only process new checkpoints while the system is running (token not cancelled).
// The guard stops new work from being accepted during shutdown, while the other branches keep draining in-flight work.
Some(checkpoint) = checkpoint_recv.recv(), if !self.token.is_cancelled() => {
for sender in &self.pool_senders {
sender.send(checkpoint.clone()).await?;
}
}
}
}

Ok(self.progress_store.stats())
}
}
@@ -115,10 +192,10 @@ pub async fn setup_single_workflow<W: Worker + 'static>(
impl Future<Output = Result<ExecutorProgress>>,
CancellationToken,
)> {
let token = CancellationToken::new();
let metrics = DataIngestionMetrics::new(&Registry::new());
let progress_store = ShimProgressStore(initial_checkpoint_number);
let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
let token = CancellationToken::new();
let mut executor = IndexerExecutor::new(progress_store, 1, metrics, token.child_token());
let worker_pool = WorkerPool::new(worker, "workflow".to_string(), concurrency);
executor.register(worker_pool).await?;
Ok((
Expand All @@ -127,7 +204,6 @@ pub async fn setup_single_workflow<W: Worker + 'static>(
Some(remote_store_url),
vec![],
reader_options.unwrap_or_default(),
token.child_token(),
),
token,
))
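The shutdown sequence documented in the executor.rs hunk above is essentially a fan-in protocol: pools keep streaming `Progress` messages until cancellation, each then announces `ShutDown`, and the executor tears down the reader only once every pool has reported. Below is a minimal, self-contained sketch of that pattern in isolation; `PoolMsg`, `run_pool`, the channel size, and the timings are placeholders, not the crate's actual types.

```rust
use std::time::Duration;

use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;

// Placeholder message type mirroring the shape of `WorkerPoolMsg`.
enum PoolMsg {
    Progress(String, u64),
    ShutDown(String),
}

// Simplified stand-in for a worker pool: it reports progress until cancellation,
// then announces its own shutdown and exits.
async fn run_pool(name: String, tx: mpsc::Sender<PoolMsg>, token: CancellationToken) {
    let mut seq = 0u64;
    while !token.is_cancelled() {
        seq += 1;
        let _ = tx.send(PoolMsg::Progress(name.clone(), seq)).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // Finish any in-flight work here, then report the shutdown to the main loop.
    let _ = tx.send(PoolMsg::ShutDown(name)).await;
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let (tx, mut rx) = mpsc::channel(1024);

    let mut pools: Vec<JoinHandle<()>> = (0..2)
        .map(|i| tokio::spawn(run_pool(format!("pool-{i}"), tx.clone(), token.child_token())))
        .collect();
    let total_pools = pools.len();

    // Simulate an external shutdown request shortly after startup.
    let cancel = token.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        cancel.cancel();
    });

    let mut shutdown_signals = 0;
    while let Some(msg) = rx.recv().await {
        match msg {
            // Progress keeps flowing during shutdown, so no watermark is lost.
            PoolMsg::Progress(name, seq) => println!("{name} processed {seq}"),
            PoolMsg::ShutDown(_) => {
                shutdown_signals += 1;
                // Only after every pool has reported do we await their handles;
                // the real executor would then stop the checkpoint reader and break.
                if shutdown_signals == total_pools {
                    for handle in pools.drain(..) {
                        handle.await.expect("pool task panicked");
                    }
                    break;
                }
            }
        }
    }
}
```

Counting `ShutDown` messages on the same channel as `Progress` guarantees that every final progress update has already been drained before the join handles are awaited, which is what lets the executor persist the last watermarks before stopping the checkpoint reader.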
5 changes: 4 additions & 1 deletion crates/iota-data-ingestion-core/src/reader.rs
@@ -17,7 +17,10 @@ use notify::{RecursiveMode, Watcher};
use object_store::{ObjectStore, path::Path};
use tap::pipe::Pipe;
use tokio::{
sync::{mpsc, mpsc::error::TryRecvError, oneshot},
sync::{
mpsc::{self, error::TryRecvError},
oneshot,
},
time::timeout,
};
use tracing::{debug, error, info};
27 changes: 15 additions & 12 deletions crates/iota-data-ingestion-core/src/tests.rs
Expand Up @@ -42,6 +42,7 @@ async fn run(
indexer: IndexerExecutor<FileProgressStore>,
path: Option<PathBuf>,
duration: Option<Duration>,
token: CancellationToken,
) -> Result<ExecutorProgress> {
let options = ReaderOptions {
tick_interval_ms: 10,
@@ -52,24 +53,15 @@
match duration {
None => {
indexer
.run(
path.unwrap_or_else(temp_dir),
None,
vec![],
options,
CancellationToken::new(),
)
.run(path.unwrap_or_else(temp_dir), None, vec![], options)
.await
}
Some(duration) => {
let token = CancellationToken::new();
let token_child = token.child_token();
let handle = tokio::task::spawn(indexer.run(
path.unwrap_or_else(temp_dir),
None,
vec![],
options,
token_child,
));
tokio::time::sleep(duration).await;
token.cancel();
@@ -81,6 +73,7 @@
struct ExecutorBundle {
executor: IndexerExecutor<FileProgressStore>,
_progress_file: NamedTempFile,
token: CancellationToken,
}

#[derive(Clone)]
@@ -96,7 +89,7 @@ impl Worker for TestWorker {
#[tokio::test]
async fn empty_pools() {
let bundle = create_executor_bundle();
let result = run(bundle.executor, None, None).await;
let result = run(bundle.executor, None, None, bundle.token).await;
assert!(result.is_err());
if let Err(err) = result {
assert!(err.to_string().contains("pools can't be empty"));
@@ -114,7 +107,13 @@ async fn basic_flow() {
let bytes = mock_checkpoint_data_bytes(checkpoint_number);
std::fs::write(path.join(format!("{}.chk", checkpoint_number)), bytes).unwrap();
}
let result = run(bundle.executor, Some(path), Some(Duration::from_secs(1))).await;
let result = run(
bundle.executor,
Some(path),
Some(Duration::from_secs(1)),
bundle.token,
)
.await;
assert!(result.is_ok());
assert_eq!(result.unwrap().get("test"), Some(&20));
}
@@ -130,14 +129,18 @@ fn create_executor_bundle() -> ExecutorBundle {
let path = progress_file.path().to_path_buf();
std::fs::write(path.clone(), "{}").unwrap();
let progress_store = FileProgressStore::new(path);
let token = CancellationToken::new();
let child_token = token.child_token();
let executor = IndexerExecutor::new(
progress_store,
1,
DataIngestionMetrics::new(&Registry::new()),
child_token,
);
ExecutorBundle {
executor,
_progress_file: progress_file,
token,
}
}

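For callers, the practical change is that the `CancellationToken` is now supplied when the executor is constructed rather than passed to `run`. The following is a hedged usage sketch based on the `setup_single_workflow` hunk above; the re-export paths, parameter order, `anyhow`-style error plumbing, the placeholder remote store URL, and the extra `Send + Sync` bound are assumptions rather than part of this commit.

```rust
use anyhow::Result;
use iota_data_ingestion_core::{Worker, setup_single_workflow};

// Sketch: drive a single-workflow indexer until Ctrl-C, then shut it down
// gracefully via the token returned by `setup_single_workflow`.
async fn run_until_ctrl_c<W: Worker + Send + Sync + 'static>(worker: W) -> Result<()> {
    let (executor, token) = setup_single_workflow(
        worker,
        "https://example-remote-store.invalid".to_string(), // assumed remote store URL
        0,    // initial checkpoint sequence number
        1,    // worker concurrency
        None, // default ReaderOptions
    )
    .await?;

    let handle = tokio::spawn(executor);
    tokio::signal::ctrl_c().await?;

    // With this commit, cancelling the token lets workers, worker pools, and the
    // checkpoint reader drain before the executor future resolves.
    token.cancel();
    let _watermarks = handle.await??;
    Ok(())
}
```

This mirrors the updated tests: the token is created alongside the executor, cancelled externally, and `run` finishes its remaining work before returning the progress stats.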
