Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Buildkite warnings #437

Merged
merged 13 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tip-distributor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = { workspace = true }
edition = { workspace = true }
license = { workspace = true }
description = "Collection of binaries used to distribute MEV rewards to delegators and validators."
publish = false

[dependencies]
anchor-lang = { workspace = true }
Expand All @@ -24,8 +25,8 @@ serde_json = { workspace = true }
solana-client = { workspace = true }
solana-genesis-utils = { workspace = true }
solana-ledger = { workspace = true }
solana-merkle-tree = { workspace = true }
solana-measure = { workspace = true }
solana-merkle-tree = { workspace = true }
solana-metrics = { workspace = true }
solana-program = { workspace = true }
solana-program-runtime = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion tip-distributor/src/bin/claim-mev-tips.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
///! This binary claims MEV tips.
//! This binary claims MEV tips.
use {
clap::Parser,
gethostname::gethostname,
Expand Down
80 changes: 41 additions & 39 deletions tip-distributor/src/claim_mev_workflow.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use {
crate::{
claim_mev_workflow::ClaimMevError::{ClaimantNotFound, InsufficientBalance, TDANotFound},
sign_and_send_transactions_with_retries_multi_rpc, GeneratedMerkleTreeCollection, TreeNode,
minimum_balance, sign_and_send_transactions_with_retries_multi_rpc,
GeneratedMerkleTreeCollection, TreeNode,
},
anchor_lang::{AccountDeserialize, InstructionData, ToAccountMetas},
itertools::Itertools,
Expand All @@ -11,7 +12,7 @@ use {
solana_metrics::{datapoint_info, datapoint_warn},
solana_program::{
fee_calculator::DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE, native_token::LAMPORTS_PER_SOL,
stake::state::StakeState, system_program,
system_program,
},
solana_sdk::{
account::Account,
Expand Down Expand Up @@ -99,10 +100,6 @@ pub async fn claim_mev_tips(
.iter()
.flat_map(|tree| &tree.tree_nodes)
.collect_vec();
let stake_acct_min_rent = blockhash_rpc_client
.get_minimum_balance_for_rent_exemption(StakeState::size_of())
.await
.expect("Failed to calculate min rent");

// fetch all accounts up front
info!(
Expand All @@ -122,20 +119,17 @@ pub async fn claim_mev_tips(
.map_err(ClaimMevError::MaxFetchRetriesExceeded)?
.into_iter()
.filter_map(|(pubkey, maybe_account)| {
let account = match maybe_account {
Some(account) => account,
None => {
datapoint_warn!(
"claim_mev_workflow-account_error",
("epoch", merkle_trees.epoch, i64),
("pubkey", pubkey.to_string(), String),
("account_type", "tip_distribution_account", String),
("error", 1, i64),
("err_type", "fetch", String),
("err_str", "Failed to fetch TipDistributionAccount", String)
);
return None;
}
let Some(account) = maybe_account else {
datapoint_warn!(
"claim_mev_workflow-account_error",
("epoch", merkle_trees.epoch, i64),
("pubkey", pubkey.to_string(), String),
("account_type", "tip_distribution_account", String),
("error", 1, i64),
("err_type", "fetch", String),
("err_str", "Failed to fetch TipDistributionAccount", String)
);
return None;
};

let account = match TipDistributionAccount::try_deserialize(&mut account.data.as_slice()) {
Expand All @@ -157,7 +151,7 @@ pub async fn claim_mev_tips(
})
.collect::<HashMap<Pubkey, TipDistributionAccount>>();

// track balances only
// track balances and account len to make sure account is rent-exempt after transfer
let claimants = crate::get_batched_accounts(
&blockhash_rpc_client,
max_concurrent_rpc_get_reqs,
Expand All @@ -173,11 +167,11 @@ pub async fn claim_mev_tips(
(
pubkey,
maybe_account
.map(|account| account.lamports)
.map(|account| (account.lamports, account.data.len()))
.unwrap_or_default(),
)
})
.collect::<HashMap<Pubkey, u64>>();
.collect::<HashMap<Pubkey, (u64, usize)>>();

// Refresh claimants + Try sending txns to RPC
let mut retries = 0;
Expand Down Expand Up @@ -207,7 +201,6 @@ pub async fn claim_mev_tips(
&merkle_trees,
&payer_pubkey,
&tree_nodes,
stake_acct_min_rent,
&tdas,
&claimants,
&claim_statuses,
Expand Down Expand Up @@ -269,7 +262,8 @@ pub async fn claim_mev_tips(
max_loop_duration,
)
.await;
failed_transaction_count += new_failed_transaction_count;
failed_transaction_count =
failed_transaction_count.saturating_add(new_failed_transaction_count);

datapoint_info!(
"claim_mev_workflow-send_transactions",
Expand All @@ -278,7 +272,7 @@ pub async fn claim_mev_tips(
("transaction_count", transactions_len, i64),
(
"successful_transaction_count",
transactions_len - remaining_transaction_count,
transactions_len.saturating_sub(remaining_transaction_count),
i64
),
(
Expand All @@ -301,7 +295,7 @@ pub async fn claim_mev_tips(
failed_transaction_count,
});
}
retries += 1;
retries = retries.saturating_add(1);
}
}

Expand All @@ -311,11 +305,19 @@ fn build_transactions(
merkle_trees: &GeneratedMerkleTreeCollection,
payer_pubkey: &Pubkey,
tree_nodes: &[&TreeNode],
stake_acct_min_rent: u64,
tdas: &HashMap<Pubkey, TipDistributionAccount>,
claimants: &HashMap<Pubkey, u64>,
claimants: &HashMap<Pubkey, (u64 /* lamports */, usize /* allocated bytes */)>,
claim_statuses: &HashMap<Pubkey, Option<Account>>,
) -> Result<(usize, usize, usize, usize, Vec<Transaction>), ClaimMevError> {
) -> Result<
(
usize, /* skipped_merkle_root_count */
usize, /* zero_lamports_count */
usize, /* already_claimed_count */
usize, /* below_min_rent_count */
Vec<Transaction>,
),
ClaimMevError,
> {
let tip_distribution_config =
Pubkey::find_program_address(&[Config::SEED], tip_distribution_program_id).0;
let mut skipped_merkle_root_count: usize = 0;
Expand All @@ -327,9 +329,9 @@ fn build_transactions(

// prepare instructions to transfer to all claimants
for tree in &merkle_trees.generated_merkle_trees {
let fetched_tip_distribution_account = match tdas.get(&tree.tip_distribution_account) {
Some(account) => account,
None => return Err(TDANotFound(tree.tip_distribution_account)),
let Some(fetched_tip_distribution_account) = tdas.get(&tree.tip_distribution_account)
else {
return Err(TDANotFound(tree.tip_distribution_account));
};
// only claim for ones that have merkle root on-chain
if fetched_tip_distribution_account.merkle_root.is_none() {
Expand Down Expand Up @@ -359,16 +361,16 @@ fn build_transactions(
}
None => return Err(ClaimantNotFound(node.claim_status_pubkey)),
};
let current_balance = match claimants.get(&node.claimant) {
Some(balance) => balance,
None => return Err(ClaimantNotFound(node.claimant)),
let Some((current_balance, allocated_bytes)) = claimants.get(&node.claimant) else {
return Err(ClaimantNotFound(node.claimant));
};

// some older accounts can be rent-paying
// any new transfers will need to make the account rent-exempt (runtime enforced)
let balance_with_tip = current_balance.checked_add(node.amount).unwrap();
if balance_with_tip < stake_acct_min_rent {
debug!("Current balance + tip claim amount of {balance_with_tip} is less than required rent-exempt of {stake_acct_min_rent} for pubkey: {}. Skipping.", node.claimant);
let new_balance = current_balance.checked_add(node.amount).unwrap();
let minimum_rent = minimum_balance(*allocated_bytes);
if new_balance < minimum_rent {
debug!("Current balance + claim amount of {new_balance} is less than required rent-exempt of {minimum_rent} for pubkey: {}. Skipping.", node.claimant);
below_min_rent_count = below_min_rent_count.checked_add(1).unwrap();
continue;
}
Expand Down
123 changes: 71 additions & 52 deletions tip-distributor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use {
solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as SyncRpcClient},
solana_merkle_tree::MerkleTree,
solana_metrics::{datapoint_error, datapoint_warn},
solana_program::instruction::InstructionError,
solana_program::{
instruction::InstructionError,
rent::{
ACCOUNT_STORAGE_OVERHEAD, DEFAULT_EXEMPTION_THRESHOLD, DEFAULT_LAMPORTS_PER_BYTE_YEAR,
},
},
solana_rpc_client_api::{
client_error::{Error, ErrorKind},
request::{RpcError, RpcResponseErrorData, MAX_MULTIPLE_ACCOUNTS},
Expand Down Expand Up @@ -480,7 +485,10 @@ pub async fn sign_and_send_transactions_with_retries_multi_rpc(
rpc_clients: &Arc<Vec<Arc<RpcClient>>>,
mut transactions: Vec<Transaction>,
max_loop_duration: Duration,
) -> (usize, usize) {
) -> (
usize, /* remaining txn count */
usize, /* failed txn count */
) {
let error_count = Arc::new(AtomicUsize::default());
let blockhash = Arc::new(RwLock::new(
blockhash_rpc_client
Expand Down Expand Up @@ -536,18 +544,18 @@ pub async fn sign_and_send_transactions_with_retries_multi_rpc(
let error_count = error_count.clone();
let blockhash = blockhash.clone();
tokio::spawn(async move {
let mut iterations = 0;
let mut iterations = 0usize;
while let Ok(txn) = transactions_receiver.recv() {
let mut retries = 0;
let mut retries = 0usize;
while retries < MAX_RETRIES {
iterations += 1;
iterations = iterations.saturating_add(1);
let (_signed_txn, res) =
signed_send(&signer, &rpc_client, *blockhash.read().await, txn.clone())
.await;
match res {
Ok(_) => break,
Err(_) => {
retries += 1;
retries = retries.saturating_add(1);
error_count.fetch_add(1, Ordering::Relaxed);
tokio::time::sleep(FAIL_DELAY).await;
}
Expand Down Expand Up @@ -691,6 +699,63 @@ async fn signed_send(
(txn, res)
}

/// Fetch accounts in parallel batches with retries.
async fn get_batched_accounts(
rpc_client: &RpcClient,
max_concurrent_rpc_get_reqs: usize,
pubkeys: Vec<Pubkey>,
) -> solana_rpc_client_api::client_error::Result<HashMap<Pubkey, Option<Account>>> {
let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs));
let futs = pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS).map(|pubkeys| {
let semaphore = semaphore.clone();

async move {
let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn
let mut retries = 0usize;
loop {
match rpc_client.get_multiple_accounts(pubkeys).await {
Ok(accts) => return Ok(accts),
Err(e) => {
retries = retries.saturating_add(1);
if retries == MAX_RETRIES {
datapoint_error!(
"claim_mev_workflow-get_batched_accounts_error",
("pubkeys", format!("{pubkeys:?}"), String),
("error", 1, i64),
("err_type", "fetch_account", String),
("err_str", e.to_string(), String)
);
return Err(e);
}
tokio::time::sleep(FAIL_DELAY).await;
}
}
}
}
});

let claimant_accounts = futures::future::join_all(futs)
.await
.into_iter()
.collect::<solana_rpc_client_api::client_error::Result<Vec<Vec<Option<Account>>>>>()? // fail on single error
.into_iter()
.flatten()
.collect_vec();

Ok(pubkeys.into_iter().zip(claimant_accounts).collect())
}

/// Calculates the minimum balance needed to be rent-exempt
/// taken from: https://github.com/jito-foundation/jito-solana/blob/d1ba42180d0093dd59480a77132477323a8e3f88/sdk/program/src/rent.rs#L78
pub fn minimum_balance(data_len: usize) -> u64 {
((((ACCOUNT_STORAGE_OVERHEAD
.checked_add(data_len as u64)
.unwrap())
.checked_mul(DEFAULT_LAMPORTS_PER_BYTE_YEAR))
.unwrap() as f64)
* DEFAULT_EXEMPTION_THRESHOLD) as u64
}

mod pubkey_string_conversion {
use {
serde::{self, Deserialize, Deserializer, Serializer},
Expand Down Expand Up @@ -1016,49 +1081,3 @@ mod tests {
});
}
}

/// Fetch accounts in parallel batches with retries.
async fn get_batched_accounts(
rpc_client: &RpcClient,
max_concurrent_rpc_get_reqs: usize,
pubkeys: Vec<Pubkey>,
) -> solana_rpc_client_api::client_error::Result<HashMap<Pubkey, Option<Account>>> {
let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs));
let futs = pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS).map(|pubkeys| {
let semaphore = semaphore.clone();

async move {
let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn
let mut retries = 0;
loop {
match rpc_client.get_multiple_accounts(pubkeys).await {
Ok(accts) => return Ok(accts),
Err(e) => {
retries += 1;
if retries == MAX_RETRIES {
datapoint_error!(
"claim_mev_workflow-get_batched_accounts_error",
("pubkeys", format!("{pubkeys:?}"), String),
("error", 1, i64),
("err_type", "fetch_account", String),
("err_str", e.to_string(), String)
);
return Err(e);
}
tokio::time::sleep(FAIL_DELAY).await;
}
}
}
}
});

let claimant_accounts = futures::future::join_all(futs)
.await
.into_iter()
.collect::<solana_rpc_client_api::client_error::Result<Vec<Vec<Option<Account>>>>>()? // fail on single error
.into_iter()
.flatten()
.collect_vec();

Ok(pubkeys.into_iter().zip(claimant_accounts).collect())
}
Loading