diff --git a/Cargo.lock b/Cargo.lock index f749b59..f6b7d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2050,6 +2050,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -3878,7 +3891,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -4839,7 +4852,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -5209,7 +5222,7 @@ dependencies = [ [[package]] name = "metrics_macros" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder.git?rev=d96e7215483bac0ab145459f4ddaa811d99459d6#d96e7215483bac0ab145459f4ddaa811d99459d6" +source = "git+https://github.com/flashbots/rbuilder.git?rev=5d152e8edc3c2027263c40b7661323a468a5f01b#5d152e8edc3c2027263c40b7661323a468a5f01b" dependencies = [ "proc-macro2", "quote", @@ -5688,7 +5701,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.72", @@ -7095,7 +7108,7 @@ dependencies = [ [[package]] name = "rbuilder" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder.git?rev=d96e7215483bac0ab145459f4ddaa811d99459d6#d96e7215483bac0ab145459f4ddaa811d99459d6" +source = "git+https://github.com/flashbots/rbuilder.git?rev=5d152e8edc3c2027263c40b7661323a468a5f01b#5d152e8edc3c2027263c40b7661323a468a5f01b" dependencies = [ "ahash", "alloy-chains", @@ -7122,9 +7135,11 @@ dependencies = [ "bigdecimal 0.4.5", "built", "clap", + "crossbeam", "crossbeam-queue", "csv", "ctor", + "dashmap 6.1.0", "derivative", "eth-sparse-mpt", "ethereum-consensus", @@ -7149,6 +7164,7 @@ dependencies = [ "mev-share-sse", "mockall", "once_cell", + "parking_lot 0.12.3", "primitive-types", "priority-queue", "prometheus", @@ -11426,7 +11442,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "test_utils" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder.git?rev=d96e7215483bac0ab145459f4ddaa811d99459d6#d96e7215483bac0ab145459f4ddaa811d99459d6" +source = "git+https://github.com/flashbots/rbuilder.git?rev=5d152e8edc3c2027263c40b7661323a468a5f01b#5d152e8edc3c2027263c40b7661323a468a5f01b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index b09c276..9896ea0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,9 +53,9 @@ http = "0.2.9" hyper = "0.14" futures-util = "0.3" -metrics_macros = { git = "https://github.com/flashbots/rbuilder.git", rev = "d96e7215483bac0ab145459f4ddaa811d99459d6"} +metrics_macros = { git = "https://github.com/flashbots/rbuilder.git", rev = "5d152e8edc3c2027263c40b7661323a468a5f01b"} -rbuilder = { git = "https://github.com/flashbots/rbuilder.git", rev = "d96e7215483bac0ab145459f4ddaa811d99459d6"} +rbuilder = { git = "https://github.com/flashbots/rbuilder.git", rev = "5d152e8edc3c2027263c40b7661323a468a5f01b"} [build-dependencies] built = { version = "0.7.1", features = ["git2", "chrono"] } diff --git a/config-live-example.toml b/config-live-example.toml index 8a144e8..894bf7c 100644 --- a/config-live-example.toml +++ b/config-live-example.toml @@ -26,7 +26,7 @@ key_registration_url = "http://127.0.0.1:8090" ignore_cancellable_orders = true sbundle_mergeabe_signers = [] -live_builders = ["mp-ordering", "mgp-ordering", "merging"] +live_builders = ["mp-ordering", "mgp-ordering", "parallel"] top_bid_ws_url = "env:TOP_BID_WS_URL" top_bid_ws_basic_auth = "env:TOP_BID_WS_BASIC_AUTH" @@ -63,8 +63,7 @@ failed_order_retries = 1 drop_failed_orders = true [[builders]] -name = "merging" -algo = "merging-builder" +name = "parallel" +algo = "parallel-builder" discard_txs = true num_threads = 5 -merge_wait_time_ms = 100 diff --git a/src/flashbots_config.rs b/src/flashbots_config.rs index bb430a0..36a884b 100644 --- a/src/flashbots_config.rs +++ b/src/flashbots_config.rs @@ -5,7 +5,7 @@ use alloy_signer_local::PrivateKeySigner; use eyre::Context; use http::StatusCode; use jsonrpsee::RpcModule; -use rbuilder::building::builders::merging_builder::merging_build_backtest; +use rbuilder::building::builders::parallel_builder::parallel_build_backtest; use rbuilder::building::builders::UnfinishedBlockBuildingSinkFactory; use rbuilder::live_builder::base_config::EnvOrValue; use rbuilder::live_builder::block_output::bid_observer::{BidObserver, NullBidObserver}; @@ -30,8 +30,8 @@ use rbuilder::{ utils::build_info::Version, }; use reth::payload::database::CachedReads; -use reth::providers::ProviderFactory; -use reth_db::DatabaseEnv; +use reth::providers::{DatabaseProviderFactory, HeaderProvider, StateProviderFactory}; +use reth_db::Database; use serde::Deserialize; use serde_with::serde_as; use tokio_util::sync::CancellationToken; @@ -118,17 +118,17 @@ impl LiveBuilderConfig for FlashbotsConfig { &self.base_config } - async fn create_builder( + async fn new_builder( &self, + provider: P, cancellation_token: CancellationToken, - ) -> eyre::Result, MevBoostSlotDataGenerator>> { - let provider_factory = self.base_config.provider_factory()?; - + ) -> eyre::Result> + where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + HeaderProvider + Clone + 'static, + { let (sink_factory, relays, bidding_service_win_control) = self - .create_sink_factory_and_relays( - provider_factory.provider_factory_unchecked(), - cancellation_token.clone(), - ) + .create_sink_factory_and_relays(provider.clone(), cancellation_token.clone()) .await?; let payload_event = MevBoostSlotDataGenerator::new( @@ -144,7 +144,7 @@ impl LiveBuilderConfig for FlashbotsConfig { cancellation_token.clone(), sink_factory, payload_event, - provider_factory, + provider, ) .await?; @@ -170,11 +170,15 @@ impl LiveBuilderConfig for FlashbotsConfig { } /// @Pending fix this ugly copy/paste - fn build_backtest_block( + fn build_backtest_block( &self, building_algorithm_name: &str, - input: BacktestSimulateBlockInput<'_, Arc>, - ) -> eyre::Result<(Block, CachedReads)> { + input: BacktestSimulateBlockInput<'_, P>, + ) -> eyre::Result<(Block, CachedReads)> + where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, + { let builder_cfg = self.builder(building_algorithm_name)?; match builder_cfg.builder { SpecificBuilderConfig::OrderingBuilder(config) => { @@ -182,7 +186,9 @@ impl LiveBuilderConfig for FlashbotsConfig { config, input, ) } - SpecificBuilderConfig::MergingBuilder(config) => merging_build_backtest(input, config), + SpecificBuilderConfig::ParallelBuilder(config) => { + parallel_build_backtest(input, config) + } } } } @@ -282,15 +288,19 @@ impl FlashbotsConfig { /// BlockSealingBidderFactory: performs sealing/bidding. Sends bids to the RelaySubmitSinkFactory /// UnfinishedBlockBuildingSinkFactoryWrapper: sends all the tbv info via redis and forwards to BlockSealingBidderFactory #[allow(clippy::type_complexity)] - async fn create_sink_factory_and_relays( + async fn create_sink_factory_and_relays( &self, - provider_factory: ProviderFactory>, + provider: P, cancellation_token: CancellationToken, ) -> eyre::Result<( Box, Vec, Arc, - )> { + )> + where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + HeaderProvider + Clone + 'static, + { let block_processor_key = if let Some(key_registration_url) = &self.key_registration_url { if self.blocks_processor_url.is_none() { return Self::bail_blocks_processor_url_not_set(); @@ -307,7 +317,7 @@ impl FlashbotsConfig { // BlockSealingBidderFactory let (wallet_balance_watcher, wallet_history) = WalletBalanceWatcher::new( - provider_factory, + provider, self.base_config.coinbase_signer()?.address, WALLET_INIT_HISTORY_SIZE, )?;