diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 39267e1347..2c2474b50d 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -263,10 +263,35 @@ impl UnprocessedTransactionStorage { } pub fn new_bundle_storage( - unprocessed_bundle_storage: VecDeque, - cost_model_failed_bundles: VecDeque, + unprocessed_bundle_storage: VecDeque<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, + cost_model_failed_bundles: VecDeque<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, + ) -> Self { + Self::new_bundle_storage_ttl( + unprocessed_bundle_storage, + cost_model_failed_bundles, + BundleStorage::MAX_BUNDLE_RETRY_ATTEMPTS, + ) + } + + pub fn new_bundle_storage_ttl( + unprocessed_bundle_storage: VecDeque<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, + cost_model_failed_bundles: VecDeque<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, + max_bundle_retry_attempts: u8, ) -> Self { Self::BundleStorage(BundleStorage { + max_bundle_retry_attempts, last_update_slot: Slot::default(), unprocessed_bundle_storage, cost_model_buffered_bundle_storage: cost_model_failed_bundles, @@ -444,7 +469,11 @@ impl UnprocessedTransactionStorage { ) -> bool where F: FnMut( - &[(ImmutableDeserializedBundle, SanitizedBundle)], + &[( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + SanitizedBundle, + )], &mut BundleStageLeaderMetrics, ) -> Vec>, { @@ -460,14 +489,21 @@ impl UnprocessedTransactionStorage { } } - /// Inserts bundles into storage. Only supported for UnprocessedTransactionStorage::BundleStorage + /// Inserts new bundles into storage. Only supported for [UnprocessedTransactionStorage::BundleStorage] pub(crate) fn insert_bundles( &mut self, deserialized_bundles: Vec, ) -> InsertPacketBundlesSummary { match self { UnprocessedTransactionStorage::BundleStorage(bundle_storage) => { - bundle_storage.insert_unprocessed_bundles(deserialized_bundles, true) + BundleStorage::insert_bundles( + &mut bundle_storage.unprocessed_bundle_storage, + deserialized_bundles + .into_iter() + .map(|bundle| (bundle_storage.max_bundle_retry_attempts, bundle)) + .collect_vec(), + true, + ) } UnprocessedTransactionStorage::LocalTransactionStorage(_) | UnprocessedTransactionStorage::VoteStorage(_) => { @@ -1072,14 +1108,28 @@ pub struct InsertPacketBundlesSummary { /// the cost model and need to get retried next slot. #[derive(Debug)] pub struct BundleStorage { + /// Maximum number of times a bundle may be reattempted before being dropped + max_bundle_retry_attempts: u8, last_update_slot: Slot, - unprocessed_bundle_storage: VecDeque, - // Storage for bundles that exceeded the cost model for the slot they were last attempted - // execution on - cost_model_buffered_bundle_storage: VecDeque, + + /// Stores newly received bundles, or rebuffered bundles that error during execution on + /// [BundleExecutionError::PohRecordError] or + /// [BundleExecutionError::BankProcessingTimeLimitReached] + unprocessed_bundle_storage: VecDeque<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, + /// Storage for bundles that exceeded the cost model for the slot they were last attempted + /// execution on + cost_model_buffered_bundle_storage: VecDeque<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, } impl BundleStorage { + /// Maximum number of times a bundle may be reattempted before being dropped + pub const MAX_BUNDLE_RETRY_ATTEMPTS: u8 = 3; fn is_empty(&self) -> bool { self.unprocessed_bundle_storage.is_empty() } @@ -1091,7 +1141,13 @@ impl BundleStorage { pub fn unprocessed_packets_len(&self) -> usize { self.unprocessed_bundle_storage .iter() - .map(|b| b.len()) + .map(|(attempts_remaining, bundle)| { + if *attempts_remaining == 0 { + 0 + } else { + bundle.len() + } + }) .sum::() } @@ -1102,7 +1158,13 @@ impl BundleStorage { pub(crate) fn cost_model_buffered_packets_len(&self) -> usize { self.cost_model_buffered_bundle_storage .iter() - .map(|b| b.len()) + .map(|(attempts_remaining, bundle)| { + if *attempts_remaining == 0 { + 0 + } else { + bundle.len() + } + }) .sum() } @@ -1123,9 +1185,16 @@ impl BundleStorage { (num_unprocessed_bundles, num_cost_model_buffered_bundles) } + /// Inserts new bundles into storage. fn insert_bundles( - deque: &mut VecDeque, - deserialized_bundles: Vec, + deque: &mut VecDeque<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, + deserialized_bundles: Vec<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, push_back: bool, ) -> InsertPacketBundlesSummary { let mut num_bundles_inserted: usize = 0; @@ -1133,7 +1202,7 @@ impl BundleStorage { let mut num_bundles_dropped: usize = 0; let mut num_packets_dropped: usize = 0; - for bundle in deserialized_bundles { + for (attempts_remaining, bundle) in deserialized_bundles { if deque.capacity() == deque.len() { saturating_add_assign!(num_bundles_dropped, 1); saturating_add_assign!(num_packets_dropped, bundle.len()); @@ -1141,9 +1210,9 @@ impl BundleStorage { saturating_add_assign!(num_bundles_inserted, 1); saturating_add_assign!(num_packets_inserted, bundle.len()); if push_back { - deque.push_back(bundle); + deque.push_back((attempts_remaining, bundle)); } else { - deque.push_front(bundle) + deque.push_front((attempts_remaining, bundle)); } } } @@ -1162,7 +1231,10 @@ impl BundleStorage { fn push_front_unprocessed_bundles( &mut self, - deserialized_bundles: Vec, + deserialized_bundles: Vec<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, ) -> InsertPacketBundlesSummary { Self::insert_bundles( &mut self.unprocessed_bundle_storage, @@ -1173,7 +1245,10 @@ impl BundleStorage { fn push_back_cost_model_buffered_bundles( &mut self, - deserialized_bundles: Vec, + deserialized_bundles: Vec<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + )>, ) -> InsertPacketBundlesSummary { Self::insert_bundles( &mut self.cost_model_buffered_bundle_storage, @@ -1182,18 +1257,6 @@ impl BundleStorage { ) } - fn insert_unprocessed_bundles( - &mut self, - deserialized_bundles: Vec, - push_back: bool, - ) -> InsertPacketBundlesSummary { - Self::insert_bundles( - &mut self.unprocessed_bundle_storage, - deserialized_bundles, - push_back, - ) - } - /// Drains bundles from the queue, sanitizes them to prepare for execution, executes them by /// calling `processing_function`, then potentially rebuffer them. pub fn process_bundles( @@ -1205,7 +1268,11 @@ impl BundleStorage { ) -> bool where F: FnMut( - &[(ImmutableDeserializedBundle, SanitizedBundle)], + &[( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + SanitizedBundle, + )], &mut BundleStageLeaderMetrics, ) -> Vec>, { @@ -1221,13 +1288,15 @@ impl BundleStorage { let mut is_slot_over = false; - let mut rebuffered_bundles = Vec::new(); + let mut rebuffered_bundles = Vec::with_capacity(sanitized_bundles.len()); + let mut cost_model_rebuffered_bundles = Vec::with_capacity(sanitized_bundles.len()); sanitized_bundles .into_iter() .zip(bundle_execution_results) .for_each( - |((deserialized_bundle, sanitized_bundle), result)| match result { + |((attempts_remaining, deserialized_bundle, sanitized_bundle), result)| match result + { Ok(_) => { debug!("bundle={} executed ok", sanitized_bundle.bundle_id); // yippee @@ -1238,13 +1307,13 @@ impl BundleStorage { "bundle={} poh record error: {e:?}", sanitized_bundle.bundle_id ); - rebuffered_bundles.push(deserialized_bundle); + rebuffered_bundles.push((attempts_remaining, deserialized_bundle)); is_slot_over = true; } Err(BundleExecutionError::BankProcessingTimeLimitReached) => { // buffer the bundle to the front of the queue to be attempted next slot debug!("bundle={} bank processing done", sanitized_bundle.bundle_id); - rebuffered_bundles.push(deserialized_bundle); + rebuffered_bundles.push((attempts_remaining, deserialized_bundle)); is_slot_over = true; } Err(BundleExecutionError::TransactionFailure(e)) => { @@ -1257,7 +1326,8 @@ impl BundleStorage { Err(BundleExecutionError::ExceedsCostModel) => { // cost model buffered bundles contain most recent bundles at the front of the queue debug!("bundle={} exceeds cost model", sanitized_bundle.bundle_id); - self.push_back_cost_model_buffered_bundles(vec![deserialized_bundle]); + cost_model_rebuffered_bundles + .push((attempts_remaining, deserialized_bundle)); } Err(BundleExecutionError::TipError(e)) => { debug!("bundle={} tip error: {}", sanitized_bundle.bundle_id, e); @@ -1272,32 +1342,41 @@ impl BundleStorage { ); // rebuffered bundles are pushed onto deque in reverse order so the first bundle is at the front - for bundle in rebuffered_bundles.into_iter().rev() { - self.push_front_unprocessed_bundles(vec![bundle]); - } + self.push_front_unprocessed_bundles(rebuffered_bundles.into_iter().rev().collect_vec()); + self.push_back_cost_model_buffered_bundles(cost_model_rebuffered_bundles); is_slot_over } - /// Drains the unprocessed_bundle_storage, converting bundle packets into SanitizedBundles + /// Drains BundleStorage, converting bundle packets into [SanitizedBundle]. + /// Decrements attempts remaining, returning bundles that have attempts remaining fn drain_and_sanitize_bundles( &mut self, bank: Arc, bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, blacklisted_accounts: &HashSet, - ) -> Vec<(ImmutableDeserializedBundle, SanitizedBundle)> { + ) -> Vec<( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + SanitizedBundle, + )> { let mut error_metrics = TransactionErrorMetrics::default(); let start = Instant::now(); - let mut sanitized_bundles = Vec::new(); + let mut sanitized_bundles = Vec::with_capacity( + self.cost_model_buffered_bundle_storage + .len() + .saturating_add(self.unprocessed_bundle_storage.len()), + ); // on new slot, drain anything that was buffered from last slot if bank.slot() != self.last_update_slot { sanitized_bundles.extend( self.cost_model_buffered_bundle_storage .drain(..) - .filter_map(|packet_bundle| { + .filter(|(attempts_remaining, _packet_bundle)| *attempts_remaining > 0) + .filter_map(|(attempts_remaining, packet_bundle)| { let r = packet_bundle.build_sanitized_bundle( &bank, blacklisted_accounts, @@ -1308,12 +1387,13 @@ impl BundleStorage { .increment_sanitize_transaction_result(&r); match r { - Ok(sanitized_bundle) => Some((packet_bundle, sanitized_bundle)), + Ok(sanitized_bundle) => { + Some((attempts_remaining - 1, packet_bundle, sanitized_bundle)) + } Err(e) => { debug!( - "bundle id: {} error sanitizing: {}", + "bundle id: {} error sanitizing: {e}", packet_bundle.bundle_id(), - e ); None } @@ -1324,37 +1404,41 @@ impl BundleStorage { self.last_update_slot = bank.slot(); } - sanitized_bundles.extend(self.unprocessed_bundle_storage.drain(..).filter_map( - |packet_bundle| { - let r = packet_bundle.build_sanitized_bundle( - &bank, - blacklisted_accounts, - &mut error_metrics, - ); - bundle_stage_leader_metrics - .bundle_stage_metrics_tracker() - .increment_sanitize_transaction_result(&r); - match r { - Ok(sanitized_bundle) => Some((packet_bundle, sanitized_bundle)), - Err(e) => { - debug!( - "bundle id: {} error sanitizing: {}", - packet_bundle.bundle_id(), - e - ); - None + sanitized_bundles.extend( + self.unprocessed_bundle_storage + .drain(..) + .filter(|(attempts_remaining, _packet_bundle)| *attempts_remaining > 0) + .filter_map(|(attempts_remaining, packet_bundle)| { + let r = packet_bundle.build_sanitized_bundle( + &bank, + blacklisted_accounts, + &mut error_metrics, + ); + bundle_stage_leader_metrics + .bundle_stage_metrics_tracker() + .increment_sanitize_transaction_result(&r); + match r { + Ok(sanitized_bundle) => { + Some((attempts_remaining - 1, packet_bundle, sanitized_bundle)) + } + Err(e) => { + debug!( + "bundle id: {} error sanitizing: {e}", + packet_bundle.bundle_id(), + ); + None + } } - } - }, - )); + }), + ); - let elapsed = start.elapsed().as_micros(); + let elapsed = start.elapsed().as_micros() as u64; bundle_stage_leader_metrics .bundle_stage_metrics_tracker() - .increment_sanitize_bundle_elapsed_us(elapsed as u64); + .increment_sanitize_bundle_elapsed_us(elapsed); bundle_stage_leader_metrics .leader_slot_metrics_tracker() - .increment_transactions_from_packets_us(elapsed as u64); + .increment_transactions_from_packets_us(elapsed); bundle_stage_leader_metrics .leader_slot_metrics_tracker() @@ -1362,6 +1446,16 @@ impl BundleStorage { sanitized_bundles } + + #[cfg(test)] + pub fn get_unprocessed_bundle_storage_len(&self) -> usize { + self.unprocessed_bundle_storage.len() + } + + #[cfg(test)] + pub fn get_cost_model_buffered_bundle_storage_len(&self) -> usize { + self.cost_model_buffered_bundle_storage.len() + } } #[cfg(test)] diff --git a/core/src/bundle_stage.rs b/core/src/bundle_stage.rs index 3a4103831b..1b48598977 100644 --- a/core/src/bundle_stage.rs +++ b/core/src/bundle_stage.rs @@ -100,24 +100,24 @@ impl BundleStageLoopMetrics { .fetch_add(count, Ordering::Relaxed); } - pub fn increment_current_buffered_bundles_count(&mut self, count: u64) { + pub fn set_current_buffered_bundles_count(&mut self, count: u64) { self.current_buffered_bundles_count - .fetch_add(count, Ordering::Relaxed); + .store(count, Ordering::Relaxed); } - pub fn increment_current_buffered_packets_count(&mut self, count: u64) { + pub fn set_current_buffered_packets_count(&mut self, count: u64) { self.current_buffered_packets_count - .fetch_add(count, Ordering::Relaxed); + .store(count, Ordering::Relaxed); } - pub fn increment_cost_model_buffered_bundles_count(&mut self, count: u64) { + pub fn set_cost_model_buffered_bundles_count(&mut self, count: u64) { self.cost_model_buffered_bundles_count - .fetch_add(count, Ordering::Relaxed); + .store(count, Ordering::Relaxed); } - pub fn increment_cost_model_buffered_packets_count(&mut self, count: u64) { + pub fn set_cost_model_buffered_packets_count(&mut self, count: u64) { self.cost_model_buffered_packets_count - .fetch_add(count, Ordering::Relaxed); + .store(count, Ordering::Relaxed); } pub fn increment_num_bundles_dropped(&mut self, count: u64) { @@ -265,7 +265,7 @@ impl BundleStage { ); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); - let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( + let mut unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage( VecDeque::with_capacity(1_000), VecDeque::with_capacity(1_000), ); @@ -285,7 +285,7 @@ impl BundleStage { reserved_ticks, ); - let consumer = BundleConsumer::new( + let mut consumer = BundleConsumer::new( committer, poh_recorder.read().unwrap().new_recorder(), QosService::new(BUNDLE_STAGE_ID), @@ -301,13 +301,13 @@ impl BundleStage { let bundle_thread = Builder::new() .name("solBundleStgTx".to_string()) .spawn(move || { - Self::process_loop( + Self::process_bundles_loop( &mut bundle_receiver, - decision_maker, - consumer, + &decision_maker, + &mut consumer, BUNDLE_STAGE_ID, - unprocessed_bundle_storage, - exit, + &mut unprocessed_bundle_storage, + &exit, ); }) .unwrap(); @@ -315,14 +315,14 @@ impl BundleStage { Self { bundle_thread } } - #[allow(clippy::too_many_arguments)] - fn process_loop( + /// Reads bundles off `bundle_receiver`, buffering in [UnprocessedTransactionStorage::BundleStorage] + fn process_bundles_loop( bundle_receiver: &mut BundleReceiver, - decision_maker: DecisionMaker, - mut consumer: BundleConsumer, + decision_maker: &DecisionMaker, + consumer: &mut BundleConsumer, id: u32, - mut unprocessed_bundle_storage: UnprocessedTransactionStorage, - exit: Arc, + unprocessed_bundle_storage: &mut UnprocessedTransactionStorage, + exit: &Arc, ) { let mut last_metrics_update = Instant::now(); @@ -330,51 +330,72 @@ impl BundleStage { let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(id); while !exit.load(Ordering::Relaxed) { - if !unprocessed_bundle_storage.is_empty() - || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD - { - let (_, process_buffered_packets_time) = measure!( - Self::process_buffered_bundles( - &decision_maker, - &mut consumer, - &mut unprocessed_bundle_storage, - &mut bundle_stage_leader_metrics, - ), - "process_buffered_packets", - ); - bundle_stage_leader_metrics - .leader_slot_metrics_tracker() - .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); - last_metrics_update = Instant::now(); - } - - match bundle_receiver.receive_and_buffer_bundles( - &mut unprocessed_bundle_storage, + if let Err(e) = Self::process_bundles( + bundle_receiver, + decision_maker, + consumer, + unprocessed_bundle_storage, + &mut last_metrics_update, &mut bundle_stage_metrics, &mut bundle_stage_leader_metrics, ) { - Ok(_) | Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => break, + error!("Bundle stage error: {e:?}"); + break; } + } + } - let bundle_storage = unprocessed_bundle_storage.bundle_storage().unwrap(); - bundle_stage_metrics.increment_current_buffered_bundles_count( - bundle_storage.unprocessed_bundles_len() as u64, - ); - bundle_stage_metrics.increment_current_buffered_packets_count( - bundle_storage.unprocessed_packets_len() as u64, - ); - bundle_stage_metrics.increment_cost_model_buffered_bundles_count( - bundle_storage.cost_model_buffered_bundles_len() as u64, - ); - bundle_stage_metrics.increment_cost_model_buffered_packets_count( - bundle_storage.cost_model_buffered_packets_len() as u64, + fn process_bundles( + bundle_receiver: &mut BundleReceiver, + decision_maker: &DecisionMaker, + consumer: &mut BundleConsumer, + unprocessed_bundle_storage: &mut UnprocessedTransactionStorage, + last_metrics_update: &mut Instant, + bundle_stage_metrics: &mut BundleStageLoopMetrics, + bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, + ) -> Result<(), RecvTimeoutError> { + if !unprocessed_bundle_storage.is_empty() + || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD + { + let (_, process_buffered_packets_time) = measure!( + Self::process_buffered_bundles( + decision_maker, + consumer, + unprocessed_bundle_storage, + bundle_stage_leader_metrics, + ), + "process_buffered_packets", ); - bundle_stage_metrics.maybe_report(1_000); + bundle_stage_leader_metrics + .leader_slot_metrics_tracker() + .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); + *last_metrics_update = Instant::now(); + } + + if let Err(RecvTimeoutError::Disconnected) = bundle_receiver.receive_and_buffer_bundles( + unprocessed_bundle_storage, + bundle_stage_metrics, + bundle_stage_leader_metrics, + ) { + return Err(RecvTimeoutError::Disconnected); } + + let bundle_storage = unprocessed_bundle_storage.bundle_storage().unwrap(); + bundle_stage_metrics + .set_current_buffered_bundles_count(bundle_storage.unprocessed_bundles_len() as u64); + bundle_stage_metrics + .set_current_buffered_packets_count(bundle_storage.unprocessed_packets_len() as u64); + bundle_stage_metrics.set_cost_model_buffered_bundles_count( + bundle_storage.cost_model_buffered_bundles_len() as u64, + ); + bundle_stage_metrics.set_cost_model_buffered_packets_count( + bundle_storage.cost_model_buffered_packets_len() as u64, + ); + bundle_stage_metrics.maybe_report(1_000); + + Ok(()) } - #[allow(clippy::too_many_arguments)] fn process_buffered_bundles( decision_maker: &DecisionMaker, consumer: &mut BundleConsumer, @@ -434,3 +455,419 @@ impl BundleStage { } } } + +#[cfg(test)] +mod tests { + use { + super::*, + crossbeam_channel::unbounded, + itertools::Itertools, + serial_test::serial, + solana_gossip::cluster_info::Node, + solana_perf::packet::PacketBatch, + solana_poh::poh_recorder::create_test_recorder, + solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_sdk::{ + packet::Packet, + poh_config::PohConfig, + pubkey::Pubkey, + signature::{Keypair, Signer}, + system_instruction::{self, MAX_PERMITTED_DATA_LENGTH}, + system_transaction, + transaction::Transaction, + }, + solana_streamer::socket::SocketAddrSpace, + }; + + pub(crate) fn new_test_cluster_info(keypair: Option>) -> (Node, ClusterInfo) { + let keypair = keypair.unwrap_or_else(|| Arc::new(Keypair::new())); + let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); + let cluster_info = + ClusterInfo::new(node.info.clone(), keypair, SocketAddrSpace::Unspecified); + (node, cluster_info) + } + + #[test] + #[serial] + fn test_basic_bundle() { + solana_logger::setup(); + const BUNDLE_STAGE_ID: u32 = 10_000; + let bundle_consumer::tests::TestFixture { + genesis_config_info, + leader_keypair, + bank, + blockstore: _, + exit: _, + poh_recorder, + poh_simulator: _, + entry_receiver: _entry_receiver, + } = bundle_consumer::tests::create_test_fixture(10_000_000); + let tip_manager = + bundle_consumer::tests::get_tip_manager(&genesis_config_info.voting_keypair.pubkey()); + let mint_keypair = genesis_config_info.mint_keypair; + let bank_forks = BankForks::new_from_banks(&[bank.clone()], bank.slot()); + let (_, cluster_info) = new_test_cluster_info(Some(Arc::new(leader_keypair))); + let cluster_info = Arc::new(cluster_info); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let (bundle_sender, bundle_receiver) = unbounded(); + let mut bundle_receiver = + BundleReceiver::new(BUNDLE_STAGE_ID, bundle_receiver, bank_forks, Some(5)); + // Queue the bundles + let recent_blockhash = genesis_config_info.genesis_config.hash(); + let kp = Keypair::new(); + let txn0 = + system_transaction::transfer(&mint_keypair, &kp.pubkey(), 1_000_000, recent_blockhash); + let txn1 = + system_transaction::transfer(&mint_keypair, &kp.pubkey(), 2_000_000, recent_blockhash); + + bundle_sender + .send(vec![PacketBundle { + batch: PacketBatch::new(vec![ + Packet::from_data(None, txn0).unwrap(), + Packet::from_data(None, txn1).unwrap(), + ]), + bundle_id: String::default(), + }]) + .unwrap(); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::default()), + ); + + let mut unprocessed_transaction_storage = UnprocessedTransactionStorage::new_bundle_storage( + VecDeque::with_capacity(1_000), + VecDeque::with_capacity(1_000), + ); + + let reserved_ticks = poh_recorder + .read() + .unwrap() + .ticks_per_slot() + .saturating_mul(8) + .saturating_div(10); + + // The first 80% of the block, based on poh ticks, has `preallocated_bundle_cost` less compute units. + // The last 20% has has full compute so blockspace is maximized if BundleStage is idle. + let reserved_space = + BundleReservedSpaceManager::new(MAX_BLOCK_UNITS, 3_000_000, reserved_ticks); + + let mut consumer = BundleConsumer::new( + committer, + poh_recorder.read().unwrap().new_recorder(), + QosService::new(BUNDLE_STAGE_ID), + None, + tip_manager, + BundleAccountLocker::default(), + Arc::new(Mutex::new(BlockBuilderFeeInfo { + block_builder: cluster_info.keypair().pubkey(), + block_builder_commission: 0, + })), + MAX_BUNDLE_RETRY_DURATION, + cluster_info.clone(), + reserved_space, + ); + + // sanity check + assert_eq!(bank.get_balance(&kp.pubkey()), 0); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_unprocessed_bundle_storage_len(), + 0 + ); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_cost_model_buffered_bundle_storage_len(), + 0 + ); + + let mut last_metrics_update = Instant::now(); + let mut bundle_stage_metrics = BundleStageLoopMetrics::new(BUNDLE_STAGE_ID); + let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(BUNDLE_STAGE_ID); + + // first run, just buffers off receiver, does not process them + BundleStage::process_bundles( + &mut bundle_receiver, + &DecisionMaker::new(cluster_info.id(), poh_recorder.clone()), + &mut consumer, + &mut unprocessed_transaction_storage, + &mut last_metrics_update, + &mut bundle_stage_metrics, + &mut bundle_stage_leader_metrics, + ) + .unwrap(); + + // now process + BundleStage::process_bundles( + &mut bundle_receiver, + &DecisionMaker::new(cluster_info.id(), poh_recorder.clone()), + &mut consumer, + &mut unprocessed_transaction_storage, + &mut last_metrics_update, + &mut bundle_stage_metrics, + &mut bundle_stage_leader_metrics, + ) + .unwrap(); + + assert_eq!(bank.get_balance(&kp.pubkey()), 3_000_000); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_unprocessed_bundle_storage_len(), + 0 + ); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_cost_model_buffered_bundle_storage_len(), + 0 + ) + } + + #[test] + #[serial] + fn test_rebuffer_exceed_max_attempts() { + solana_logger::setup(); + const BUNDLE_STAGE_ID: u32 = 10_000; + const BUNDLE_RETRY_ATTEMPTS: u8 = 2; + + let bundle_consumer::tests::TestFixture { + genesis_config_info, + leader_keypair, + bank, + blockstore, + exit: _, + poh_recorder, + poh_simulator: _, + entry_receiver: _entry_receiver, + } = bundle_consumer::tests::create_test_fixture(10_000_000); + let tip_manager = + bundle_consumer::tests::get_tip_manager(&genesis_config_info.voting_keypair.pubkey()); + let mint_keypair = genesis_config_info.mint_keypair; + let bank_forks = BankForks::new_from_banks(&[bank.clone()], bank.slot()); + let (_, cluster_info) = new_test_cluster_info(Some(Arc::new(leader_keypair))); + let cluster_info = Arc::new(cluster_info); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let (bundle_sender, bundle_receiver) = unbounded(); + let mut bundle_receiver = + BundleReceiver::new(BUNDLE_STAGE_ID, bundle_receiver, bank_forks, Some(5)); + let rent = genesis_config_info + .genesis_config + .rent + .minimum_balance(MAX_PERMITTED_DATA_LENGTH as usize); + let test_keypairs = (0..10).map(|_| Keypair::new()).collect_vec(); + let ixs = test_keypairs + .iter() + .map(|kp| { + system_instruction::create_account( + &mint_keypair.pubkey(), + &kp.pubkey(), + rent, + MAX_PERMITTED_DATA_LENGTH, + &solana_sdk::system_program::ID, + ) + }) + .collect_vec(); + let recent_blockhash = bank.last_blockhash(); + let txn0 = Transaction::new_signed_with_payer( + ixs.iter().take(5).cloned().collect_vec().as_slice(), + Some(&mint_keypair.pubkey()), + std::iter::once(&mint_keypair) + .chain(test_keypairs.iter().take(5)) + .collect_vec() + .as_slice(), + recent_blockhash, + ); + let txn1 = Transaction::new_signed_with_payer( + ixs.iter().skip(5).cloned().collect_vec().as_slice(), + Some(&mint_keypair.pubkey()), + std::iter::once(&mint_keypair) + .chain(test_keypairs.iter().skip(5)) + .collect_vec() + .as_slice(), + recent_blockhash, + ); + bundle_sender + .send(vec![PacketBundle { + batch: PacketBatch::new(vec![ + Packet::from_data(None, txn0).unwrap(), + Packet::from_data(None, txn1).unwrap(), + ]), + bundle_id: String::default(), + }]) + .unwrap(); + + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::default()), + ); + + let mut unprocessed_transaction_storage = + UnprocessedTransactionStorage::new_bundle_storage_ttl( + VecDeque::with_capacity(1_000), + VecDeque::with_capacity(1_000), + BUNDLE_RETRY_ATTEMPTS, + ); + + let reserved_ticks = poh_recorder + .read() + .unwrap() + .ticks_per_slot() + .saturating_mul(8) + .saturating_div(10); + + // The first 80% of the block, based on poh ticks, has `preallocated_bundle_cost` less compute units. + // The last 20% has has full compute so blockspace is maximized if BundleStage is idle. + let reserved_space = + BundleReservedSpaceManager::new(MAX_BLOCK_UNITS, 3_000_000, reserved_ticks); + + let mut consumer = BundleConsumer::new( + committer, + poh_recorder.read().unwrap().new_recorder(), + QosService::new(BUNDLE_STAGE_ID), + None, + tip_manager, + BundleAccountLocker::default(), + Arc::new(Mutex::new(BlockBuilderFeeInfo { + block_builder: cluster_info.keypair().pubkey(), + block_builder_commission: 0, + })), + MAX_BUNDLE_RETRY_DURATION, + cluster_info.clone(), + reserved_space, + ); + + // sanity check + assert!(test_keypairs + .iter() + .all(|kp| bank.get_balance(&kp.pubkey()) == 0)); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_unprocessed_bundle_storage_len(), + 0 + ); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_cost_model_buffered_bundle_storage_len(), + 0 + ); + + let mut last_metrics_update = Instant::now(); + let mut bundle_stage_metrics = BundleStageLoopMetrics::new(BUNDLE_STAGE_ID); + let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(BUNDLE_STAGE_ID); + + // first run, just buffers off receiver, does not process them + BundleStage::process_bundles( + &mut bundle_receiver, + &DecisionMaker::new(cluster_info.id(), poh_recorder.clone()), + &mut consumer, + &mut unprocessed_transaction_storage, + &mut last_metrics_update, + &mut bundle_stage_metrics, + &mut bundle_stage_leader_metrics, + ) + .unwrap(); + assert!(test_keypairs + .iter() + .all(|kp| bank.get_balance(&kp.pubkey()) == 0)); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_unprocessed_bundle_storage_len(), + 1 + ); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_cost_model_buffered_bundle_storage_len(), + 0 + ); + + let mut curr_bank = bank; + // retry until reached, but not exceeding max attempts + (1..BUNDLE_RETRY_ATTEMPTS + 1).for_each(|_i| { + // advance the slot so we can evaluate cost_model_buffered_bundle_storage + let new_bank = Arc::new(Bank::new_from_parent( + curr_bank.clone(), + &Pubkey::default(), + curr_bank.slot() + 1, + )); + let (_exit, poh_recorder, _poh_simulator, _entry_receiver) = create_test_recorder( + new_bank.clone(), + blockstore.clone(), + Some(PohConfig::default()), + None, + ); + curr_bank = new_bank.clone(); + BundleStage::process_bundles( + &mut bundle_receiver, + &DecisionMaker::new(cluster_info.id(), poh_recorder), + &mut consumer, + &mut unprocessed_transaction_storage, + &mut last_metrics_update, + &mut bundle_stage_metrics, + &mut bundle_stage_leader_metrics, + ) + .unwrap(); + assert!(test_keypairs + .iter() + .all(|kp| new_bank.get_balance(&kp.pubkey()) == 0)); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_unprocessed_bundle_storage_len(), + 0 + ); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_cost_model_buffered_bundle_storage_len(), + 1 + ); + }); + + // exceed attempt limit, bundle should be removed from buffers + BundleStage::process_bundles( + &mut bundle_receiver, + &DecisionMaker::new(cluster_info.id(), poh_recorder), + &mut consumer, + &mut unprocessed_transaction_storage, + &mut last_metrics_update, + &mut bundle_stage_metrics, + &mut bundle_stage_leader_metrics, + ) + .unwrap(); + assert!(test_keypairs + .iter() + .all(|kp| curr_bank.get_balance(&kp.pubkey()) == 0)); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_unprocessed_bundle_storage_len(), + 0 + ); + assert_eq!( + unprocessed_transaction_storage + .bundle_storage() + .unwrap() + .get_cost_model_buffered_bundle_storage_len(), + 0 + ); + } +} diff --git a/core/src/bundle_stage/bundle_consumer.rs b/core/src/bundle_stage/bundle_consumer.rs index ef4e480abe..c101097447 100644 --- a/core/src/bundle_stage/bundle_consumer.rs +++ b/core/src/bundle_stage/bundle_consumer.rs @@ -200,7 +200,11 @@ impl BundleConsumer { log_messages_bytes_limit: &Option, max_bundle_retry_duration: Duration, reserved_space: &BundleReservedSpaceManager, - bundles: &[(ImmutableDeserializedBundle, SanitizedBundle)], + bundles: &[( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + SanitizedBundle, + )], bank_start: &BankStart, bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, ) -> Vec> { @@ -212,7 +216,7 @@ impl BundleConsumer { let (locked_bundle_results, locked_bundles_elapsed) = measure!( bundles .iter() - .map(|(_, sanitized_bundle)| { + .map(|(_, _, sanitized_bundle)| { bundle_account_locker .prepare_locked_bundle(sanitized_bundle, &bank_start.working_bank) }) @@ -778,7 +782,7 @@ impl BundleConsumer { } #[cfg(test)] -mod tests { +pub mod tests { use { crate::{ bundle_stage::{ @@ -843,14 +847,15 @@ mod tests { }, }; - struct TestFixture { - genesis_config_info: GenesisConfigInfo, - leader_keypair: Keypair, - bank: Arc, - exit: Arc, - poh_recorder: Arc>, - poh_simulator: JoinHandle<()>, - entry_receiver: Receiver, + pub struct TestFixture { + pub genesis_config_info: GenesisConfigInfo, + pub leader_keypair: Keypair, + pub bank: Arc, + pub blockstore: Arc, + pub exit: Arc, + pub poh_recorder: Arc>, + pub poh_simulator: JoinHandle<()>, + pub entry_receiver: Receiver, } pub(crate) fn simulate_poh( @@ -914,7 +919,7 @@ mod tests { (exit, poh_recorder, poh_simulator, entry_receiver) } - fn create_test_fixture(mint_sol: u64) -> TestFixture { + pub fn create_test_fixture(mint_sol: u64) -> TestFixture { let mint_keypair = Keypair::new(); let leader_keypair = Keypair::new(); let voting_keypair = Keypair::new(); @@ -941,7 +946,7 @@ mod tests { genesis_config.ticks_per_slot *= 8; // workaround for https://github.com/solana-labs/solana/issues/30085 - // the test can deploy and use spl_programs in the genensis slot without waiting for the next one + // the test can deploy and use spl_programs in the genesis slot without waiting for the next one let (bank, _) = Bank::new_with_bank_forks_for_tests(&genesis_config); let bank = Arc::new(Bank::new_from_parent(bank, &Pubkey::default(), 1)); @@ -953,7 +958,7 @@ mod tests { ); let (exit, poh_recorder, poh_simulator, entry_receiver) = - create_test_recorder(&bank, blockstore, Some(PohConfig::default()), None); + create_test_recorder(&bank, blockstore.clone(), Some(PohConfig::default()), None); let validator_pubkey = voting_keypair.pubkey(); TestFixture { @@ -965,6 +970,7 @@ mod tests { }, leader_keypair, bank, + blockstore, exit, poh_recorder, poh_simulator, @@ -1008,7 +1014,7 @@ mod tests { .collect() } - fn get_tip_manager(vote_account: &Pubkey) -> TipManager { + pub fn get_tip_manager(vote_account: &Pubkey) -> TipManager { TipManager::new(TipManagerConfig { tip_payment_program_id: Pubkey::from_str("T1pyyaTNZsKv2WcRAB8oVnk93mLJw2XzjtVYqCsaHqt") .unwrap(), @@ -1032,6 +1038,7 @@ mod tests { genesis_config_info, leader_keypair, bank, + blockstore: _, exit, poh_recorder, poh_simulator, @@ -1187,6 +1194,7 @@ mod tests { genesis_config_info, leader_keypair, bank, + blockstore: _, exit, poh_recorder, poh_simulator, @@ -1371,6 +1379,7 @@ mod tests { genesis_config_info, leader_keypair, bank, + blockstore: _, exit, poh_recorder, poh_simulator, diff --git a/core/src/bundle_stage/bundle_packet_receiver.rs b/core/src/bundle_stage/bundle_packet_receiver.rs index 90ae7d3ba2..b9f639eba9 100644 --- a/core/src/bundle_stage/bundle_packet_receiver.rs +++ b/core/src/bundle_stage/bundle_packet_receiver.rs @@ -223,13 +223,17 @@ mod tests { fn assert_bundles_same( packet_bundles: &[PacketBundle], - bundles_to_process: &[(ImmutableDeserializedBundle, SanitizedBundle)], + bundles_to_process: &[( + u8, /* attempts remaining */ + ImmutableDeserializedBundle, + SanitizedBundle, + )], ) { assert_eq!(packet_bundles.len(), bundles_to_process.len()); packet_bundles .iter() .zip(bundles_to_process.iter()) - .for_each(|(packet_bundle, (_, sanitized_bundle))| { + .for_each(|(packet_bundle, (_, _, sanitized_bundle))| { assert_eq!(packet_bundle.bundle_id, sanitized_bundle.bundle_id); assert_eq!( packet_bundle.batch.len(),