diff --git a/Cargo.lock b/Cargo.lock index 42d5f50fc5..43512845fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7209,22 +7209,27 @@ version = "1.16.19" dependencies = [ "anchor-lang", "clap 4.1.11", + "crossbeam-channel", "env_logger", "futures 0.3.28", + "gethostname", "im", "itertools", "jito-tip-distribution", "jito-tip-payment", "log", "num-traits", + "rand 0.7.3", "serde", "serde_json", "solana-client", "solana-genesis-utils", "solana-ledger", + "solana-measure", "solana-merkle-tree", "solana-metrics", "solana-program", + "solana-program-runtime", "solana-rpc-client-api", "solana-runtime", "solana-sdk", diff --git a/tip-distributor/Cargo.toml b/tip-distributor/Cargo.toml index 722b99c689..9d044fe983 100644 --- a/tip-distributor/Cargo.toml +++ b/tip-distributor/Cargo.toml @@ -7,29 +7,34 @@ description = "Collection of binaries used to distribute MEV rewards to delegato [dependencies] anchor-lang = { workspace = true } -clap = { version = "=4.1.11", features = ["derive", "env"] } -env_logger = "0.9.0" -futures = "0.3.21" -im = "15.1.0" -itertools = "0.10.3" +clap = { version = "4.1.11", features = ["derive", "env"] } +crossbeam-channel = { workspace = true } +env_logger = { workspace = true } +futures = { workspace = true } +gethostname = { workspace = true } +im = { workspace = true } +itertools = { workspace = true } jito-tip-distribution = { workspace = true } jito-tip-payment = { workspace = true } -log = "0.4.17" -num-traits = "0.2.15" -serde = "1.0.137" -serde_json = "1.0.81" +log = { workspace = true } +num-traits = { workspace = true } +rand = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } solana-client = { workspace = true } solana-genesis-utils = { workspace = true } solana-ledger = { workspace = true } solana-merkle-tree = { workspace = true } +solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-program = { workspace = true } +solana-program-runtime = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } solana-stake-program = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "full"] } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } [[bin]] name = "solana-stake-meta-generator" @@ -46,7 +51,3 @@ path = "src/bin/merkle-root-uploader.rs" [[bin]] name = "solana-claim-mev-tips" path = "src/bin/claim-mev-tips.rs" - -[[bin]] -name = "solana-reclaim-rent" -path = "src/bin/reclaim-rent.rs" diff --git a/tip-distributor/README.md b/tip-distributor/README.md index c100843a41..fec682879a 100644 --- a/tip-distributor/README.md +++ b/tip-distributor/README.md @@ -27,8 +27,15 @@ out into the PDA until some slot in epoch N + 1. Due to this we cannot rely on t in the PDAs. We use the bank solely to take a snapshot of delegations, but an RPC node to fetch the PDA lamports for more up-to-date data. ### merkle-root-generator -This script accepts a path to the above JSON file as one of its arguments, and generates a merkle-root. It'll optionally upload the root -on-chain if specified. Additionally, it'll spit the generated merkle trees out into a JSON file. +This script accepts a path to the above JSON file as one of its arguments, and generates a merkle-root into a JSON file. + +### merkle-root-uploader +Uploads the root on-chain. + +### claim-mev-tips +This reads the file outputted by `merkle-root-generator` and finds all eligible accounts to receive mev tips. Transactions +are created and sent to the RPC server. + ## How it works? In order to use this library as the merkle root creator one must follow the following steps: @@ -38,6 +45,8 @@ In order to use this library as the merkle root creator one must follow the foll 1. The snapshot created at `${WHERE_TO_CREATE_SNAPSHOT}` will have the highest slot of `${YOUR_SLOT}`, assuming you downloaded the correct snapshot. 4. Run `stake-meta-generator --ledger-path ${WHERE_TO_CREATE_SNAPSHOT} --tip-distribution-program-id ${PUBKEY} --out-path ${JSON_OUT_PATH} --snapshot-slot ${SLOT} --rpc-url ${URL}` 1. Note: `${WHERE_TO_CREATE_SNAPSHOT}` must be the same in steps 3 & 4. -5. Run `merkle-root-generator --path-to-my-keypair ${KEYPAIR_PATH} --stake-meta-coll-path ${STAKE_META_COLLECTION_JSON} --rpc-url ${URL} --upload-roots ${BOOL} --force-upload-root ${BOOL}` +5. Run `merkle-root-generator --stake-meta-coll-path ${STAKE_META_COLLECTION_JSON} --rpc-url ${URL} --out-path ${MERKLE_ROOT_PATH}` +6. Run `merkle-root-uploader --out-path ${MERKLE_ROOT_PATH} --keypair-path ${KEYPAIR_PATH} --rpc-url ${URL} --tip-distribution-program-id ${PROGRAM_ID}` +7. Run `solana-claim-mev-tips --merkle-trees-path /solana/ledger/autosnapshot/merkle-tree-221615999.json --rpc-url ${URL} --tip-distribution-program-id ${PROGRAM_ID} --keypair-path ${KEYPAIR_PATH}` Voila! diff --git a/tip-distributor/src/bin/claim-mev-tips.rs b/tip-distributor/src/bin/claim-mev-tips.rs index 4a9a789509..b77dcf89c8 100644 --- a/tip-distributor/src/bin/claim-mev-tips.rs +++ b/tip-distributor/src/bin/claim-mev-tips.rs @@ -1,11 +1,21 @@ -//! This binary claims MEV tips. - +///! This binary claims MEV tips. use { clap::Parser, + gethostname::gethostname, log::*, - solana_sdk::pubkey::Pubkey, - solana_tip_distributor::claim_mev_workflow::claim_mev_tips, - std::{path::PathBuf, str::FromStr}, + solana_metrics::{datapoint_error, datapoint_info, set_host_id}, + solana_sdk::{pubkey::Pubkey, signature::read_keypair_file}, + solana_tip_distributor::{ + claim_mev_workflow::{claim_mev_tips, ClaimMevError}, + read_json_from_file, + reclaim_rent_workflow::reclaim_rent, + GeneratedMerkleTreeCollection, + }, + std::{ + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, + }, }; #[derive(Parser, Debug)] @@ -16,37 +26,146 @@ struct Args { merkle_trees_path: PathBuf, /// RPC to send transactions through - #[arg(long, env)] + #[arg(long, env, default_value = "http://localhost:8899")] rpc_url: String, /// Tip distribution program ID #[arg(long, env)] - tip_distribution_program_id: String, + tip_distribution_program_id: Pubkey, /// Path to keypair #[arg(long, env)] keypair_path: PathBuf, + + /// Number of unique connections to the RPC server for sending txns + #[arg(long, env, default_value_t = 128)] + rpc_send_connection_count: u64, + + /// Rate-limits the maximum number of GET requests per RPC connection + #[arg(long, env, default_value_t = 256)] + max_concurrent_rpc_get_reqs: usize, + + /// Number of retries for main claim send loop. Loop is time bounded. + #[arg(long, env, default_value_t = 5)] + max_loop_retries: u64, + + /// Limits how long before send loop runs before stopping. Defaults to 10 mins + #[arg(long, env, default_value_t = 10 * 60)] + max_loop_duration_secs: u64, + + /// Specifies whether to reclaim any rent. + #[arg(long, env, default_value_t = true)] + should_reclaim_rent: bool, + + /// Specifies whether to reclaim rent on behalf of validators from respective TDAs. + #[arg(long, env)] + should_reclaim_tdas: bool, } -fn main() { +#[tokio::main] +async fn main() -> Result<(), ClaimMevError> { env_logger::init(); - info!("Starting to claim mev tips..."); - + gethostname() + .into_string() + .map(set_host_id) + .expect("set hostname"); let args: Args = Args::parse(); + let keypair = Arc::new(read_keypair_file(&args.keypair_path).expect("read keypair file")); + let merkle_trees: GeneratedMerkleTreeCollection = + read_json_from_file(&args.merkle_trees_path).expect("read GeneratedMerkleTreeCollection"); + let max_loop_duration = Duration::from_secs(args.max_loop_duration_secs); - let tip_distribution_program_id = Pubkey::from_str(&args.tip_distribution_program_id) - .expect("valid tip_distribution_program_id"); - - if let Err(e) = claim_mev_tips( - &args.merkle_trees_path, - &args.rpc_url, - &tip_distribution_program_id, - &args.keypair_path, - ) { - panic!("error claiming mev tips: {:?}", e); - } info!( - "done claiming mev tips from file {:?}", - args.merkle_trees_path + "Starting to claim mev tips for epoch: {}", + merkle_trees.epoch ); + let start = Instant::now(); + + match claim_mev_tips( + merkle_trees.clone(), + args.rpc_url.clone(), + args.rpc_send_connection_count, + args.max_concurrent_rpc_get_reqs, + &args.tip_distribution_program_id, + keypair.clone(), + args.max_loop_retries, + max_loop_duration, + ) + .await + { + Err(e) => { + datapoint_error!( + "claim_mev_workflow-claim_error", + ("epoch", merkle_trees.epoch, i64), + ("error", 1, i64), + ("err_str", e.to_string(), String), + ( + "merkle_trees_path", + args.merkle_trees_path.to_string_lossy(), + String + ), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + Err(e) + } + Ok(()) => { + datapoint_info!( + "claim_mev_workflow-claim_completion", + ("epoch", merkle_trees.epoch, i64), + ( + "merkle_trees_path", + args.merkle_trees_path.to_string_lossy(), + String + ), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + Ok(()) + } + }?; + + if args.should_reclaim_rent { + let start = Instant::now(); + match reclaim_rent( + args.rpc_url, + args.rpc_send_connection_count, + args.tip_distribution_program_id, + keypair, + args.max_loop_retries, + max_loop_duration, + args.should_reclaim_tdas, + ) + .await + { + Err(e) => { + datapoint_error!( + "claim_mev_workflow-reclaim_rent_error", + ("epoch", merkle_trees.epoch, i64), + ("error", 1, i64), + ("err_str", e.to_string(), String), + ( + "merkle_trees_path", + args.merkle_trees_path.to_string_lossy(), + String + ), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + Err(e) + } + Ok(()) => { + datapoint_info!( + "claim_mev_workflow-reclaim_rent_completion", + ("epoch", merkle_trees.epoch, i64), + ( + "merkle_trees_path", + args.merkle_trees_path.to_string_lossy(), + String + ), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + Ok(()) + } + }?; + } + solana_metrics::flush(); // sometimes last datapoint doesn't get emitted. this increases likelihood. + Ok(()) } diff --git a/tip-distributor/src/bin/merkle-root-uploader.rs b/tip-distributor/src/bin/merkle-root-uploader.rs index 9fcd7b8ed7..9000ce66d0 100644 --- a/tip-distributor/src/bin/merkle-root-uploader.rs +++ b/tip-distributor/src/bin/merkle-root-uploader.rs @@ -1,9 +1,6 @@ use { - clap::Parser, - log::info, - solana_sdk::pubkey::Pubkey, - solana_tip_distributor::merkle_root_upload_workflow::upload_merkle_root, - std::{path::PathBuf, str::FromStr}, + clap::Parser, log::info, solana_sdk::pubkey::Pubkey, + solana_tip_distributor::merkle_root_upload_workflow::upload_merkle_root, std::path::PathBuf, }; #[derive(Parser, Debug)] @@ -23,7 +20,15 @@ struct Args { /// Tip distribution program ID #[arg(long, env)] - tip_distribution_program_id: String, + tip_distribution_program_id: Pubkey, + + /// Rate-limits the maximum number of requests per RPC connection + #[arg(long, env, default_value_t = 100)] + max_concurrent_rpc_get_reqs: usize, + + /// Number of transactions to send to RPC at a time. + #[arg(long, env, default_value_t = 64)] + txn_send_batch_size: usize, } fn main() { @@ -31,15 +36,14 @@ fn main() { let args: Args = Args::parse(); - let tip_distribution_program_id = Pubkey::from_str(&args.tip_distribution_program_id) - .expect("valid tip_distribution_program_id"); - info!("starting merkle root uploader..."); if let Err(e) = upload_merkle_root( &args.merkle_root_path, &args.keypair_path, &args.rpc_url, - &tip_distribution_program_id, + &args.tip_distribution_program_id, + args.max_concurrent_rpc_get_reqs, + args.txn_send_batch_size, ) { panic!("failed to upload merkle roots: {:?}", e); } diff --git a/tip-distributor/src/bin/reclaim-rent.rs b/tip-distributor/src/bin/reclaim-rent.rs deleted file mode 100644 index 5aa372a27a..0000000000 --- a/tip-distributor/src/bin/reclaim-rent.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! Reclaims rent from TDAs and Claim Status accounts. - -use { - clap::Parser, - log::*, - solana_client::nonblocking::rpc_client::RpcClient, - solana_sdk::{ - commitment_config::CommitmentConfig, pubkey::Pubkey, signature::read_keypair_file, - }, - solana_tip_distributor::reclaim_rent_workflow::reclaim_rent, - std::{path::PathBuf, str::FromStr, time::Duration}, - tokio::runtime::Runtime, -}; - -#[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] -struct Args { - /// RPC to send transactions through. - /// NOTE: This script uses getProgramAccounts, make sure you have added an account index - /// for the tip_distribution_program_id on the RPC node. - #[arg(long, env)] - rpc_url: String, - - /// Tip distribution program ID. - #[arg(long, env, value_parser = Pubkey::from_str)] - tip_distribution_program_id: Pubkey, - - /// The keypair signing and paying for transactions. - #[arg(long, env)] - keypair_path: PathBuf, - - /// High timeout b/c of get_program_accounts call - #[arg(long, env, default_value_t = 180)] - rpc_timeout_secs: u64, - - /// Specifies whether to reclaim rent on behalf of validators from respective TDAs. - #[arg(long, env)] - should_reclaim_tdas: bool, -} - -fn main() { - env_logger::init(); - - info!("Starting to claim mev tips..."); - let args: Args = Args::parse(); - - let runtime = Runtime::new().unwrap(); - if let Err(e) = runtime.block_on(reclaim_rent( - RpcClient::new_with_timeout_and_commitment( - args.rpc_url, - Duration::from_secs(args.rpc_timeout_secs), - CommitmentConfig::confirmed(), - ), - args.tip_distribution_program_id, - read_keypair_file(&args.keypair_path).expect("read keypair file"), - args.should_reclaim_tdas, - )) { - panic!("error reclaiming rent: {e:?}"); - } - - info!("done reclaiming all rent",); -} diff --git a/tip-distributor/src/claim_mev_workflow.rs b/tip-distributor/src/claim_mev_workflow.rs index 446011b1ae..911358cae6 100644 --- a/tip-distributor/src/claim_mev_workflow.rs +++ b/tip-distributor/src/claim_mev_workflow.rs @@ -1,26 +1,32 @@ use { crate::{ - read_json_from_file, sign_and_send_transactions_with_retries, GeneratedMerkleTreeCollection, + claim_mev_workflow::ClaimMevError::{ClaimantNotFound, InsufficientBalance, TDANotFound}, + sign_and_send_transactions_with_retries_multi_rpc, GeneratedMerkleTreeCollection, TreeNode, }, anchor_lang::{AccountDeserialize, InstructionData, ToAccountMetas}, - jito_tip_distribution::state::*, - log::{debug, info, warn}, - solana_client::{nonblocking::rpc_client::RpcClient, rpc_request::RpcError}, + itertools::Itertools, + jito_tip_distribution::state::{ClaimStatus, Config, TipDistributionAccount}, + log::{debug, error, info}, + solana_client::nonblocking::rpc_client::RpcClient, + solana_metrics::{datapoint_info, datapoint_warn}, solana_program::{ fee_calculator::DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE, native_token::LAMPORTS_PER_SOL, stake::state::StakeState, system_program, }, - solana_rpc_client_api::client_error, solana_sdk::{ + account::Account, commitment_config::CommitmentConfig, instruction::Instruction, pubkey::Pubkey, - signature::{read_keypair_file, Signer}, + signature::{Keypair, Signer}, transaction::Transaction, }, - std::{path::PathBuf, time::Duration}, + std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, + }, thiserror::Error, - tokio::runtime::Builder, }; #[derive(Error, Debug)] @@ -30,123 +36,411 @@ pub enum ClaimMevError { #[error(transparent)] JsonError(#[from] serde_json::Error), + + #[error(transparent)] + AnchorError(anchor_lang::error::Error), + + #[error("TDA not found for pubkey: {0:?}")] + TDANotFound(Pubkey), + + #[error("Claim Status not found for pubkey: {0:?}")] + ClaimStatusNotFound(Pubkey), + + #[error("Claimant not found for pubkey: {0:?}")] + ClaimantNotFound(Pubkey), + + #[error(transparent)] + MaxFetchRetriesExceeded(#[from] solana_rpc_client_api::client_error::Error), + + #[error("Failed after {attempts} retries. {remaining_transaction_count} remaining mev claim transactions, {failed_transaction_count} failed requests.",)] + MaxSendTransactionRetriesExceeded { + attempts: u64, + remaining_transaction_count: usize, + failed_transaction_count: usize, + }, + + #[error("Expected to have at least {desired_balance} lamports in {payer:?}. Current balance is {start_balance} lamports. Deposit {sol_to_deposit} SOL to continue.")] + InsufficientBalance { + desired_balance: u64, + payer: Pubkey, + start_balance: u64, + sol_to_deposit: u64, + }, } -pub fn claim_mev_tips( - merkle_root_path: &PathBuf, - rpc_url: &str, +pub async fn claim_mev_tips( + merkle_trees: GeneratedMerkleTreeCollection, + rpc_url: String, + rpc_send_connection_count: u64, + max_concurrent_rpc_get_reqs: usize, tip_distribution_program_id: &Pubkey, - keypair_path: &PathBuf, + keypair: Arc, + max_loop_retries: u64, + max_loop_duration: Duration, ) -> Result<(), ClaimMevError> { - const MAX_RETRY_DURATION: Duration = Duration::from_secs(600); + let payer_pubkey = keypair.pubkey(); + let blockhash_rpc_client = Arc::new(RpcClient::new_with_commitment( + rpc_url.clone(), + CommitmentConfig::finalized(), + )); + let rpc_clients = Arc::new( + (0..rpc_send_connection_count) + .map(|_| { + Arc::new(RpcClient::new_with_commitment( + rpc_url.clone(), + CommitmentConfig::confirmed(), + )) + }) + .collect_vec(), + ); - let merkle_trees: GeneratedMerkleTreeCollection = - read_json_from_file(merkle_root_path).expect("read GeneratedMerkleTreeCollection"); - let keypair = read_keypair_file(keypair_path).expect("read keypair file"); + let tree_nodes = merkle_trees + .generated_merkle_trees + .iter() + .flat_map(|tree| &tree.tree_nodes) + .collect_vec(); + let stake_acct_min_rent = blockhash_rpc_client + .get_minimum_balance_for_rent_exemption(StakeState::size_of()) + .await + .expect("Failed to calculate min rent"); - let tip_distribution_config = - Pubkey::find_program_address(&[Config::SEED], tip_distribution_program_id).0; + // fetch all accounts up front + info!( + "Starting to fetch accounts for epoch {}", + merkle_trees.epoch + ); + let tdas = crate::get_batched_accounts( + &blockhash_rpc_client, + max_concurrent_rpc_get_reqs, + merkle_trees + .generated_merkle_trees + .iter() + .map(|tree| tree.tip_distribution_account) + .collect_vec(), + ) + .await + .map_err(ClaimMevError::MaxFetchRetriesExceeded)? + .into_iter() + .filter_map(|(pubkey, maybe_account)| { + let account = match maybe_account { + Some(account) => account, + None => { + datapoint_warn!( + "claim_mev_workflow-account_error", + ("epoch", merkle_trees.epoch, i64), + ("pubkey", pubkey.to_string(), String), + ("account_type", "tip_distribution_account", String), + ("error", 1, i64), + ("err_type", "fetch", String), + ("err_str", "Failed to fetch TipDistributionAccount", String) + ); + return None; + } + }; - let rpc_client = - RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::finalized()); + let account = match TipDistributionAccount::try_deserialize(&mut account.data.as_slice()) { + Ok(a) => a, + Err(e) => { + datapoint_warn!( + "claim_mev_workflow-account_error", + ("epoch", merkle_trees.epoch, i64), + ("pubkey", pubkey.to_string(), String), + ("account_type", "tip_distribution_account", String), + ("error", 1, i64), + ("err_type", "deserialize_tip_distribution_account", String), + ("err_str", e.to_string(), String) + ); + return None; + } + }; + Some((pubkey, account)) + }) + .collect::>(); - let runtime = Builder::new_multi_thread() - .worker_threads(16) - .enable_all() - .build() - .unwrap(); + // track balances only + let claimants = crate::get_batched_accounts( + &blockhash_rpc_client, + max_concurrent_rpc_get_reqs, + tree_nodes + .iter() + .map(|tree_node| tree_node.claimant) + .collect_vec(), + ) + .await + .map_err(ClaimMevError::MaxFetchRetriesExceeded)? + .into_iter() + .map(|(pubkey, maybe_account)| { + ( + pubkey, + maybe_account + .map(|account| account.lamports) + .unwrap_or_default(), + ) + }) + .collect::>(); + + // Refresh claimants + Try sending txns to RPC + let mut retries = 0; + let mut failed_transaction_count = 0usize; + loop { + let start = Instant::now(); + let claim_statuses = crate::get_batched_accounts( + &blockhash_rpc_client, + max_concurrent_rpc_get_reqs, + tree_nodes + .iter() + .map(|tree_node| tree_node.claim_status_pubkey) + .collect_vec(), + ) + .await + .map_err(ClaimMevError::MaxFetchRetriesExceeded)?; + let account_fetch_elapsed = start.elapsed(); - let mut instructions = Vec::new(); + let ( + skipped_merkle_root_count, + zero_lamports_count, + already_claimed_count, + below_min_rent_count, + transactions, + ) = build_transactions( + tip_distribution_program_id, + &merkle_trees, + &payer_pubkey, + &tree_nodes, + stake_acct_min_rent, + &tdas, + &claimants, + &claim_statuses, + )?; + datapoint_info!( + "claim_mev_workflow-prepare_transactions", + ("epoch", merkle_trees.epoch, i64), + ("attempt", retries, i64), + ("tree_node_count", tree_nodes.len(), i64), + ("tda_count", tdas.len(), i64), + ("claimant_count", claimants.len(), i64), + ("claim_status_count", claim_statuses.len(), i64), + ("skipped_merkle_root_count", skipped_merkle_root_count, i64), + ("zero_lamports_count", zero_lamports_count, i64), + ("already_claimed_count", already_claimed_count, i64), + ("below_min_rent_count", below_min_rent_count, i64), + ("transaction_count", transactions.len(), i64), + ( + "account_fetch_latency_us", + account_fetch_elapsed.as_micros(), + i64 + ), + ( + "transaction_prepare_latency_us", + start.elapsed().as_micros(), + i64 + ), + ); - runtime.block_on(async move { - let start_balance = rpc_client.get_balance(&keypair.pubkey()).await.expect("failed to get balance"); - // heuristic to make sure we have enough funds to cover the rent costs if epoch has many validators + if transactions.is_empty() { + info!("Finished claiming tips after {retries} retries, {failed_transaction_count} failed requests."); + return Ok(()); + } + + if let Some((start_balance, desired_balance, sol_to_deposit)) = is_sufficient_balance( + &payer_pubkey, + &blockhash_rpc_client, + transactions.len() as u64, + ) + .await { - // most amounts are for 0 lamports. had 1736 non-zero claims out of 164742 - let node_count = merkle_trees.generated_merkle_trees.iter().flat_map(|tree| &tree.tree_nodes).filter(|node| node.amount > 0).count(); - let min_rent_per_claim = rpc_client.get_minimum_balance_for_rent_exemption(ClaimStatus::SIZE).await.expect("Failed to calculate min rent"); - let desired_balance = (node_count as u64).checked_mul(min_rent_per_claim.checked_add(DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE).unwrap()).unwrap(); - if start_balance < desired_balance { - let sol_to_deposit = desired_balance.checked_sub(start_balance).unwrap().checked_add(LAMPORTS_PER_SOL).unwrap().checked_sub(1).unwrap().checked_div(LAMPORTS_PER_SOL).unwrap(); // rounds up to nearest sol - panic!("Expected to have at least {} lamports in {}, current balance is {} lamports, deposit {} SOL to continue.", - desired_balance, &keypair.pubkey(), start_balance, sol_to_deposit) - } + return Err(InsufficientBalance { + desired_balance, + payer: payer_pubkey, + start_balance, + sol_to_deposit, + }); } - let stake_acct_min_rent = rpc_client.get_minimum_balance_for_rent_exemption(StakeState::size_of()).await.expect("Failed to calculate min rent"); - let mut below_min_rent_count: usize = 0; - let mut zero_lamports_count: usize = 0; - for tree in merkle_trees.generated_merkle_trees { - // only claim for ones that have merkle root on-chain - let account = rpc_client.get_account(&tree.tip_distribution_account).await.expect("expected to fetch tip distribution account"); - let fetched_tip_distribution_account = TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).expect("failed to deserialize tip_distribution_account state"); - if fetched_tip_distribution_account.merkle_root.is_none() { - info!( - "not claiming because merkle root isn't uploaded yet. skipped {} claimants for tda: {:?}", - tree.tree_nodes.len(), - tree.tip_distribution_account - ); + let transactions_len = transactions.len(); + + info!("Sending {} tip claim transactions. {zero_lamports_count} would transfer zero lamports, {below_min_rent_count} would be below minimum rent", transactions.len()); + let send_start = Instant::now(); + let (remaining_transaction_count, new_failed_transaction_count) = + sign_and_send_transactions_with_retries_multi_rpc( + &keypair, + &blockhash_rpc_client, + &rpc_clients, + transactions, + max_loop_duration, + ) + .await; + failed_transaction_count += new_failed_transaction_count; + + datapoint_info!( + "claim_mev_workflow-send_transactions", + ("epoch", merkle_trees.epoch, i64), + ("attempt", retries, i64), + ("transaction_count", transactions_len, i64), + ( + "successful_transaction_count", + transactions_len - remaining_transaction_count, + i64 + ), + ( + "remaining_transaction_count", + remaining_transaction_count, + i64 + ), + ( + "failed_transaction_count", + new_failed_transaction_count, + i64 + ), + ("send_latency_us", send_start.elapsed().as_micros(), i64), + ); + + if retries >= max_loop_retries { + return Err(ClaimMevError::MaxSendTransactionRetriesExceeded { + attempts: max_loop_retries, + remaining_transaction_count, + failed_transaction_count, + }); + } + retries += 1; + } +} + +#[allow(clippy::result_large_err)] +fn build_transactions( + tip_distribution_program_id: &Pubkey, + merkle_trees: &GeneratedMerkleTreeCollection, + payer_pubkey: &Pubkey, + tree_nodes: &[&TreeNode], + stake_acct_min_rent: u64, + tdas: &HashMap, + claimants: &HashMap, + claim_statuses: &HashMap>, +) -> Result<(usize, usize, usize, usize, Vec), ClaimMevError> { + let tip_distribution_config = + Pubkey::find_program_address(&[Config::SEED], tip_distribution_program_id).0; + let mut skipped_merkle_root_count: usize = 0; + let mut zero_lamports_count: usize = 0; + let mut already_claimed_count: usize = 0; + let mut below_min_rent_count: usize = 0; + let mut instructions = + Vec::with_capacity(tree_nodes.iter().filter(|node| node.amount > 0).count()); + + // prepare instructions to transfer to all claimants + for tree in &merkle_trees.generated_merkle_trees { + let fetched_tip_distribution_account = match tdas.get(&tree.tip_distribution_account) { + Some(account) => account, + None => return Err(TDANotFound(tree.tip_distribution_account)), + }; + // only claim for ones that have merkle root on-chain + if fetched_tip_distribution_account.merkle_root.is_none() { + info!( + "Merkle root has not uploaded yet. Skipped {} claimants for TDA: {:?}", + tree.tree_nodes.len(), + tree.tip_distribution_account + ); + skipped_merkle_root_count = skipped_merkle_root_count.checked_add(1).unwrap(); + continue; + } + for node in &tree.tree_nodes { + if node.amount == 0 { + zero_lamports_count = zero_lamports_count.checked_add(1).unwrap(); continue; } - for node in tree.tree_nodes { - if node.amount == 0 { - zero_lamports_count = zero_lamports_count.checked_add(1).unwrap(); + + // make sure not previously claimed + match claim_statuses.get(&node.claim_status_pubkey) { + Some(None) => {} // expected to not find ClaimStatus account, don't skip + Some(Some(_account)) => { + debug!( + "Claim status account already exists (already paid out). Skipping pubkey: {:?}.", node.claim_status_pubkey, + ); + already_claimed_count = already_claimed_count.checked_add(1).unwrap(); continue; } + None => return Err(ClaimantNotFound(node.claim_status_pubkey)), + }; + let current_balance = match claimants.get(&node.claimant) { + Some(balance) => balance, + None => return Err(ClaimantNotFound(node.claimant)), + }; - // make sure not previously claimed - match rpc_client.get_account(&node.claim_status_pubkey).await { - Ok(_) => { - debug!("claim status account already exists, skipping pubkey {:?}.", node.claim_status_pubkey); - continue; - } - // expected to not find ClaimStatus account, don't skip - Err(client_error::Error { kind: client_error::ErrorKind::RpcError(RpcError::ForUser(err)), .. }) if err.starts_with("AccountNotFound") => {} - Err(err) => panic!("Unexpected RPC Error: {}", err), + // some older accounts can be rent-paying + // any new transfers will need to make the account rent-exempt (runtime enforced) + let balance_with_tip = current_balance.checked_add(node.amount).unwrap(); + if balance_with_tip < stake_acct_min_rent { + debug!("Current balance + tip claim amount of {balance_with_tip} is less than required rent-exempt of {stake_acct_min_rent} for pubkey: {}. Skipping.", node.claimant); + below_min_rent_count = below_min_rent_count.checked_add(1).unwrap(); + continue; + } + instructions.push(Instruction { + program_id: *tip_distribution_program_id, + data: jito_tip_distribution::instruction::Claim { + proof: node.proof.clone().unwrap(), + amount: node.amount, + bump: node.claim_status_bump, } - - let current_balance = rpc_client.get_balance(&node.claimant).await.expect("Failed to get balance"); - // some older accounts can be rent-paying - // any new transfers will need to make the account rent-exempt (runtime enforced) - if current_balance.checked_add(node.amount).unwrap() < stake_acct_min_rent { - warn!("Current balance + tip claim amount of {} is less than required rent-exempt of {} for pubkey: {}. Skipping.", - current_balance.checked_add(node.amount).unwrap(), stake_acct_min_rent, node.claimant); - below_min_rent_count = below_min_rent_count.checked_add(1).unwrap(); - continue; + .data(), + accounts: jito_tip_distribution::accounts::Claim { + config: tip_distribution_config, + tip_distribution_account: tree.tip_distribution_account, + claimant: node.claimant, + claim_status: node.claim_status_pubkey, + payer: *payer_pubkey, + system_program: system_program::id(), } - instructions.push(Instruction { - program_id: *tip_distribution_program_id, - data: jito_tip_distribution::instruction::Claim { - proof: node.proof.unwrap(), - amount: node.amount, - bump: node.claim_status_bump, - }.data(), - accounts: jito_tip_distribution::accounts::Claim { - config: tip_distribution_config, - tip_distribution_account: tree.tip_distribution_account, - claimant: node.claimant, - claim_status: node.claim_status_pubkey, - payer: keypair.pubkey(), - system_program: system_program::id(), - }.to_account_metas(None), - }); - } + .to_account_metas(None), + }); } + } - let transactions = instructions.into_iter().map(|ix|{ - Transaction::new_with_payer( - &[ix], - Some(&keypair.pubkey()), - ) - }).collect::>(); - - info!("Sending {} tip claim transactions. {} tried sending zero lamports, {} would be below minimum rent", - &transactions.len(), zero_lamports_count, below_min_rent_count); - - let failed_transactions = sign_and_send_transactions_with_retries(&keypair, &rpc_client, transactions, MAX_RETRY_DURATION).await; - if !failed_transactions.is_empty() { - panic!("failed to send {} transactions", failed_transactions.len()); - } - }); + let transactions = instructions + .into_iter() + .map(|ix| Transaction::new_with_payer(&[ix], Some(payer_pubkey))) + .collect::>(); + Ok(( + skipped_merkle_root_count, + zero_lamports_count, + already_claimed_count, + below_min_rent_count, + transactions, + )) +} - Ok(()) +/// heuristic to make sure we have enough funds to cover the rent costs if epoch has many validators +/// If insufficient funds, returns start balance, desired balance, and amount of sol to deposit +async fn is_sufficient_balance( + payer: &Pubkey, + rpc_client: &RpcClient, + instruction_count: u64, +) -> Option<(u64, u64, u64)> { + let start_balance = rpc_client + .get_balance(payer) + .await + .expect("Failed to get starting balance"); + // most amounts are for 0 lamports. had 1736 non-zero claims out of 164742 + let min_rent_per_claim = rpc_client + .get_minimum_balance_for_rent_exemption(ClaimStatus::SIZE) + .await + .expect("Failed to calculate min rent"); + let desired_balance = instruction_count + .checked_mul( + min_rent_per_claim + .checked_add(DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE) + .unwrap(), + ) + .unwrap(); + if start_balance < desired_balance { + let sol_to_deposit = desired_balance + .checked_sub(start_balance) + .unwrap() + .checked_add(LAMPORTS_PER_SOL) + .unwrap() + .checked_sub(1) + .unwrap() + .checked_div(LAMPORTS_PER_SOL) + .unwrap(); // rounds up to nearest sol + Some((start_balance, desired_balance, sol_to_deposit)) + } else { + None + } } diff --git a/tip-distributor/src/lib.rs b/tip-distributor/src/lib.rs index bd8de90230..800b48077d 100644 --- a/tip-distributor/src/lib.rs +++ b/tip-distributor/src/lib.rs @@ -10,6 +10,7 @@ use { stake_meta_generator_workflow::StakeMetaGeneratorError::CheckedMathError, }, anchor_lang::Id, + itertools::Itertools, jito_tip_distribution::{ program::JitoTipDistribution, state::{ClaimStatus, TipDistributionAccount}, @@ -20,35 +21,44 @@ use { TIP_ACCOUNT_SEED_7, }, log::*, + rand::prelude::SliceRandom, serde::{de::DeserializeOwned, Deserialize, Serialize}, solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as SyncRpcClient}, solana_merkle_tree::MerkleTree, solana_metrics::{datapoint_error, datapoint_warn}, + solana_program::instruction::InstructionError, solana_rpc_client_api::{ client_error::{Error, ErrorKind}, - request::RpcRequest, + request::{RpcError, RpcResponseErrorData, MAX_MULTIPLE_ACCOUNTS}, + response::RpcSimulateTransactionResult, }, solana_sdk::{ - account::{AccountSharedData, ReadableAccount}, + account::{Account, AccountSharedData, ReadableAccount}, clock::Slot, hash::{Hash, Hasher}, pubkey::Pubkey, signature::{Keypair, Signature}, stake_history::Epoch, - transaction::{Transaction, TransactionError::AlreadyProcessed}, + transaction::{ + Transaction, + TransactionError::{self}, + }, }, std::{ collections::HashMap, fs::File, io::BufReader, path::PathBuf, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::{Duration, Instant}, }, - tokio::time::sleep, + tokio::sync::{RwLock, Semaphore}, }; -#[derive(Deserialize, Serialize, Debug)] +#[derive(Clone, Deserialize, Serialize, Debug)] pub struct GeneratedMerkleTreeCollection { pub generated_merkle_trees: Vec, pub bank_hash: String, @@ -56,7 +66,7 @@ pub struct GeneratedMerkleTreeCollection { pub slot: Slot, } -#[derive(Eq, Debug, Hash, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Eq, Debug, Hash, PartialEq, Deserialize, Serialize)] pub struct GeneratedMerkleTree { #[serde(with = "pubkey_string_conversion")] pub tip_distribution_account: Pubkey, @@ -460,104 +470,225 @@ pub fn derive_tip_distribution_account_address( ) } +pub const MAX_RETRIES: usize = 5; +pub const FAIL_DELAY: Duration = Duration::from_millis(100); + +/// Returns unprocessed transactions, along with fail count +pub async fn sign_and_send_transactions_with_retries_multi_rpc( + signer: &Arc, + blockhash_rpc_client: &Arc, + rpc_clients: &Arc>>, + mut transactions: Vec, + max_loop_duration: Duration, +) -> (usize, usize) { + let error_count = Arc::new(AtomicUsize::default()); + let blockhash = Arc::new(RwLock::new( + blockhash_rpc_client + .get_latest_blockhash() + .await + .expect("fetch latest blockhash"), + )); + let transactions_receiver = { + let (transactions_sender, transactions_receiver) = crossbeam_channel::unbounded(); + let mut rng = rand::thread_rng(); + transactions.shuffle(&mut rng); // shuffle to avoid racing for the same order of txns as other claim-tip processes + transactions + .into_iter() + .for_each(|txn| transactions_sender.send(txn).unwrap()); + transactions_receiver + }; + let blockhash_refresh_handle = { + let blockhash_rpc_client = blockhash_rpc_client.clone(); + let blockhash = blockhash.clone(); + let transactions_receiver = transactions_receiver.clone(); + tokio::spawn(async move { + let start = Instant::now(); + let mut last_blockhash_update = Instant::now(); + while start.elapsed() < max_loop_duration && !transactions_receiver.is_empty() { + // ensure we always have a recent blockhash + if last_blockhash_update.elapsed() > Duration::from_secs(2) { + let hash = blockhash_rpc_client + .get_latest_blockhash() + .await + .expect("fetch latest blockhash"); + info!( + "Got blockhash {hash:?}. Sending {} transactions to claim mev tips.", + transactions_receiver.len() + ); + *blockhash.write().await = hash; + last_blockhash_update = Instant::now(); + } + } + + info!( + "Exited blockhash refresh thread. {} transactions remain.", + transactions_receiver.len() + ); + transactions_receiver.len() + }) + }; + let send_handles = rpc_clients + .iter() + .map(|rpc_client| { + let signer = signer.clone(); + let transactions_receiver = transactions_receiver.clone(); + let rpc_client = rpc_client.clone(); + let error_count = error_count.clone(); + let blockhash = blockhash.clone(); + tokio::spawn(async move { + let mut iterations = 0; + while let Ok(txn) = transactions_receiver.recv() { + let mut retries = 0; + while retries < MAX_RETRIES { + iterations += 1; + let (_signed_txn, res) = + signed_send(&signer, &rpc_client, *blockhash.read().await, txn.clone()) + .await; + match res { + Ok(_) => break, + Err(_) => { + retries += 1; + error_count.fetch_add(1, Ordering::Relaxed); + tokio::time::sleep(FAIL_DELAY).await; + } + } + } + } + + info!("Exited send thread. Ran {iterations} times."); + }) + }) + .collect_vec(); + + for handle in send_handles { + if let Err(e) = handle.await { + warn!("Error joining handle: {e:?}") + } + } + let remaining_transaction_count = blockhash_refresh_handle.await.unwrap(); + ( + remaining_transaction_count, + error_count.load(Ordering::Relaxed), + ) +} + pub async fn sign_and_send_transactions_with_retries( signer: &Keypair, rpc_client: &RpcClient, + max_concurrent_rpc_get_reqs: usize, transactions: Vec, - max_retry_duration: Duration, -) -> HashMap { - use tokio::sync::Semaphore; - const MAX_CONCURRENT_RPC_CALLS: usize = 50; - let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_RPC_CALLS)); - + txn_send_batch_size: usize, + max_loop_duration: Duration, +) -> (Vec, HashMap) { + let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs)); let mut errors = HashMap::default(); let mut blockhash = rpc_client .get_latest_blockhash() .await .expect("fetch latest blockhash"); - - let mut signatures_to_transactions = transactions + // track unsigned txns + let mut transactions_to_process = transactions .into_iter() - .map(|mut tx| { - tx.sign(&[signer], blockhash); - (tx.signatures[0], tx) - }) - .collect::>(); + .map(|txn| (txn.message_data(), txn)) + .collect::, Transaction>>(); let start = Instant::now(); - while start.elapsed() < max_retry_duration && !signatures_to_transactions.is_empty() { - if start.elapsed() > Duration::from_secs(60) { + while start.elapsed() < max_loop_duration && !transactions_to_process.is_empty() { + // ensure we always have a recent blockhash + // blockhashes last max 150 blocks + // finalized commitment is ~32 slots behind tip + // assuming 0% skip rate (every slot has a block), we’d have roughly 120 slots + // or (120*0.4s) = 48s to land a tx before it expires + // if we’re refreshing every 30s, then any txs sent immediately before the refresh would likely expire + if start.elapsed() > Duration::from_secs(1) { blockhash = rpc_client .get_latest_blockhash() .await .expect("fetch latest blockhash"); - signatures_to_transactions - .iter_mut() - .for_each(|(_sig, tx)| { - *tx = Transaction::new_unsigned(tx.message.clone()); - tx.sign(&[signer], blockhash); - }); } + info!( + "Sending {txn_send_batch_size} of {} transactions to claim mev tips", + transactions_to_process.len() + ); + let send_futs = transactions_to_process + .iter() + .take(txn_send_batch_size) + .map(|(hash, txn)| { + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn + let (txn, res) = signed_send(signer, rpc_client, blockhash, txn.clone()).await; + (hash.clone(), txn, res) + } + }); - let futs = signatures_to_transactions.iter().map(|(sig, tx)| { - let semaphore = semaphore.clone(); - async move { - let permit = semaphore.clone().acquire_owned().await.unwrap(); - let res = match rpc_client.send_transaction(tx).await { - Ok(sig) => { - info!("sent transaction: {sig:?}"); - drop(permit); - sleep(Duration::from_secs(10)).await; - - let _permit = semaphore.acquire_owned().await.unwrap(); - match rpc_client.confirm_transaction(&sig).await { - Ok(true) => Ok(()), - Ok(false) => Err(Error::new_with_request( - ErrorKind::Custom("transaction failed to confirm".to_string()), - RpcRequest::SendTransaction, - )), - Err(e) => Err(e), - } - } - Err(e) => Err(e), - }; - - let res = res - .err() - .map(|e| { - if let ErrorKind::TransactionError(AlreadyProcessed) = e.kind { - Ok(()) - } else { - error!("error sending transaction {sig:?} error: {e:?}"); - Err(e) - } - }) - .unwrap_or(Ok(())); - - (*sig, res) - } - }); - - errors = futures::future::join_all(futs) - .await + let send_res = futures::future::join_all(send_futs).await; + let new_errors = send_res .into_iter() - .filter(|(sig, result)| { - if result.is_err() { - true - } else { - let _ = signatures_to_transactions.remove(sig); - false + .filter_map(|(hash, txn, result)| match result { + Err(e) => Some((txn.signatures[0], e)), + Ok(..) => { + let _ = transactions_to_process.remove(&hash); + None } }) - .map(|(sig, result)| { - let e = result.err().unwrap(); - warn!("error sending transaction: [error={e}, signature={sig}]"); - (sig, e) - }) .collect::>(); + + errors.extend(new_errors); } - errors + (transactions_to_process.values().cloned().collect(), errors) +} + +/// Just in time sign and send transaction to RPC +async fn signed_send( + signer: &Keypair, + rpc_client: &RpcClient, + blockhash: Hash, + mut txn: Transaction, +) -> (Transaction, solana_rpc_client_api::client_error::Result<()>) { + txn.sign(&[signer], blockhash); // just in time signing + let res = match rpc_client.send_and_confirm_transaction(&txn).await { + Ok(_) => Ok(()), + Err(e) => { + match e.kind { + // Already claimed, skip. + ErrorKind::TransactionError(TransactionError::AlreadyProcessed) + | ErrorKind::TransactionError(TransactionError::InstructionError( + 0, + InstructionError::Custom(0), + )) + | ErrorKind::RpcError(RpcError::RpcResponseError { + data: + RpcResponseErrorData::SendTransactionPreflightFailure( + RpcSimulateTransactionResult { + err: + Some(TransactionError::InstructionError( + 0, + InstructionError::Custom(0), + )), + .. + }, + ), + .. + }) => Ok(()), + + // transaction got held up too long and blockhash expired. retry txn + ErrorKind::TransactionError(TransactionError::BlockhashNotFound) => Err(e), + + // unexpected error, warn and retry + _ => { + error!( + "Error sending transaction. Signature: {}, Error: {e:?}", + txn.signatures[0] + ); + Err(e) + } + } + } + }; + + (txn, res) } mod pubkey_string_conversion { @@ -583,7 +714,7 @@ mod pubkey_string_conversion { } } -pub(crate) fn read_json_from_file(path: &PathBuf) -> serde_json::Result +pub fn read_json_from_file(path: &PathBuf) -> serde_json::Result where T: DeserializeOwned, { @@ -885,3 +1016,49 @@ mod tests { }); } } + +/// Fetch accounts in parallel batches with retries. +async fn get_batched_accounts( + rpc_client: &RpcClient, + max_concurrent_rpc_get_reqs: usize, + pubkeys: Vec, +) -> solana_rpc_client_api::client_error::Result>> { + let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs)); + let futs = pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS).map(|pubkeys| { + let semaphore = semaphore.clone(); + + async move { + let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn + let mut retries = 0; + loop { + match rpc_client.get_multiple_accounts(pubkeys).await { + Ok(accts) => return Ok(accts), + Err(e) => { + retries += 1; + if retries == MAX_RETRIES { + datapoint_error!( + "claim_mev_workflow-get_batched_accounts_error", + ("pubkeys", format!("{pubkeys:?}"), String), + ("error", 1, i64), + ("err_type", "fetch_account", String), + ("err_str", e.to_string(), String) + ); + return Err(e); + } + tokio::time::sleep(FAIL_DELAY).await; + } + } + } + } + }); + + let claimant_accounts = futures::future::join_all(futs) + .await + .into_iter() + .collect::>>>>()? // fail on single error + .into_iter() + .flatten() + .collect_vec(); + + Ok(pubkeys.into_iter().zip(claimant_accounts).collect()) +} diff --git a/tip-distributor/src/merkle_root_upload_workflow.rs b/tip-distributor/src/merkle_root_upload_workflow.rs index cc75797f05..e40465581f 100644 --- a/tip-distributor/src/merkle_root_upload_workflow.rs +++ b/tip-distributor/src/merkle_root_upload_workflow.rs @@ -38,6 +38,8 @@ pub fn upload_merkle_root( keypair_path: &PathBuf, rpc_url: &str, tip_distribution_program_id: &Pubkey, + max_concurrent_rpc_get_reqs: usize, + txn_send_batch_size: usize, ) -> Result<(), MerkleRootUploadError> { const MAX_RETRY_DURATION: Duration = Duration::from_secs(600); @@ -124,9 +126,11 @@ pub fn upload_merkle_root( ) }) .collect(); - let failed_transactions = sign_and_send_transactions_with_retries(&keypair, &rpc_client, transactions, MAX_RETRY_DURATION).await; - if !failed_transactions.is_empty() { - panic!("failed to send {} transactions", failed_transactions.len()); + + let (to_process, failed_transactions) = sign_and_send_transactions_with_retries( + &keypair, &rpc_client, max_concurrent_rpc_get_reqs, transactions, txn_send_batch_size, MAX_RETRY_DURATION).await; + if !to_process.is_empty() { + panic!("{} remaining mev claim transactions, {} failed requests.", to_process.len(), failed_transactions.len()); } }); diff --git a/tip-distributor/src/reclaim_rent_workflow.rs b/tip-distributor/src/reclaim_rent_workflow.rs index da8d6c6362..84c4e1350f 100644 --- a/tip-distributor/src/reclaim_rent_workflow.rs +++ b/tip-distributor/src/reclaim_rent_workflow.rs @@ -1,6 +1,10 @@ use { - crate::sign_and_send_transactions_with_retries, + crate::{ + claim_mev_workflow::ClaimMevError, reclaim_rent_workflow::ClaimMevError::AnchorError, + sign_and_send_transactions_with_retries_multi_rpc, + }, anchor_lang::AccountDeserialize, + itertools::Itertools, jito_tip_distribution::{ sdk::{ derive_config_account_address, @@ -14,47 +18,150 @@ use { }, log::info, solana_client::nonblocking::rpc_client::RpcClient, + solana_measure::measure, + solana_metrics::datapoint_info, solana_program::pubkey::Pubkey, solana_sdk::{ + commitment_config::CommitmentConfig, signature::{Keypair, Signer}, transaction::Transaction, }, std::{ - error::Error, + sync::Arc, time::{Duration, Instant}, }, }; +/// Clear old ClaimStatus accounts pub async fn reclaim_rent( - rpc_client: RpcClient, + rpc_url: String, + rpc_send_connection_count: u64, tip_distribution_program_id: Pubkey, - signer: Keypair, + signer: Arc, + max_loop_retries: u64, + max_loop_duration: Duration, // Optionally reclaim TipDistributionAccount rents on behalf of validators. should_reclaim_tdas: bool, -) -> Result<(), Box> { - info!("fetching program accounts..."); - let now = Instant::now(); - let accounts = rpc_client - .get_program_accounts(&tip_distribution_program_id) +) -> Result<(), ClaimMevError> { + let blockhash_rpc_client = Arc::new(RpcClient::new_with_timeout_and_commitment( + rpc_url.clone(), + Duration::from_secs(3 * 60), + CommitmentConfig::finalized(), + )); + let rpc_clients = Arc::new( + (0..rpc_send_connection_count) + .map(|_| { + Arc::new(RpcClient::new_with_commitment( + rpc_url.clone(), + CommitmentConfig::confirmed(), + )) + }) + .collect_vec(), + ); + let mut retries = 0; + let mut failed_transaction_count = 0usize; + let signer_pubkey = signer.pubkey(); + loop { + let (transactions, get_pa_elapsed, transaction_prepare_elaspsed) = build_transactions( + blockhash_rpc_client.clone(), + &tip_distribution_program_id, + &signer_pubkey, + should_reclaim_tdas, + ) .await?; + datapoint_info!( + "claim_mev_workflow-prepare_rent_reclaim_transactions", + ("attempt", retries, i64), + ("transaction_count", transactions.len(), i64), + ("account_fetch_latency_us", get_pa_elapsed.as_micros(), i64), + ( + "transaction_prepare_latency_us", + transaction_prepare_elaspsed.as_micros(), + i64 + ), + ); + let transactions_len = transactions.len(); + if transactions.is_empty() { + info!("Finished reclaim rent after {retries} retries, {failed_transaction_count} failed requests."); + return Ok(()); + } + + info!("Sending {} rent reclaim transactions", transactions.len()); + let send_start = Instant::now(); + let (remaining_transaction_count, new_failed_transaction_count) = + sign_and_send_transactions_with_retries_multi_rpc( + &signer, + &blockhash_rpc_client, + &rpc_clients, + transactions, + max_loop_duration, + ) + .await; + failed_transaction_count += new_failed_transaction_count; + + datapoint_info!( + "claim_mev_workflow-send_reclaim_rent_transactions", + ("attempt", retries, i64), + ("transaction_count", transactions_len, i64), + ( + "successful_transaction_count", + transactions_len - remaining_transaction_count, + i64 + ), + ( + "remaining_transaction_count", + remaining_transaction_count, + i64 + ), + ( + "failed_transaction_count", + new_failed_transaction_count, + i64 + ), + ("send_latency_us", send_start.elapsed().as_micros(), i64), + ); + + if retries >= max_loop_retries { + return Err(ClaimMevError::MaxSendTransactionRetriesExceeded { + attempts: max_loop_retries, + remaining_transaction_count, + failed_transaction_count, + }); + } + retries += 1; + } +} + +async fn build_transactions( + rpc_client: Arc, + tip_distribution_program_id: &Pubkey, + signer_pubkey: &Pubkey, + should_reclaim_tdas: bool, +) -> Result<(Vec, Duration, Duration), ClaimMevError> { + info!("Fetching program accounts"); + let (accounts, get_pa_elapsed) = measure!( + rpc_client + .get_program_accounts(tip_distribution_program_id) + .await? + ); info!( - "get_program_accounts took {}ms and fetched {} accounts", - now.elapsed().as_millis(), + "Fetch get_program_accounts took {:?} and fetched {} accounts", + get_pa_elapsed.as_duration(), accounts.len() ); - info!("fetching current_epoch..."); + info!("Fetching current_epoch"); let current_epoch = rpc_client.get_epoch_info().await?.epoch; - info!("current_epoch: {current_epoch}"); + info!("Fetch current_epoch: {current_epoch}"); - info!("fetching config_account..."); - let now = Instant::now(); - let config_pubkey = derive_config_account_address(&tip_distribution_program_id).0; - let config_account = rpc_client.get_account(&config_pubkey).await?; - let config_account: Config = Config::try_deserialize(&mut config_account.data.as_slice())?; - info!("fetch config_account took {}ms", now.elapsed().as_millis()); + info!("Fetching Config account"); + let config_pubkey = derive_config_account_address(tip_distribution_program_id).0; + let (config_account, elapsed) = measure!(rpc_client.get_account(&config_pubkey).await?); + info!("Fetch Config account took {:?}", elapsed.as_duration()); + let config_account: Config = + Config::try_deserialize(&mut config_account.data.as_slice()).map_err(AnchorError)?; - info!("filtering for claim_status accounts"); + info!("Filtering for ClaimStatus accounts"); let claim_status_accounts: Vec<(Pubkey, ClaimStatus)> = accounts .iter() .filter_map(|(pubkey, account)| { @@ -63,30 +170,22 @@ pub async fn reclaim_rent( }) .filter(|(_, claim_status): &(Pubkey, ClaimStatus)| { // Only return claim statuses that we've paid for and ones that are expired to avoid transaction failures. - claim_status.claim_status_payer == signer.pubkey() + claim_status.claim_status_payer.eq(signer_pubkey) && current_epoch > claim_status.expires_at }) .collect::>(); info!( - "{} claim_status accounts eligible for rent reclaim", + "{} ClaimStatus accounts eligible for rent reclaim", claim_status_accounts.len() ); - info!("fetching recent_blockhash"); - let now = Instant::now(); - let recent_blockhash = rpc_client.get_latest_blockhash().await?; - info!( - "fetch recent_blockhash took {}ms, hash={recent_blockhash:?}", - now.elapsed().as_millis() - ); - - info!("creating close_claim_status_account transactions"); - let now = Instant::now(); + info!("Creating CloseClaimStatusAccounts transactions"); + let transaction_now = Instant::now(); let mut transactions = claim_status_accounts .into_iter() .map(|(claim_status_pubkey, claim_status)| { close_claim_status_ix( - tip_distribution_program_id, + *tip_distribution_program_id, CloseClaimStatusArgs, CloseClaimStatusAccounts { config: config_pubkey, @@ -97,71 +196,51 @@ pub async fn reclaim_rent( }) .collect::>() .chunks(4) - .map(|instructions| { - Transaction::new_signed_with_payer( - instructions, - Some(&signer.pubkey()), - &[&signer], - recent_blockhash, - ) - }) + .map(|instructions| Transaction::new_with_payer(instructions, Some(signer_pubkey))) .collect::>(); info!( - "create close_claim_status_account transactions took {}us", - now.elapsed().as_micros() + "Create CloseClaimStatusAccounts transactions took {:?}", + transaction_now.elapsed() ); if should_reclaim_tdas { - let tip_distribution_accounts = accounts + info!("Creating CloseTipDistributionAccounts transactions"); + let now = Instant::now(); + let close_tda_txs = accounts .into_iter() .filter_map(|(pubkey, account)| { let tda = TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).ok()?; Some((pubkey, tda)) }) - .filter(|(_, tda): &(Pubkey, TipDistributionAccount)| current_epoch > tda.expires_at); - - info!("creating close_tip_distribution_account transactions"); - let now = Instant::now(); - let close_tda_txs = tip_distribution_accounts - .map( - |(tip_distribution_account, tda): (Pubkey, TipDistributionAccount)| { - close_tip_distribution_account_ix( - tip_distribution_program_id, - CloseTipDistributionAccountArgs { - _epoch: tda.epoch_created_at, - }, - CloseTipDistributionAccounts { - config: config_pubkey, - tip_distribution_account, - validator_vote_account: tda.validator_vote_account, - expired_funds_account: config_account.expired_funds_account, - signer: signer.pubkey(), - }, - ) - }, - ) + .filter(|(_, tda): &(Pubkey, TipDistributionAccount)| current_epoch > tda.expires_at) + .map(|(tip_distribution_account, tda)| { + close_tip_distribution_account_ix( + *tip_distribution_program_id, + CloseTipDistributionAccountArgs { + _epoch: tda.epoch_created_at, + }, + CloseTipDistributionAccounts { + config: config_pubkey, + tip_distribution_account, + validator_vote_account: tda.validator_vote_account, + expired_funds_account: config_account.expired_funds_account, + signer: *signer_pubkey, + }, + ) + }) .collect::>() .chunks(4) - .map(|instructions| Transaction::new_with_payer(instructions, Some(&signer.pubkey()))) + .map(|instructions| Transaction::new_with_payer(instructions, Some(signer_pubkey))) .collect::>(); - info!("create close_tip_distribution_account transactions took {}us, closing {} tip distribution accounts", now.elapsed().as_micros(), close_tda_txs.len()); + info!("Create CloseTipDistributionAccounts transactions took {:?}, closing {} tip distribution accounts", now.elapsed(), close_tda_txs.len()); transactions.extend(close_tda_txs); } - - info!("sending {} transactions", transactions.len()); - let failed_txs = sign_and_send_transactions_with_retries( - &signer, - &rpc_client, + Ok(( transactions, - Duration::from_secs(300), - ) - .await; - if !failed_txs.is_empty() { - panic!("failed to send {} transactions", failed_txs.len()); - } - - Ok(()) + get_pa_elapsed.as_duration(), + transaction_now.elapsed(), + )) }