Skip to content

Commit

Permalink
Fix Buildkite warnings (#437)
Browse files Browse the repository at this point in the history
  • Loading branch information
esemeniuc authored Nov 10, 2023
1 parent 2a8d046 commit 2f377b0
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 97 deletions.
3 changes: 2 additions & 1 deletion tip-distributor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = { workspace = true }
edition = { workspace = true }
license = { workspace = true }
description = "Collection of binaries used to distribute MEV rewards to delegators and validators."
publish = false

[dependencies]
anchor-lang = { workspace = true }
Expand All @@ -24,8 +25,8 @@ serde_json = { workspace = true }
solana-client = { workspace = true }
solana-genesis-utils = { workspace = true }
solana-ledger = { workspace = true }
solana-merkle-tree = { workspace = true }
solana-measure = { workspace = true }
solana-merkle-tree = { workspace = true }
solana-metrics = { workspace = true }
solana-program = { workspace = true }
solana-program-runtime = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion tip-distributor/src/bin/claim-mev-tips.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
///! This binary claims MEV tips.
//! This binary claims MEV tips.
use {
clap::Parser,
gethostname::gethostname,
Expand Down
80 changes: 41 additions & 39 deletions tip-distributor/src/claim_mev_workflow.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use {
crate::{
claim_mev_workflow::ClaimMevError::{ClaimantNotFound, InsufficientBalance, TDANotFound},
sign_and_send_transactions_with_retries_multi_rpc, GeneratedMerkleTreeCollection, TreeNode,
minimum_balance, sign_and_send_transactions_with_retries_multi_rpc,
GeneratedMerkleTreeCollection, TreeNode,
},
anchor_lang::{AccountDeserialize, InstructionData, ToAccountMetas},
itertools::Itertools,
Expand All @@ -11,7 +12,7 @@ use {
solana_metrics::{datapoint_info, datapoint_warn},
solana_program::{
fee_calculator::DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE, native_token::LAMPORTS_PER_SOL,
stake::state::StakeState, system_program,
system_program,
},
solana_sdk::{
account::Account,
Expand Down Expand Up @@ -99,10 +100,6 @@ pub async fn claim_mev_tips(
.iter()
.flat_map(|tree| &tree.tree_nodes)
.collect_vec();
let stake_acct_min_rent = blockhash_rpc_client
.get_minimum_balance_for_rent_exemption(StakeState::size_of())
.await
.expect("Failed to calculate min rent");

// fetch all accounts up front
info!(
Expand All @@ -122,20 +119,17 @@ pub async fn claim_mev_tips(
.map_err(ClaimMevError::MaxFetchRetriesExceeded)?
.into_iter()
.filter_map(|(pubkey, maybe_account)| {
let account = match maybe_account {
Some(account) => account,
None => {
datapoint_warn!(
"claim_mev_workflow-account_error",
("epoch", merkle_trees.epoch, i64),
("pubkey", pubkey.to_string(), String),
("account_type", "tip_distribution_account", String),
("error", 1, i64),
("err_type", "fetch", String),
("err_str", "Failed to fetch TipDistributionAccount", String)
);
return None;
}
let Some(account) = maybe_account else {
datapoint_warn!(
"claim_mev_workflow-account_error",
("epoch", merkle_trees.epoch, i64),
("pubkey", pubkey.to_string(), String),
("account_type", "tip_distribution_account", String),
("error", 1, i64),
("err_type", "fetch", String),
("err_str", "Failed to fetch TipDistributionAccount", String)
);
return None;
};

let account = match TipDistributionAccount::try_deserialize(&mut account.data.as_slice()) {
Expand All @@ -157,7 +151,7 @@ pub async fn claim_mev_tips(
})
.collect::<HashMap<Pubkey, TipDistributionAccount>>();

// track balances only
// track balances and account len to make sure account is rent-exempt after transfer
let claimants = crate::get_batched_accounts(
&blockhash_rpc_client,
max_concurrent_rpc_get_reqs,
Expand All @@ -173,11 +167,11 @@ pub async fn claim_mev_tips(
(
pubkey,
maybe_account
.map(|account| account.lamports)
.map(|account| (account.lamports, account.data.len()))
.unwrap_or_default(),
)
})
.collect::<HashMap<Pubkey, u64>>();
.collect::<HashMap<Pubkey, (u64, usize)>>();

// Refresh claimants + Try sending txns to RPC
let mut retries = 0;
Expand Down Expand Up @@ -207,7 +201,6 @@ pub async fn claim_mev_tips(
&merkle_trees,
&payer_pubkey,
&tree_nodes,
stake_acct_min_rent,
&tdas,
&claimants,
&claim_statuses,
Expand Down Expand Up @@ -269,7 +262,8 @@ pub async fn claim_mev_tips(
max_loop_duration,
)
.await;
failed_transaction_count += new_failed_transaction_count;
failed_transaction_count =
failed_transaction_count.saturating_add(new_failed_transaction_count);

datapoint_info!(
"claim_mev_workflow-send_transactions",
Expand All @@ -278,7 +272,7 @@ pub async fn claim_mev_tips(
("transaction_count", transactions_len, i64),
(
"successful_transaction_count",
transactions_len - remaining_transaction_count,
transactions_len.saturating_sub(remaining_transaction_count),
i64
),
(
Expand All @@ -301,7 +295,7 @@ pub async fn claim_mev_tips(
failed_transaction_count,
});
}
retries += 1;
retries = retries.saturating_add(1);
}
}

Expand All @@ -311,11 +305,19 @@ fn build_transactions(
merkle_trees: &GeneratedMerkleTreeCollection,
payer_pubkey: &Pubkey,
tree_nodes: &[&TreeNode],
stake_acct_min_rent: u64,
tdas: &HashMap<Pubkey, TipDistributionAccount>,
claimants: &HashMap<Pubkey, u64>,
claimants: &HashMap<Pubkey, (u64 /* lamports */, usize /* allocated bytes */)>,
claim_statuses: &HashMap<Pubkey, Option<Account>>,
) -> Result<(usize, usize, usize, usize, Vec<Transaction>), ClaimMevError> {
) -> Result<
(
usize, /* skipped_merkle_root_count */
usize, /* zero_lamports_count */
usize, /* already_claimed_count */
usize, /* below_min_rent_count */
Vec<Transaction>,
),
ClaimMevError,
> {
let tip_distribution_config =
Pubkey::find_program_address(&[Config::SEED], tip_distribution_program_id).0;
let mut skipped_merkle_root_count: usize = 0;
Expand All @@ -327,9 +329,9 @@ fn build_transactions(

// prepare instructions to transfer to all claimants
for tree in &merkle_trees.generated_merkle_trees {
let fetched_tip_distribution_account = match tdas.get(&tree.tip_distribution_account) {
Some(account) => account,
None => return Err(TDANotFound(tree.tip_distribution_account)),
let Some(fetched_tip_distribution_account) = tdas.get(&tree.tip_distribution_account)
else {
return Err(TDANotFound(tree.tip_distribution_account));
};
// only claim for ones that have merkle root on-chain
if fetched_tip_distribution_account.merkle_root.is_none() {
Expand Down Expand Up @@ -359,16 +361,16 @@ fn build_transactions(
}
None => return Err(ClaimantNotFound(node.claim_status_pubkey)),
};
let current_balance = match claimants.get(&node.claimant) {
Some(balance) => balance,
None => return Err(ClaimantNotFound(node.claimant)),
let Some((current_balance, allocated_bytes)) = claimants.get(&node.claimant) else {
return Err(ClaimantNotFound(node.claimant));
};

// some older accounts can be rent-paying
// any new transfers will need to make the account rent-exempt (runtime enforced)
let balance_with_tip = current_balance.checked_add(node.amount).unwrap();
if balance_with_tip < stake_acct_min_rent {
debug!("Current balance + tip claim amount of {balance_with_tip} is less than required rent-exempt of {stake_acct_min_rent} for pubkey: {}. Skipping.", node.claimant);
let new_balance = current_balance.checked_add(node.amount).unwrap();
let minimum_rent = minimum_balance(*allocated_bytes);
if new_balance < minimum_rent {
debug!("Current balance + claim amount of {new_balance} is less than required rent-exempt of {minimum_rent} for pubkey: {}. Skipping.", node.claimant);
below_min_rent_count = below_min_rent_count.checked_add(1).unwrap();
continue;
}
Expand Down
123 changes: 71 additions & 52 deletions tip-distributor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use {
solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as SyncRpcClient},
solana_merkle_tree::MerkleTree,
solana_metrics::{datapoint_error, datapoint_warn},
solana_program::instruction::InstructionError,
solana_program::{
instruction::InstructionError,
rent::{
ACCOUNT_STORAGE_OVERHEAD, DEFAULT_EXEMPTION_THRESHOLD, DEFAULT_LAMPORTS_PER_BYTE_YEAR,
},
},
solana_rpc_client_api::{
client_error::{Error, ErrorKind},
request::{RpcError, RpcResponseErrorData, MAX_MULTIPLE_ACCOUNTS},
Expand Down Expand Up @@ -480,7 +485,10 @@ pub async fn sign_and_send_transactions_with_retries_multi_rpc(
rpc_clients: &Arc<Vec<Arc<RpcClient>>>,
mut transactions: Vec<Transaction>,
max_loop_duration: Duration,
) -> (usize, usize) {
) -> (
usize, /* remaining txn count */
usize, /* failed txn count */
) {
let error_count = Arc::new(AtomicUsize::default());
let blockhash = Arc::new(RwLock::new(
blockhash_rpc_client
Expand Down Expand Up @@ -536,18 +544,18 @@ pub async fn sign_and_send_transactions_with_retries_multi_rpc(
let error_count = error_count.clone();
let blockhash = blockhash.clone();
tokio::spawn(async move {
let mut iterations = 0;
let mut iterations = 0usize;
while let Ok(txn) = transactions_receiver.recv() {
let mut retries = 0;
let mut retries = 0usize;
while retries < MAX_RETRIES {
iterations += 1;
iterations = iterations.saturating_add(1);
let (_signed_txn, res) =
signed_send(&signer, &rpc_client, *blockhash.read().await, txn.clone())
.await;
match res {
Ok(_) => break,
Err(_) => {
retries += 1;
retries = retries.saturating_add(1);
error_count.fetch_add(1, Ordering::Relaxed);
tokio::time::sleep(FAIL_DELAY).await;
}
Expand Down Expand Up @@ -691,6 +699,63 @@ async fn signed_send(
(txn, res)
}

/// Fetch accounts in parallel batches with retries.
async fn get_batched_accounts(
rpc_client: &RpcClient,
max_concurrent_rpc_get_reqs: usize,
pubkeys: Vec<Pubkey>,
) -> solana_rpc_client_api::client_error::Result<HashMap<Pubkey, Option<Account>>> {
let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs));
let futs = pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS).map(|pubkeys| {
let semaphore = semaphore.clone();

async move {
let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn
let mut retries = 0usize;
loop {
match rpc_client.get_multiple_accounts(pubkeys).await {
Ok(accts) => return Ok(accts),
Err(e) => {
retries = retries.saturating_add(1);
if retries == MAX_RETRIES {
datapoint_error!(
"claim_mev_workflow-get_batched_accounts_error",
("pubkeys", format!("{pubkeys:?}"), String),
("error", 1, i64),
("err_type", "fetch_account", String),
("err_str", e.to_string(), String)
);
return Err(e);
}
tokio::time::sleep(FAIL_DELAY).await;
}
}
}
}
});

let claimant_accounts = futures::future::join_all(futs)
.await
.into_iter()
.collect::<solana_rpc_client_api::client_error::Result<Vec<Vec<Option<Account>>>>>()? // fail on single error
.into_iter()
.flatten()
.collect_vec();

Ok(pubkeys.into_iter().zip(claimant_accounts).collect())
}

/// Calculates the minimum balance needed to be rent-exempt
/// taken from: https://github.com/jito-foundation/jito-solana/blob/d1ba42180d0093dd59480a77132477323a8e3f88/sdk/program/src/rent.rs#L78
pub fn minimum_balance(data_len: usize) -> u64 {
((((ACCOUNT_STORAGE_OVERHEAD
.checked_add(data_len as u64)
.unwrap())
.checked_mul(DEFAULT_LAMPORTS_PER_BYTE_YEAR))
.unwrap() as f64)
* DEFAULT_EXEMPTION_THRESHOLD) as u64
}

mod pubkey_string_conversion {
use {
serde::{self, Deserialize, Deserializer, Serializer},
Expand Down Expand Up @@ -1016,49 +1081,3 @@ mod tests {
});
}
}

/// Fetch accounts in parallel batches with retries.
async fn get_batched_accounts(
rpc_client: &RpcClient,
max_concurrent_rpc_get_reqs: usize,
pubkeys: Vec<Pubkey>,
) -> solana_rpc_client_api::client_error::Result<HashMap<Pubkey, Option<Account>>> {
let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs));
let futs = pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS).map(|pubkeys| {
let semaphore = semaphore.clone();

async move {
let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn
let mut retries = 0;
loop {
match rpc_client.get_multiple_accounts(pubkeys).await {
Ok(accts) => return Ok(accts),
Err(e) => {
retries += 1;
if retries == MAX_RETRIES {
datapoint_error!(
"claim_mev_workflow-get_batched_accounts_error",
("pubkeys", format!("{pubkeys:?}"), String),
("error", 1, i64),
("err_type", "fetch_account", String),
("err_str", e.to_string(), String)
);
return Err(e);
}
tokio::time::sleep(FAIL_DELAY).await;
}
}
}
}
});

let claimant_accounts = futures::future::join_all(futs)
.await
.into_iter()
.collect::<solana_rpc_client_api::client_error::Result<Vec<Vec<Option<Account>>>>>()? // fail on single error
.into_iter()
.flatten()
.collect_vec();

Ok(pubkeys.into_iter().zip(claimant_accounts).collect())
}
Loading

0 comments on commit 2f377b0

Please sign in to comment.