diff --git a/agents/monitor/src/steps/producer.rs b/agents/monitor/src/steps/producer.rs index a693eb69..38855c36 100644 --- a/agents/monitor/src/steps/producer.rs +++ b/agents/monitor/src/steps/producer.rs @@ -11,6 +11,9 @@ use crate::{ RelayFaucet, Restartable, StepHandle, UpdateFaucet, }; +pub const POLLING_INTERVAL_SECS: u64 = 5; +pub const BEHIND_TIP: u64 = 5; + #[derive(Debug)] #[must_use = "Tasks do nothing unless you call .spawn() or .forever()"] pub(crate) struct DispatchProducer { @@ -64,8 +67,8 @@ impl ProcessStep for DispatchProducer { async move { let provider = self.home.client(); let height = provider.get_block_number().await.unwrap(); - let mut from = height - 10; - let mut to = height - 5; + let mut from = height - (2 * BEHIND_TIP); + let mut to = height - BEHIND_TIP; loop { if from < to { let res = self @@ -86,11 +89,11 @@ impl ProcessStep for DispatchProducer { let tip_res = provider.get_block_number().await; bail_task_if!(tip_res.is_err(), self, tip_res.unwrap_err()); - let tip = tip_res.unwrap() - 5; + let tip = tip_res.unwrap() - BEHIND_TIP; from = to; to = std::cmp::max(to, tip); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(POLLING_INTERVAL_SECS)).await; } } .instrument(span), @@ -155,8 +158,8 @@ impl ProcessStep for UpdateProducer { bail_task_if!(height.is_err(), self, "Err retrieving height"); let height = height.expect("checked"); - let mut from = height - 10; - let mut to = height - 5; + let mut from = height - (2 * BEHIND_TIP); + let mut to = height - BEHIND_TIP; loop { if from < to { let res = self @@ -177,11 +180,11 @@ impl ProcessStep for UpdateProducer { let tip_res = provider.get_block_number().await; bail_task_if!(tip_res.is_err(), self, tip_res.unwrap_err()); - let tip = tip_res.expect("checked") - 5; + let tip = tip_res.expect("checked") - BEHIND_TIP; from = to; to = std::cmp::max(to, tip); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(POLLING_INTERVAL_SECS)).await; } } .instrument(span), @@ -247,8 +250,8 @@ impl ProcessStep for RelayProducer { async move { let provider = self.replica.client(); let height = provider.get_block_number().await.unwrap(); - let mut from = height - 10; - let mut to = height - 5; + let mut from = height - (2 * BEHIND_TIP); + let mut to = height - BEHIND_TIP; loop { tracing::trace!(from = from.as_u64(), to = to.as_u64(), "produce_loop"); if from < to { @@ -269,11 +272,11 @@ impl ProcessStep for RelayProducer { } let tip_res = provider.get_block_number().await; bail_task_if!(tip_res.is_err(), self, tip_res.unwrap_err()); - let tip = tip_res.unwrap() - 5; + let tip = tip_res.unwrap() - BEHIND_TIP; from = to; to = std::cmp::max(to, tip); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(POLLING_INTERVAL_SECS)).await; } } .instrument(span), @@ -339,8 +342,8 @@ impl ProcessStep for ProcessProducer { async move { let provider = self.replica.client(); let height = provider.get_block_number().await.unwrap(); - let mut from = height - 10; - let mut to = height - 5; + let mut from = height - (2 * BEHIND_TIP); + let mut to = height - BEHIND_TIP; loop { if from < to { let res = self @@ -361,11 +364,11 @@ impl ProcessStep for ProcessProducer { let tip_res = provider.get_block_number().await; bail_task_if!(tip_res.is_err(), self, tip_res.unwrap_err()); - let tip = tip_res.unwrap() - 5; + let tip = tip_res.unwrap() - BEHIND_TIP; from = to; to = std::cmp::max(to, tip); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(POLLING_INTERVAL_SECS)).await; } } .instrument(span),