Skip to content

Commit

Permalink
Make IPC Path optional (#314)
Browse files Browse the repository at this point in the history
Make `el_node_ipc_path` optional in config.

#302 

## ✅ I have completed the following steps:

* [✅ ] Run `make lint`
* [✅ ] Run `make test`
* [ ] Added tests (if applicable)

---------

Signed-off-by: 7suyash7 <[email protected]>
  • Loading branch information
7suyash7 authored Jan 6, 2025
1 parent 798de18 commit 06840a8
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn main() -> eyre::Result<()> {
let order_input_config = OrderInputConfig::new(
false,
true,
DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(),
Some(PathBuf::from(DEFAULT_EL_NODE_IPC_PATH)),
DEFAULT_INCOMING_BUNDLES_PORT,
default_ip(),
DEFAULT_SERVE_MAX_CONNECTIONS,
Expand Down
4 changes: 2 additions & 2 deletions crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct BaseConfig {

pub flashbots_db: Option<EnvOrValue<String>>,

pub el_node_ipc_path: PathBuf,
pub el_node_ipc_path: Option<PathBuf>,
pub jsonrpc_server_port: u16,
#[serde(default = "default_ip")]
pub jsonrpc_server_ip: Ipv4Addr,
Expand Down Expand Up @@ -391,7 +391,7 @@ impl Default for BaseConfig {
error_storage_path: None,
coinbase_secret_key: None,
flashbots_db: None,
el_node_ipc_path: "/tmp/reth.ipc".parse().unwrap(),
el_node_ipc_path: None,
jsonrpc_server_port: DEFAULT_INCOMING_BUNDLES_PORT,
jsonrpc_server_ip: default_ip(),
ignore_cancellable_orders: true,
Expand Down
51 changes: 35 additions & 16 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ use self::{
orderpool::{OrderPool, OrderPoolSubscriptionId},
replaceable_order_sink::ReplaceableOrderSink,
};
use crate::primitives::{serialize::CancelShareBundle, BundleReplacementKey, Order};
use crate::telemetry::{set_current_block, set_ordepool_count};
use crate::{
primitives::{serialize::CancelShareBundle, BundleReplacementKey, Order},
telemetry::{set_current_block, set_ordepool_count},
};
use alloy_consensus::Header;
use jsonrpsee::RpcModule;
use parking_lot::Mutex;
use reth_provider::StateProviderFactory;
use std::time::Instant;
use std::{net::Ipv4Addr, path::PathBuf, sync::Arc, time::Duration};
use std::{
net::Ipv4Addr,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -82,7 +88,7 @@ pub struct OrderInputConfig {
/// if true -- txs with blobs are ignored
ignore_blobs: bool,
/// Path to reth ipc
ipc_path: PathBuf,
ipc_path: Option<PathBuf>,
/// Input RPC port
server_port: u16,
/// Input RPC ip
Expand All @@ -103,7 +109,7 @@ impl OrderInputConfig {
pub fn new(
ignore_cancellable_orders: bool,
ignore_blobs: bool,
ipc_path: PathBuf,
ipc_path: Option<PathBuf>,
server_port: u16,
server_ip: Ipv4Addr,
serve_max_connections: u32,
Expand All @@ -123,7 +129,11 @@ impl OrderInputConfig {
}

pub fn from_config(config: &BaseConfig) -> eyre::Result<Self> {
let el_node_ipc_path = expand_path(config.el_node_ipc_path.clone())?;
let el_node_ipc_path = config
.el_node_ipc_path
.as_ref()
.map(|p| expand_path(p.as_path()))
.transpose()?;

Ok(OrderInputConfig {
ignore_cancellable_orders: config.ignore_cancellable_orders,
Expand All @@ -139,7 +149,7 @@ impl OrderInputConfig {

pub fn default_e2e() -> Self {
Self {
ipc_path: PathBuf::from("/tmp/anvil.ipc"),
ipc_path: Some(PathBuf::from("/tmp/anvil.ipc")),
results_channel_timeout: Duration::new(5, 0),
ignore_cancellable_orders: false,
ignore_blobs: false,
Expand Down Expand Up @@ -215,12 +225,21 @@ where
global_cancel.clone(),
)
.await?;
let txpool_fetcher = txpool_fetcher::subscribe_to_txpool_with_blobs(
config.clone(),
order_sender.clone(),
global_cancel.clone(),
)
.await?;

let mut handles = vec![clean_job, rpc_server];

if config.ipc_path.is_some() {
info!("IPC path configured, starting txpool subscription");
let txpool_fetcher = txpool_fetcher::subscribe_to_txpool_with_blobs(
config.clone(),
order_sender.clone(),
global_cancel.clone(),
)
.await?;
handles.push(txpool_fetcher);
} else {
info!("No IPC path configured, skipping txpool subscription");
}

let handle = tokio::spawn(async move {
info!("OrderPoolJobs: started");
Expand Down Expand Up @@ -277,7 +296,7 @@ where
new_commands.clear();
}

for handle in [clean_job, rpc_server, txpool_fetcher] {
for handle in handles {
handle
.await
.map_err(|err| {
Expand All @@ -291,7 +310,7 @@ where
Ok((handle, subscriber))
}

pub fn expand_path(path: PathBuf) -> eyre::Result<PathBuf> {
pub fn expand_path(path: &Path) -> eyre::Result<PathBuf> {
let path_str = path
.to_str()
.ok_or_else(|| eyre::eyre!("Invalid UTF-8 in path"))?;
Expand Down
12 changes: 10 additions & 2 deletions crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ pub async fn subscribe_to_txpool_with_blobs(
results: mpsc::Sender<ReplaceableOrderPoolCommand>,
global_cancel: CancellationToken,
) -> eyre::Result<JoinHandle<()>> {
let ipc = IpcConnect::new(config.ipc_path);
let ipc_path = config
.ipc_path
.ok_or_else(|| eyre::eyre!("No IPC path configured"))?;
let ipc = IpcConnect::new(ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await?;

let handle = tokio::spawn(async move {
Expand Down Expand Up @@ -114,6 +117,7 @@ async fn get_tx_with_blobs(

#[cfg(test)]
mod test {

use super::*;
use alloy_consensus::{SidecarBuilder, SimpleCoder};
use alloy_network::{EthereumWallet, TransactionBuilder};
Expand All @@ -122,6 +126,7 @@ mod test {
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types::TransactionRequest;
use alloy_signer_local::PrivateKeySigner;
use std::path::PathBuf;

#[tokio::test]
/// Test that the fetcher can retrieve transactions (both normal and blob) from the txpool
Expand All @@ -133,7 +138,10 @@ mod test {

let (sender, mut receiver) = mpsc::channel(10);
subscribe_to_txpool_with_blobs(
OrderInputConfig::default_e2e(),
OrderInputConfig {
ipc_path: Some(PathBuf::from("/tmp/anvil.ipc")),
..OrderInputConfig::default_e2e()
},
sender,
CancellationToken::new(),
)
Expand Down
7 changes: 5 additions & 2 deletions crates/rbuilder/src/utils/provider_factory_reopen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ use reth_provider::{
HeaderProvider, StateProviderBox, StateProviderFactory, StaticFileProviderFactory,
};
use revm_primitives::{B256, U256};
use std::ops::DerefMut;
use std::{ops::RangeBounds, path::PathBuf, sync::Arc};
use std::{
ops::{DerefMut, RangeBounds},
path::PathBuf,
sync::Arc,
};
use tracing::debug;

/// This struct is used as a workaround for https://github.com/paradigmxyz/reth/issues/7836
Expand Down

0 comments on commit 06840a8

Please sign in to comment.