diff --git a/Cargo.lock b/Cargo.lock index dfccd0ca..48031fe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5748,9 +5748,9 @@ dependencies = [ [[package]] name = "metrics" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae428771d17306715c5091d446327d1cfdedc82185c65ba8423ab404e45bf10" +checksum = "7a7deb012b3b2767169ff203fadb4c6b0b82b947512e5eb9e0b78c2e186ad9e3" dependencies = [ "ahash", "portable-atomic", @@ -6520,6 +6520,7 @@ dependencies = [ "eyre", "futures-util", "jsonrpsee 0.24.7", + "metrics", "op-alloy-consensus", "op-alloy-rpc-types-engine", "reth", @@ -6531,6 +6532,8 @@ dependencies = [ "reth-ethereum-payload-builder", "reth-evm", "reth-execution-types", + "reth-exex", + "reth-metrics", "reth-node-api", "reth-node-ethereum", "reth-optimism-chainspec", diff --git a/Cargo.toml b/Cargo.toml index ac02bce0..5866e148 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,8 @@ reth-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } reth-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } reth-evm-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } reth-execution-errors = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } +reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } +reth-metrics = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } reth-trie-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } reth-payload-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.5" } @@ -151,9 +153,10 @@ tokio-util = "0.7.12" url = "2.5.2" libc = { version = "0.2.161" } +lazy_static = "1.4.0" tikv-jemallocator = { version = "0.6" } tracing = "0.1.37" - +metrics = { version = "0.24.1" } eth-sparse-mpt = { path = "crates/eth-sparse-mpt" } sysperf = { path = "crates/sysperf" } diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml index 38d35906..c43f1610 100644 --- a/crates/op-rbuilder/Cargo.toml +++ b/crates/op-rbuilder/Cargo.toml @@ -15,6 +15,7 @@ reth-optimism-primitives.workspace = true reth-cli-util.workspace = true reth-payload-primitives.workspace = true reth-evm.workspace = true +reth-exex.workspace = true reth-chainspec.workspace = true reth-primitives.workspace = true reth-node-api.workspace = true @@ -24,6 +25,7 @@ reth-node-ethereum.workspace = true reth-chain-state.workspace = true reth-execution-types.workspace = true reth-ethereum-payload-builder.workspace = true +reth-metrics.workspace = true reth-provider.workspace = true reth-revm.workspace = true reth-trie.workspace = true @@ -61,6 +63,7 @@ async-trait = { workspace = true } clap_builder = { workspace = true } clap.workspace = true derive_more.workspace = true +metrics.workspace = true [target.'cfg(unix)'.dependencies] tikv-jemallocator = { version = "0.6", optional = true } diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs index e20f7e89..c50cd767 100644 --- a/crates/op-rbuilder/src/main.rs +++ b/crates/op-rbuilder/src/main.rs @@ -1,5 +1,6 @@ use clap::Parser; use generator::EmptyBlockPayloadJobGenerator; +use monitoring::Monitoring; use payload_builder::OpPayloadBuilder as FBPayloadBuilder; use payload_builder_vanilla::OpPayloadBuilderVanilla; use reth::builder::Node; @@ -31,6 +32,8 @@ use reth_optimism_primitives::OpPrimitives; use reth_transaction_pool::PoolTransaction; pub mod generator; +mod metrics; +mod monitoring; pub mod payload_builder; mod payload_builder_vanilla; mod tx_signer; @@ -109,6 +112,10 @@ fn main() { .payload(CustomPayloadBuilder::new(builder_args.builder_signer)), ) .with_add_ons(op_node.add_ons()) + .install_exex("monitoring", move |ctx| { + let builder_signer = builder_args.builder_signer; + async move { Ok(Monitoring::new(ctx, builder_signer).start()) } + }) .launch_with_fn(|builder| { let launcher = EngineNodeLauncher::new( builder.task_executor().clone(), diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs new file mode 100644 index 00000000..8bb18702 --- /dev/null +++ b/crates/op-rbuilder/src/metrics.rs @@ -0,0 +1,25 @@ +use reth_metrics::{metrics::Gauge, Metrics}; + +/// op-rbuilder metrics +#[derive(Metrics)] +#[metrics(scope = "op_rbuilder")] +pub struct OpRBuilderMetrics { + /// Number of builder built blocks + pub builder_built_blocks: Gauge, + /// Last built block height + pub last_built_block_height: Gauge, +} + +impl OpRBuilderMetrics { + pub fn inc_builder_built_blocks(&self) { + self.builder_built_blocks.increment(1); + } + + pub fn dec_builder_built_blocks(&self) { + self.builder_built_blocks.decrement(1); + } + + pub fn set_last_built_block_height(&self, height: u64) { + self.last_built_block_height.set(height as f64); + } +} diff --git a/crates/op-rbuilder/src/monitoring.rs b/crates/op-rbuilder/src/monitoring.rs new file mode 100644 index 00000000..00451e59 --- /dev/null +++ b/crates/op-rbuilder/src/monitoring.rs @@ -0,0 +1,129 @@ +use alloy_consensus::{Transaction, TxReceipt}; +use futures_util::TryStreamExt; +use reth::core::primitives::SignedTransaction; +use reth_exex::{ExExContext, ExExEvent}; +use reth_node_api::{FullNodeComponents, NodeTypes}; +use reth_optimism_primitives::{OpPrimitives, OpTransactionSigned}; +use reth_primitives::{Block, SealedBlockWithSenders}; +use reth_provider::Chain; +use tracing::info; + +use crate::{metrics::OpRBuilderMetrics, tx_signer::Signer}; + +const OP_BUILDER_TX_PREFIX: &[u8] = b"Block Number:"; + +pub struct Monitoring { + ctx: ExExContext, + builder_signer: Option, + metrics: OpRBuilderMetrics, +} + +impl Monitoring +where + Node: FullNodeComponents>, +{ + pub fn new(ctx: ExExContext, builder_signer: Option) -> Self { + Self { + ctx, + builder_signer, + metrics: Default::default(), + } + } + + pub async fn start(mut self) -> eyre::Result<()> { + // Process all new chain state notifications + while let Some(notification) = self.ctx.notifications.try_next().await? { + if let Some(reverted_chain) = notification.reverted_chain() { + self.revert(&reverted_chain).await?; + } + if let Some(committed_chain) = notification.committed_chain() { + self.commit(&committed_chain).await?; + self.ctx + .events + .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; + } + } + + Ok(()) + } + + /// Process a new chain commit. + /// + /// This function decodes the builder tx and then emits metrics + async fn commit(&mut self, chain: &Chain) -> eyre::Result<()> { + info!("Processing new chain commit"); + let txs = decode_chain_into_builder_txs(chain, self.builder_signer); + + for (block, _) in txs { + self.metrics.inc_builder_built_blocks(); + self.metrics.set_last_built_block_height(block.number); + info!( + block_number = block.number, + "Committed block built by builder" + ); + } + + Ok(()) + } + + /// Process a chain revert. + /// + /// This function decodes all transactions in the block, updates the metrics for builder built blocks + async fn revert(&mut self, chain: &Chain) -> eyre::Result<()> { + info!("Processing new chain revert"); + let mut txs = decode_chain_into_builder_txs(chain, self.builder_signer); + // Reverse the order of txs to start reverting from the tip + txs.reverse(); + + if let Some((block, _)) = txs.last() { + self.metrics.set_last_built_block_height(block.number - 1); + } + + for (block, _) in txs { + self.metrics.dec_builder_built_blocks(); + info!( + block_number = block.number, + "Reverted block built by builder" + ); + } + + Ok(()) + } +} + +/// Decode chain of blocks and filter list to builder txs +fn decode_chain_into_builder_txs( + chain: &Chain, + builder_signer: Option, +) -> Vec<( + SealedBlockWithSenders>, + OpTransactionSigned, +)> { + chain + // Get all blocks and receipts + .blocks_and_receipts() + // Get all receipts + .flat_map(|(block, receipts)| { + let block_clone = block.clone(); + block + .body() + .transactions + .iter() + .zip(receipts.iter().flatten()) + .filter_map(move |(tx, receipt)| { + let is_builder_tx = receipt.status() + && tx.input().starts_with(OP_BUILDER_TX_PREFIX) + && tx.recover_signer().is_some_and(|signer| { + builder_signer.is_some_and(|bs| signer == bs.address) + }); + + if is_builder_tx { + // Clone the entire block and the transaction + Some((block_clone.clone(), tx.clone())) + } else { + None + } + }) + }) + .collect() +} diff --git a/crates/rbuilder/Cargo.toml b/crates/rbuilder/Cargo.toml index fe10e6a1..adc84f6b 100644 --- a/crates/rbuilder/Cargo.toml +++ b/crates/rbuilder/Cargo.toml @@ -105,7 +105,7 @@ uuid = { version = "1.6.1", features = ["serde", "v5", "v4"] } prometheus = "0.13.4" hyper = { version = "1.3.1", features = ["server", "full"] } warp = "0.3.7" -lazy_static = "1.4.0" +lazy_static.workspace = true ctor = "0.2" toml = "0.8.8" ahash = "0.8.6"