Skip to content

Commit

Permalink
Run MEV claims + reclaiming rent-exempt amounts in parallel. (#582)
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu authored Jan 29, 2024
1 parent 1235c67 commit d3f235b
Show file tree
Hide file tree
Showing 7 changed files with 720 additions and 731 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub fn load_bank_forks(
snapshot_utils::get_highest_incremental_snapshot_archive_info(
&snapshot_config.incremental_snapshot_archives_dir,
full_snapshot_archive_info.slot(),
None,
halt_at_slot,
);

Some((
Expand Down
1 change: 1 addition & 0 deletions tip-distributor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ solana-rpc-client-api = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-stake-program = { workspace = true }
solana-transaction-status = { workspace = true }
solana-vote = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
Expand Down
199 changes: 106 additions & 93 deletions tip-distributor/src/bin/claim-mev-tips.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
//! This binary claims MEV tips.
use {
clap::Parser,
futures::future::join_all,
gethostname::gethostname,
log::*,
solana_metrics::{datapoint_error, datapoint_info, set_host_id},
solana_sdk::{pubkey::Pubkey, signature::read_keypair_file},
solana_sdk::{
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_tip_distributor::{
claim_mev_workflow::{claim_mev_tips, ClaimMevError},
read_json_from_file,
Expand Down Expand Up @@ -37,21 +41,9 @@ struct Args {
#[arg(long, env)]
keypair_path: PathBuf,

/// Number of unique connections to the RPC server for sending txns
#[arg(long, env, default_value_t = 128)]
rpc_send_connection_count: u64,

/// Rate-limits the maximum number of GET requests per RPC connection
#[arg(long, env, default_value_t = 256)]
max_concurrent_rpc_get_reqs: usize,

/// Number of retries for main claim send loop. Loop is time bounded.
#[arg(long, env, default_value_t = 5)]
max_loop_retries: u64,

/// Limits how long before send loop runs before stopping. Defaults to 10 mins
#[arg(long, env, default_value_t = 10 * 60)]
max_loop_duration_secs: u64,
/// Limits how long before send loop runs before stopping
#[arg(long, env, default_value_t = 60 * 60)]
max_retry_duration_secs: u64,

/// Specifies whether to reclaim any rent.
#[arg(long, env, default_value_t = true)]
Expand All @@ -61,40 +53,28 @@ struct Args {
#[arg(long, env)]
should_reclaim_tdas: bool,

/// The price to pay per compute unit aka "Priority Fee".
/// The price to pay for priority fee
#[arg(long, env, default_value_t = 1)]
micro_lamports_per_compute_unit: u64,
micro_lamports: u64,
}

#[tokio::main]
async fn main() -> Result<(), ClaimMevError> {
env_logger::init();
gethostname()
.into_string()
.map(set_host_id)
.expect("set hostname");
let args: Args = Args::parse();
let keypair = Arc::new(read_keypair_file(&args.keypair_path).expect("read keypair file"));
let merkle_trees: GeneratedMerkleTreeCollection =
read_json_from_file(&args.merkle_trees_path).expect("read GeneratedMerkleTreeCollection");
let max_loop_duration = Duration::from_secs(args.max_loop_duration_secs);

info!(
"Starting to claim mev tips for epoch: {}",
merkle_trees.epoch
);
async fn start_mev_claim_process(
merkle_trees: GeneratedMerkleTreeCollection,
rpc_url: String,
tip_distribution_program_id: Pubkey,
signer: Arc<Keypair>,
max_loop_duration: Duration,
micro_lamports: u64,
) -> Result<(), ClaimMevError> {
let start = Instant::now();

match claim_mev_tips(
merkle_trees.clone(),
args.rpc_url.clone(),
args.rpc_send_connection_count,
args.max_concurrent_rpc_get_reqs,
&args.tip_distribution_program_id,
keypair.clone(),
args.max_loop_retries,
&merkle_trees,
rpc_url,
tip_distribution_program_id,
signer,
max_loop_duration,
args.micro_lamports_per_compute_unit,
micro_lamports,
)
.await
{
Expand All @@ -104,11 +84,6 @@ async fn main() -> Result<(), ClaimMevError> {
("epoch", merkle_trees.epoch, i64),
("error", 1, i64),
("err_str", e.to_string(), String),
(
"merkle_trees_path",
args.merkle_trees_path.to_string_lossy(),
String
),
("elapsed_us", start.elapsed().as_micros(), i64),
);
Err(e)
Expand All @@ -117,61 +92,99 @@ async fn main() -> Result<(), ClaimMevError> {
datapoint_info!(
"claim_mev_workflow-claim_completion",
("epoch", merkle_trees.epoch, i64),
(
"merkle_trees_path",
args.merkle_trees_path.to_string_lossy(),
String
),
("elapsed_us", start.elapsed().as_micros(), i64),
);
Ok(())
}
}?;
}
}

async fn start_rent_claim(
rpc_url: String,
tip_distribution_program_id: Pubkey,
signer: Arc<Keypair>,
max_loop_duration: Duration,
should_reclaim_tdas: bool,
micro_lamports: u64,
epoch: u64,
) -> Result<(), ClaimMevError> {
let start = Instant::now();
match reclaim_rent(
rpc_url,
tip_distribution_program_id,
signer,
max_loop_duration,
should_reclaim_tdas,
micro_lamports,
)
.await
{
Err(e) => {
datapoint_error!(
"claim_mev_workflow-reclaim_rent_error",
("epoch", epoch, i64),
("error", 1, i64),
("err_str", e.to_string(), String),
("elapsed_us", start.elapsed().as_micros(), i64),
);
Err(e)
}
Ok(()) => {
datapoint_info!(
"claim_mev_workflow-reclaim_rent_completion",
("epoch", epoch, i64),
("elapsed_us", start.elapsed().as_micros(), i64),
);
Ok(())
}
}
}

#[tokio::main]
async fn main() -> Result<(), ClaimMevError> {
env_logger::init();

gethostname()
.into_string()
.map(set_host_id)
.expect("set hostname");

let args: Args = Args::parse();
let keypair = Arc::new(read_keypair_file(&args.keypair_path).expect("read keypair file"));
let merkle_trees: GeneratedMerkleTreeCollection =
read_json_from_file(&args.merkle_trees_path).expect("read GeneratedMerkleTreeCollection");
let max_loop_duration = Duration::from_secs(args.max_retry_duration_secs);

info!(
"Starting to claim mev tips for epoch: {}",
merkle_trees.epoch
);
let epoch = merkle_trees.epoch;

let mut futs = vec![];
futs.push(tokio::spawn(start_mev_claim_process(
merkle_trees,
args.rpc_url.clone(),
args.tip_distribution_program_id,
keypair.clone(),
max_loop_duration,
args.micro_lamports,
)));
if args.should_reclaim_rent {
let start = Instant::now();
match reclaim_rent(
args.rpc_url,
args.rpc_send_connection_count,
futs.push(tokio::spawn(start_rent_claim(
args.rpc_url.clone(),
args.tip_distribution_program_id,
keypair,
args.max_loop_retries,
keypair.clone(),
max_loop_duration,
args.should_reclaim_tdas,
args.micro_lamports_per_compute_unit,
)
.await
{
Err(e) => {
datapoint_error!(
"claim_mev_workflow-reclaim_rent_error",
("epoch", merkle_trees.epoch, i64),
("error", 1, i64),
("err_str", e.to_string(), String),
(
"merkle_trees_path",
args.merkle_trees_path.to_string_lossy(),
String
),
("elapsed_us", start.elapsed().as_micros(), i64),
);
Err(e)
}
Ok(()) => {
datapoint_info!(
"claim_mev_workflow-reclaim_rent_completion",
("epoch", merkle_trees.epoch, i64),
(
"merkle_trees_path",
args.merkle_trees_path.to_string_lossy(),
String
),
("elapsed_us", start.elapsed().as_micros(), i64),
);
Ok(())
}
}?;
args.micro_lamports,
epoch,
)));
}
let results = join_all(futs).await;
solana_metrics::flush(); // sometimes last datapoint doesn't get emitted. this increases likelihood.
for r in results {
r.map_err(|e| ClaimMevError::UncaughtError { e: e.to_string() })??;
}
Ok(())
}
Loading

0 comments on commit d3f235b

Please sign in to comment.