diff --git a/node/bft/ledger-service/src/ledger.rs b/node/bft/ledger-service/src/ledger.rs index adb6fe00bc..b8e42770dd 100644 --- a/node/bft/ledger-service/src/ledger.rs +++ b/node/bft/ledger-service/src/ledger.rs @@ -404,4 +404,29 @@ impl> LedgerService for CoreLedgerService< tracing::info!("\n\nAdvanced to block {} at round {} - {}\n", block.height(), block.round(), block.hash()); Ok(()) } + + fn compute_cost(&self, _transaction_id: N::TransactionID, transaction: Data>) -> Result { + // TODO: move to VM or ledger? + let process = self.ledger.vm().process(); + + // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error. + let transaction = match transaction { + Data::Object(transaction) => transaction, + Data::Buffer(bytes) => Transaction::::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?, + }; + + // Collect the Optional Stack corresponding to the transaction if its an Execution. + let stack = if let Transaction::Execute(_, ref execution, _) = transaction { + // Get the root transition from the execution. + let root_transition = execution.peek()?; + // Get the stack from the process. + Some(process.read().get_stack(root_transition.program_id())?.clone()) + } else { + None + }; + + use snarkvm::prelude::compute_cost; + + compute_cost(&transaction, stack) + } } diff --git a/node/bft/ledger-service/src/mock.rs b/node/bft/ledger-service/src/mock.rs index 2833878bfb..8a22b64b10 100644 --- a/node/bft/ledger-service/src/mock.rs +++ b/node/bft/ledger-service/src/mock.rs @@ -235,4 +235,9 @@ impl LedgerService for MockLedgerService { self.height_to_round_and_hash.lock().insert(block.height(), (block.round(), block.hash())); Ok(()) } + + /// TODO: is this reasonable? + fn compute_cost(&self, _transaction_id: N::TransactionID, _transaction: Data>) -> Result { + Ok(0) + } } diff --git a/node/bft/ledger-service/src/prover.rs b/node/bft/ledger-service/src/prover.rs index 9ff464a3d1..8c91e4c72f 100644 --- a/node/bft/ledger-service/src/prover.rs +++ b/node/bft/ledger-service/src/prover.rs @@ -186,4 +186,8 @@ impl LedgerService for ProverLedgerService { fn advance_to_next_block(&self, block: &Block) -> Result<()> { bail!("Cannot advance to next block in prover - {block}") } + + fn compute_cost(&self, transaction_id: N::TransactionID, _transaction: Data>) -> Result { + bail!("Transaction '{transaction_id}' doesn't exist in prover") + } } diff --git a/node/bft/ledger-service/src/traits.rs b/node/bft/ledger-service/src/traits.rs index 4080a51c2f..1c1d87479b 100644 --- a/node/bft/ledger-service/src/traits.rs +++ b/node/bft/ledger-service/src/traits.rs @@ -120,4 +120,6 @@ pub trait LedgerService: Debug + Send + Sync { /// Adds the given block as the next block in the ledger. #[cfg(feature = "ledger-write")] fn advance_to_next_block(&self, block: &Block) -> Result<()>; + + fn compute_cost(&self, transaction_id: N::TransactionID, transaction: Data>) -> Result; } diff --git a/node/bft/ledger-service/src/translucent.rs b/node/bft/ledger-service/src/translucent.rs index a3869738f1..a2baff14a5 100644 --- a/node/bft/ledger-service/src/translucent.rs +++ b/node/bft/ledger-service/src/translucent.rs @@ -195,4 +195,8 @@ impl> LedgerService for TranslucentLedgerS fn advance_to_next_block(&self, block: &Block) -> Result<()> { self.inner.advance_to_next_block(block) } + + fn compute_cost(&self, transaction_id: N::TransactionID, transaction: Data>) -> Result { + self.inner.compute_cost(transaction_id, transaction) + } } diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 1c8793c931..604322260c 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -490,88 +490,100 @@ impl Primary { // Determined the required number of transmissions per worker. let num_transmissions_per_worker = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize; + // Initialize the map of transmissions. let mut transmissions: IndexMap<_, _> = Default::default(); + // Keeps track of the number of transmissions included thus far. The + // transmissions index is only updated in batches, this counter is more granular. + let mut num_transmissions_included = 0usize; + // Track the total execution costs of the batch proposal as it is being constructed. + let mut proposal_cost = 0u64; // Take the transmissions from the workers. - for worker in self.workers.iter() { + 'outer: for worker in self.workers.iter() { // Initialize a tracker for included transmissions for the current worker. let mut num_transmissions_included_for_worker = 0; - // Keep draining the worker until the desired number of transmissions is reached or the worker is empty. - 'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker { - // Determine the number of remaining transmissions for the worker. - let num_remaining_transmissions = - num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker); - // Drain the worker. - let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable(); - // If the worker is empty, break early. - if worker_transmissions.peek().is_none() { - break 'outer; + let mut worker_transmissions = worker.transmissions().into_iter(); + + // Check the transactions for inclusion in the batch proposal. + while num_transmissions_included_for_worker < num_transmissions_per_worker { + let Some((id, transmission)) = worker_transmissions.next() else { break }; + + // Check if the ledger already contains the transmission. + if self.ledger.contains_transmission(&id).unwrap_or(true) { + trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); + continue; } - // Iterate through the worker transmissions. - 'inner: for (id, transmission) in worker_transmissions { - // Check if the ledger already contains the transmission. - if self.ledger.contains_transmission(&id).unwrap_or(true) { - trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); - continue 'inner; - } - // Check if the storage already contain the transmission. - // Note: We do not skip if this is the first transmission in the proposal, to ensure that - // the primary does not propose a batch with no transmissions. - if !transmissions.is_empty() && self.storage.contains_transmission(id) { - trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id)); - continue 'inner; - } - // Check the transmission is still valid. - match (id, transmission.clone()) { - (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => { - // Ensure the checksum matches. - match solution.to_checksum::() { - Ok(solution_checksum) if solution_checksum == checksum => (), - _ => { - trace!( - "Proposing - Skipping solution '{}' - Checksum mismatch", - fmt_id(solution_id) - ); - continue 'inner; - } - } - // Check if the solution is still valid. - if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { - trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); - continue 'inner; + + // Check if the storage already contain the transmission. + // Note: We do not skip if this is the first transmission in the proposal, to ensure that + // the primary does not propose a batch with no transmissions. + if num_transmissions_included != 0 && self.storage.contains_transmission(id) { + trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id)); + continue; + } + + // Check the transmission is still valid. + match (id, transmission.clone()) { + (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => { + // Ensure the checksum matches. + match solution.to_checksum::() { + Ok(solution_checksum) if solution_checksum == checksum => (), + _ => { + trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id)); + continue; } } - ( - TransmissionID::Transaction(transaction_id, checksum), - Transmission::Transaction(transaction), - ) => { - // Ensure the checksum matches. - match transaction.to_checksum::() { - Ok(transaction_checksum) if transaction_checksum == checksum => (), - _ => { - trace!( - "Proposing - Skipping transaction '{}' - Checksum mismatch", - fmt_id(transaction_id) - ); - continue 'inner; - } + // Check if the solution is still valid. + if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { + trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); + continue; + } + } + (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => { + // Ensure the checksum matches. + match transaction.to_checksum::() { + Ok(transaction_checksum) if transaction_checksum == checksum => (), + _ => { + trace!( + "Proposing - Skipping transaction '{}' - Checksum mismatch", + fmt_id(transaction_id) + ); + continue; } - // Check if the transaction is still valid. - if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await { - trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); - continue 'inner; + } + // Check if the transaction is still valid. + // TODO: check if clone is cheap, otherwise fix. + if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await { + trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); + continue; + } + + // Ensure the transaction doesn't bring the proposal above the spend limit. + match self.ledger.compute_cost(transaction_id, transaction) { + Ok(cost) if proposal_cost + cost <= N::BATCH_SPEND_LIMIT => proposal_cost += cost, + _ => { + trace!( + "Proposing - Skipping transaction '{}' - Batch spend limit surpassed", + fmt_id(transaction_id) + ); + break 'outer; } } - // Note: We explicitly forbid including ratifications, - // as the protocol currently does not support ratifications. - (TransmissionID::Ratification, Transmission::Ratification) => continue, - // All other combinations are clearly invalid. - _ => continue 'inner, } - // Insert the transmission into the map. - transmissions.insert(id, transmission); - num_transmissions_included_for_worker += 1; + // Note: We explicitly forbid including ratifications, + // as the protocol currently does not support ratifications. + (TransmissionID::Ratification, Transmission::Ratification) => continue, + // All other combinations are clearly invalid. + _ => continue, } + + num_transmissions_included += 1; + num_transmissions_included_for_worker += 1; + } + + // Drain the selected transactions from the worker and insert them into the batch proposal. + for (id, transmission) in worker.drain(num_transmissions_included_for_worker) { + transmissions.insert(id, transmission); } } @@ -755,6 +767,35 @@ impl Primary { // Inserts the missing transmissions into the workers. self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?; + // Ensure the transaction doesn't bring the proposal above the spend limit. + let mut proposal_cost = 0u64; + for transmission_id in batch_header.transmission_ids() { + let worker_id = assign_to_worker(*transmission_id, self.num_workers())?; + let Some(worker) = self.workers.get(worker_id as usize) else { + debug!("Unable to find worker {worker_id}"); + return Ok(()); + }; + + let Some(transmission) = worker.get_transmission(*transmission_id) else { + debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id)); + return Ok(()); + }; + + // If the transmission is a transaction, compute its execution cost. + if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) = + (transmission_id, transmission) + { + proposal_cost += self.ledger.compute_cost(*transaction_id, transaction)? + } + } + + if proposal_cost > N::BATCH_SPEND_LIMIT { + debug!( + "Batch propose from peer '{peer_ip}' exceeds the batch spend limit — cost in microcredits: '{proposal_cost}'" + ); + return Ok(()); + } + /* Proceeding to sign the batch. */ // Retrieve the batch ID. diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index 2200877d25..27b8e20d28 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -602,6 +602,7 @@ mod tests { transmissions: IndexMap, Transmission>, ) -> Result>; fn advance_to_next_block(&self, block: &Block) -> Result<()>; + fn compute_cost(&self, transaction_id: N::TransactionID, transaction: Data>) -> Result; } }