From 5def4b4a8d662638a35f7d0bbba5e680863ce9ae Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Fri, 15 Nov 2024 15:44:13 +0900 Subject: [PATCH] init argus --- apps/argus/Cargo.toml | 8 + apps/argus/README.md | 19 +++ apps/argus/config.sample.yaml | 18 +++ apps/argus/rust-toolchain | 1 + apps/argus/src/config.rs | 60 +++++++ apps/argus/src/contract.rs | 41 +++++ apps/argus/src/error.rs | 64 ++++++++ apps/argus/src/hermes.rs | 129 +++++++++++++++ apps/argus/src/keeper.rs | 293 ++++++++++++++++++++++++++++++++++ apps/argus/src/main.rs | 94 +++++++++++ apps/argus/src/storage.rs | 164 +++++++++++++++++++ apps/argus/src/types.rs | 32 ++++ 12 files changed, 923 insertions(+) create mode 100644 apps/argus/Cargo.toml create mode 100644 apps/argus/README.md create mode 100644 apps/argus/config.sample.yaml create mode 100644 apps/argus/rust-toolchain create mode 100644 apps/argus/src/config.rs create mode 100644 apps/argus/src/contract.rs create mode 100644 apps/argus/src/error.rs create mode 100644 apps/argus/src/hermes.rs create mode 100644 apps/argus/src/keeper.rs create mode 100644 apps/argus/src/main.rs create mode 100644 apps/argus/src/storage.rs create mode 100644 apps/argus/src/types.rs diff --git a/apps/argus/Cargo.toml b/apps/argus/Cargo.toml new file mode 100644 index 0000000000..e53fa0894b --- /dev/null +++ b/apps/argus/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "argus" +version = "0.1.0" +edition = "2021" + +[dependencies] +alloy = { version = "0.3", features = ["full", "node-bindings"] } +tokio = { version = "1.28", features = ["full"] } diff --git a/apps/argus/README.md b/apps/argus/README.md new file mode 100644 index 0000000000..7d5033e4b2 --- /dev/null +++ b/apps/argus/README.md @@ -0,0 +1,19 @@ +# Argus + +Argus is a webservice that serves price updates according to the Pulse protocol. +The service also operates a keeper task that performs callback transactions for user requests. + +A single instance of this service can simultaneously serve price updates for several different blockchains. +Each blockchain is configured in `config.yaml`. + +## How It Works + +1. Continuously polls the Pulse contract's storage to discover new price update requests +2. Fetches required price data from Pyth Network +3. Batches multiple requests when possible for gas efficiency +4. Executes callbacks with appropriate gas limits specified in the original requests +5. Monitors transaction success and handles retries when necessary + +## Architecture + +The service is built on Rust for performance and reliability, sharing architectural patterns with Fortuna (the Entropy protocol's keeper service). However, unlike Fortuna which relies on event subscriptions, Argus uses direct storage polling for more reliable request discovery. diff --git a/apps/argus/config.sample.yaml b/apps/argus/config.sample.yaml new file mode 100644 index 0000000000..8a4d247ecc --- /dev/null +++ b/apps/argus/config.sample.yaml @@ -0,0 +1,18 @@ +chains: + ethereum: + geth_rpc_addr: "https://eth-mainnet.g.alchemy.com/v2/YOUR-API-KEY" + contract_addr: "0x1234..." + poll_interval: 5 + min_batch_size: 1 + max_batch_size: 10 + batch_timeout: 30 + min_keeper_balance: 1000000000000000000 # 1 ETH + gas_limit: 500000 + +provider: + uri: "http://localhost:8080" + address: "0x5678..." + private_key: "0xabcd..." # Provider private key + +keeper: + private_key: "0xdef0..." # Keeper private key diff --git a/apps/argus/rust-toolchain b/apps/argus/rust-toolchain new file mode 100644 index 0000000000..f984c0ee0c --- /dev/null +++ b/apps/argus/rust-toolchain @@ -0,0 +1 @@ +nightly-2023-07-23 diff --git a/apps/argus/src/config.rs b/apps/argus/src/config.rs new file mode 100644 index 0000000000..271a92feab --- /dev/null +++ b/apps/argus/src/config.rs @@ -0,0 +1,60 @@ +use { + alloy::{ + primitives::Address, + providers::{Provider, ProviderBuilder}, + signers::Signer, + }, + anyhow::Result, + serde::Deserialize, + std::{fs, time::Duration}, +}; + +#[derive(Debug, Deserialize)] +pub struct Config { + pub chains: HashMap, + pub provider: ProviderConfig, + pub keeper: KeeperConfig, +} + +#[derive(Debug, Deserialize)] +pub struct ChainConfig { + pub geth_rpc_addr: String, + pub contract_addr: Address, + pub poll_interval: u64, // in seconds + pub min_batch_size: usize, + pub max_batch_size: usize, + pub batch_timeout: u64, // in seconds + pub min_keeper_balance: u64, + pub gas_limit: u64, +} + +#[derive(Debug, Deserialize)] +pub struct ProviderConfig { + pub uri: String, + pub address: Address, + pub private_key: SecretString, +} + +#[derive(Debug, Deserialize)] +pub struct KeeperConfig { + pub private_key: SecretString, +} + +#[derive(Debug, Deserialize)] +pub struct SecretString(String); + +impl Config { + pub fn load(path: &str) -> Result { + let contents = fs::read_to_string(path)?; + Ok(serde_yaml::from_str(&contents)?) + } + + pub fn create_provider(&self, chain_id: &str) -> Result { + let chain = self.chains.get(chain_id).ok_or_else(|| anyhow!("Chain not found"))?; + Ok(Provider::builder().rpc_url(&chain.geth_rpc_addr).build()?) + } + + pub fn create_signer(&self, secret: &SecretString) -> Result { + Ok(Signer::from_private_key(secret.0.parse()?)?) + } +} diff --git a/apps/argus/src/contract.rs b/apps/argus/src/contract.rs new file mode 100644 index 0000000000..576c8e17eb --- /dev/null +++ b/apps/argus/src/contract.rs @@ -0,0 +1,41 @@ +use { + alloy::{ + contract::{Contract, ContractInstance}, + primitives::{Address, Bytes, U256}, + providers::Provider, + signers::Signer, + }, + anyhow::Result, + std::sync::Arc, +}; + +// Contract ABI definition +abigen!(Pulse, "target_chains/ethereum/contracts/contracts/pulse/IPulse.sol"); + +pub struct PulseContract { + instance: ContractInstance, Pulse>, +} + +impl PulseContract

{ + pub fn new(address: Address, provider: Arc

) -> Self { + Self { + instance: ContractInstance::new(address, Arc::new(Pulse::new()), provider), + } + } + + pub async fn execute_callback( + &self, + provider: Address, + sequence_number: U64, + price_ids: Vec<[u8; 32]>, + update_data: Vec, + callback_gas_limit: U256, + ) -> Result { + let tx = self.instance + .execute_callback(provider, sequence_number, price_ids, update_data, callback_gas_limit) + .send() + .await?; + + Ok(tx.tx_hash()) + } +} diff --git a/apps/argus/src/error.rs b/apps/argus/src/error.rs new file mode 100644 index 0000000000..73b063e873 --- /dev/null +++ b/apps/argus/src/error.rs @@ -0,0 +1,64 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ArgusError { + #[error("Failed to fetch price updates from Hermes: {0}")] + HermesError(#[from] HermesError), + + #[error("Contract error: {0}")] + ContractError(#[from] ContractError), + + #[error("Storage error: {0}")] + StorageError(#[from] StorageError), + + #[error("Configuration error: {0}")] + ConfigError(String), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +#[derive(Debug, Error)] +pub enum HermesError { + #[error("HTTP request failed: {0}")] + RequestFailed(#[from] reqwest::Error), + + #[error("Invalid response encoding: {0}")] + InvalidEncoding(String), + + #[error("No price updates found")] + NoPriceUpdates, + + #[error("Failed to parse price data: {0}")] + ParseError(String), + + #[error("Failed to decode hex data: {0}")] + HexDecodeError(#[from] hex::FromHexError), +} + +#[derive(Debug, Error)] +pub enum ContractError { + #[error("Transaction failed: {0}")] + TransactionFailed(String), + + #[error("Gas estimation failed: {0}")] + GasEstimationFailed(String), + + #[error("Invalid contract address: {0}")] + InvalidAddress(String), + + #[error("Contract call failed: {0}")] + CallFailed(String), +} + +#[derive(Debug, Error)] +pub enum StorageError { + #[error("Failed to read storage slot: {0}")] + ReadError(String), + + #[error("Failed to parse storage data: {0}")] + ParseError(String), + + #[error("Invalid storage layout: {0}")] + InvalidLayout(String), +} diff --git a/apps/argus/src/hermes.rs b/apps/argus/src/hermes.rs new file mode 100644 index 0000000000..3609e038e0 --- /dev/null +++ b/apps/argus/src/hermes.rs @@ -0,0 +1,129 @@ +use { + crate::{ + error::{ + ArgusError, + HermesError, + }, + types::PriceData, + }, + reqwest::Client, + serde::{ + Deserialize, + Serialize, + }, + std::time::{ + SystemTime, + UNIX_EPOCH, + }, +}; + +const HERMES_API_URL: &str = "https://hermes.pyth.network"; + +#[derive(Debug, Serialize, Deserialize)] +struct HermesResponse { + binary: BinaryUpdate, + parsed: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct BinaryUpdate { + data: Vec, + encoding: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct ParsedPriceUpdate { + id: String, + price: RpcPrice, + ema_price: RpcPrice, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RpcPrice { + price: String, + conf: String, + expo: i32, + publish_time: u64, +} + +pub struct HermesClient { + client: Client, +} + +impl HermesClient { + pub fn new() -> Self { + Self { + client: Client::new(), + } + } + + pub async fn get_price_updates( + &self, + price_ids: &[[u8; 32]], + ) -> Result)>, HermesError> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| HermesError::ParseError(format!("Failed to get timestamp: {}", e)))? + .as_secs(); + + let mut url = format!( + "{}/v2/updates/price/{}?parsed=true&encoding=hex", + HERMES_API_URL, now + ); + + for price_id in price_ids { + url.push_str(&format!("&ids[]={}", hex::encode(price_id))); + } + + let response = self + .client + .get(&url) + .send() + .await + .map_err(|e| HermesError::RequestFailed(e))? + .error_for_status() + .map_err(|e| HermesError::RequestFailed(e))? + .json::() + .await + .map_err(|e| HermesError::RequestFailed(e))?; + + let update_data = if response.binary.encoding == "hex" { + response + .binary + .data + .into_iter() + .map(|data| hex::decode(&data)) + .collect::, _>>() + .map_err(HermesError::HexDecodeError)? + } else { + return Err(HermesError::InvalidEncoding(response.binary.encoding)); + }; + + let price_updates = response.parsed.ok_or(HermesError::NoPriceUpdates)?; + + if price_updates.is_empty() { + return Err(HermesError::NoPriceUpdates); + } + + let mut results = Vec::with_capacity(price_updates.len()); + for (update, data) in price_updates.into_iter().zip(update_data) { + let price_data = PriceData { + price: update + .price + .price + .parse() + .map_err(|e| HermesError::ParseError(format!("Invalid price: {}", e)))?, + conf: update + .price + .conf + .parse() + .map_err(|e| HermesError::ParseError(format!("Invalid conf: {}", e)))?, + expo: update.price.expo, + publish_time: update.price.publish_time, + }; + results.push((price_data, data)); + } + + Ok(results) + } +} diff --git a/apps/argus/src/keeper.rs b/apps/argus/src/keeper.rs new file mode 100644 index 0000000000..43f808e6e0 --- /dev/null +++ b/apps/argus/src/keeper.rs @@ -0,0 +1,293 @@ +use { + crate::{ + contract::PulseContract, + hermes::HermesClient, + types::{PriceData, PriceUpdateRequest, UpdateBatch}, + error::{ArgusError, ContractError}, + }, + alloy::{ + primitives::{Address, Bytes, U256}, + providers::Provider, + signers::Signer, + }, + anyhow::Result, + std::{collections::HashMap, sync::Arc}, + tokio::{sync::mpsc, time}, +}; + +#[derive(Clone)] +pub struct KeeperMetrics { + pub transactions_submitted: Counter, + pub transaction_failures: Counter, + pub gas_used: Histogram, + pub batch_size: Histogram, +} + +pub struct Keeper { + provider: Arc, + signer: Arc, + request_rx: mpsc::Receiver, + metrics: Arc, + min_batch_size: usize, + max_batch_size: usize, + batch_timeout: Duration, + hermes_client: HermesClient, +} + +impl Keeper { + pub async fn new( + provider: Arc, + signer: Arc, + request_rx: mpsc::Receiver, + metrics: Arc, + min_batch_size: usize, + max_batch_size: usize, + batch_timeout: Duration, + ) -> Result { + Ok(Self { + provider, + signer, + request_rx, + metrics, + min_batch_size, + max_batch_size, + batch_timeout, + hermes_client: HermesClient::new(), + }) + } + + pub async fn run(&mut self) -> Result<()> { + let mut pending_requests = Vec::new(); + let mut batch_timer = time::interval(self.batch_timeout); + + loop { + tokio::select! { + Some(request) = self.request_rx.recv() => { + pending_requests.push(request); + + if pending_requests.len() >= self.max_batch_size { + self.process_batch(&mut pending_requests).await?; + } + } + _ = batch_timer.tick() => { + if pending_requests.len() >= self.min_batch_size { + self.process_batch(&mut pending_requests).await?; + } + } + } + } + } + + async fn process_batch(&self, requests: &mut Vec) -> Result<(), ArgusError> { + if requests.is_empty() { + return Ok(()); + } + + let batch = self.prepare_batch(requests).await?; + self.metrics.batch_size.observe(batch.requests.len() as f64); + + match self.submit_batch(batch).await { + Ok(_) => { + self.metrics.transactions_submitted.inc(); + requests.clear(); + Ok(()) + } + Err(e) => { + self.metrics.transaction_failures.inc(); + tracing::error!("Failed to submit batch: {}", e); + Err(e) + } + } + } + + async fn submit_batch(&self, batch: UpdateBatch) -> Result<(), ArgusError> { + let tx = self.build_batch_tx(&batch) + .map_err(|e| ContractError::TransactionFailed(e.to_string()))?; + + let signed_tx = self.signer.sign_transaction(tx) + .await + .map_err(|e| ContractError::TransactionFailed(format!("Failed to sign: {}", e)))?; + + let pending_tx = self.provider.send_raw_transaction(signed_tx.into()) + .await + .map_err(|e| ContractError::TransactionFailed(format!("Failed to send: {}", e)))?; + + let receipt = pending_tx.await + .map_err(|e| ContractError::TransactionFailed(format!("Failed to get receipt: {}", e)))?; + + if let Some(gas_used) = receipt.gas_used { + self.metrics.gas_used.observe(gas_used.as_f64()); + } + + // Check if transaction was successful + if !receipt.status.unwrap_or_default().is_success() { + return Err(ContractError::TransactionFailed("Transaction reverted".into()).into()); + } + + Ok(()) + } + + async fn prepare_batch(&self, requests: &[PriceUpdateRequest]) -> Result { + // Group requests by price ID to minimize Hermes API calls + let mut price_id_map: HashMap<[u8; 32], Vec> = HashMap::new(); + for (i, req) in requests.iter().enumerate() { + for price_id in &req.price_ids { + price_id_map.entry(*price_id).or_default().push(i); + } + } + + // Get all unique price IDs + let price_ids: Vec<[u8; 32]> = price_id_map.keys().copied().collect(); + + // Fetch price data from Hermes in a single batch request + let price_updates = self.hermes_client.get_price_updates(&price_ids).await?; + + let mut price_data = Vec::new(); + let mut update_data = Vec::new(); + + for (data, vaa) in price_updates { + price_data.push(data); + update_data.push(vaa); + } + + Ok(UpdateBatch { + requests: requests.to_vec(), + price_data, + update_data: update_data.into_iter().map(Bytes::from).collect(), + }) + } + + fn build_batch_tx(&self, batch: &UpdateBatch) -> Result { + let contract = PulseContract::new(self.contract_addr, self.provider.clone()); + + let tx = contract.execute_callback( + batch.requests[0].provider, + batch.requests[0].sequence_number, + batch.requests[0].price_ids.clone(), + batch.update_data.clone(), + batch.requests[0].callback_gas_limit, + ); + + Ok(tx) + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::types::PriceUpdateRequest, + tokio::sync::mpsc, + }; + + fn setup_test_metrics() -> Arc { + Arc::new(KeeperMetrics { + transactions_submitted: Counter::default(), + transaction_failures: Counter::default(), + gas_used: Histogram::new([1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0].into_iter()), + batch_size: Histogram::new([1.0, 2.0, 5.0, 10.0, 20.0, 50.0].into_iter()), + }) + } + + #[tokio::test] + async fn test_process_empty_batch() { + let (tx, rx) = mpsc::channel(100); + let provider = Arc::new(Provider::mock()); + let signer = Arc::new(Signer::new_random()); + let metrics = setup_test_metrics(); + + let keeper = Keeper::new( + provider, + signer, + rx, + metrics.clone(), + 1, + 10, + Duration::from_secs(5), + ).await.unwrap(); + + let mut requests = Vec::new(); + assert!(keeper.process_batch(&mut requests).await.is_ok()); + assert!(requests.is_empty()); + } + + #[tokio::test] + async fn test_batch_size_metrics() { + let (tx, rx) = mpsc::channel(100); + let provider = Arc::new(Provider::mock()); + let signer = Arc::new(Signer::new_random()); + let metrics = setup_test_metrics(); + + let keeper = Keeper::new( + provider, + signer, + rx, + metrics.clone(), + 1, + 10, + Duration::from_secs(5), + ).await.unwrap(); + + let mut requests = vec![ + PriceUpdateRequest { + provider: Address::zero(), + sequence_number: 1.into(), + publish_time: 1234.into(), + price_ids: vec![[0u8; 32]], + callback_gas_limit: 100000.into(), + requester: Address::zero(), + }, + PriceUpdateRequest { + provider: Address::zero(), + sequence_number: 2.into(), + publish_time: 1234.into(), + price_ids: vec![[0u8; 32]], + callback_gas_limit: 100000.into(), + requester: Address::zero(), + }, + ]; + + // Process batch should succeed and update metrics + keeper.process_batch(&mut requests).await.unwrap(); + + // Check that batch size metric was updated + let batch_size = metrics.batch_size.get_or_create(&()).get_count(); + assert_eq!(batch_size, 2); + } + + #[tokio::test] + async fn test_transaction_failure_metrics() { + let (tx, rx) = mpsc::channel(100); + let provider = Arc::new(Provider::mock().with_error()); // Mock provider that returns errors + let signer = Arc::new(Signer::new_random()); + let metrics = setup_test_metrics(); + + let keeper = Keeper::new( + provider, + signer, + rx, + metrics.clone(), + 1, + 10, + Duration::from_secs(5), + ).await.unwrap(); + + let mut requests = vec![ + PriceUpdateRequest { + provider: Address::zero(), + sequence_number: 1.into(), + publish_time: 1234.into(), + price_ids: vec![[0u8; 32]], + callback_gas_limit: 100000.into(), + requester: Address::zero(), + }, + ]; + + // Process batch should fail and update failure metrics + assert!(keeper.process_batch(&mut requests).await.is_err()); + + // Check that failure metric was updated + let failures = metrics.transaction_failures.get(); + assert_eq!(failures, 1); + } +} diff --git a/apps/argus/src/main.rs b/apps/argus/src/main.rs new file mode 100644 index 0000000000..f6167d4553 --- /dev/null +++ b/apps/argus/src/main.rs @@ -0,0 +1,94 @@ +use { + crate::{ + keeper::{Keeper, KeeperMetrics}, + storage::{StorageMetrics, StoragePoller}, + types::PriceUpdateRequest, + }, + anyhow::Result, + clap::Parser, + std::{sync::Arc, time::Duration}, + tokio::sync::{mpsc, RwLock}, +}; + +mod keeper; +mod storage; +mod types; + +#[derive(Parser)] +struct Opts { + #[clap(long, default_value = "config.yaml")] + config: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt::init(); + + // Parse command line arguments + let opts = Opts::parse(); + + // Initialize metrics registry + let metrics_registry = Arc::new(RwLock::new(prometheus_client::registry::Registry::default())); + + // Load config + let config = config::load_config(&opts.config)?; + + // Set up channel between storage poller and keeper + let (request_tx, request_rx) = mpsc::channel(1000); + + // Initialize metrics + let storage_metrics = Arc::new(StorageMetrics { + requests_found: Counter::default(), + polling_errors: Counter::default(), + }); + + let keeper_metrics = Arc::new(KeeperMetrics { + transactions_submitted: Counter::default(), + transaction_failures: Counter::default(), + gas_used: Histogram::new([1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0].into_iter()), + batch_size: Histogram::new([1.0, 2.0, 5.0, 10.0, 20.0, 50.0].into_iter()), + }); + + // Register metrics + { + let mut registry = metrics_registry.write().await; + // Register all metrics... + } + + // Initialize components + let provider = Arc::new(config.create_provider()?); + + let storage_poller = StoragePoller::new( + provider.clone(), + config.contract_address, + Duration::from_secs(config.poll_interval), + request_tx, + storage_metrics, + ).await?; + + let mut keeper = Keeper::new( + provider, + config.create_hot_wallet()?, + config.create_cold_wallet()?, + request_rx, + keeper_metrics, + config.min_batch_size, + config.max_batch_size, + Duration::from_secs(config.batch_timeout), + ).await?; + + // Start components + let storage_handle = tokio::spawn(async move { + storage_poller.start_polling().await + }); + + let keeper_handle = tokio::spawn(async move { + keeper.run().await + }); + + // Wait for components to finish + tokio::try_join!(storage_handle, keeper_handle)?; + + Ok(()) +} diff --git a/apps/argus/src/storage.rs b/apps/argus/src/storage.rs new file mode 100644 index 0000000000..33e79115cd --- /dev/null +++ b/apps/argus/src/storage.rs @@ -0,0 +1,164 @@ +use { + crate::types::PriceUpdateRequest, + alloy::{ + primitives::{Address, U256}, + providers::Provider, + }, + anyhow::Result, + prometheus_client::{ + metrics::{counter::Counter, family::Family}, + registry::Registry, + }, + sha3::{Digest, Keccak256}, + std::{sync::Arc, time::Duration}, + tokio::{sync::mpsc, time}, +}; + +const NUM_REQUESTS: u8 = 32; +const NUM_REQUESTS_MASK: u8 = 0x1f; + +#[derive(Clone, Debug)] +pub struct StorageMetrics { + pub requests_found: Counter, + pub polling_errors: Counter, +} + +pub struct StoragePoller { + provider: Arc, + contract_addr: Address, + poll_interval: Duration, + request_tx: mpsc::Sender, + metrics: Arc, +} + +impl StoragePoller { + pub async fn new( + provider: Arc, + contract_addr: Address, + poll_interval: Duration, + request_tx: mpsc::Sender, + metrics: Arc, + ) -> Result { + Ok(Self { + provider, + contract_addr, + poll_interval, + request_tx, + metrics, + }) + } + + pub async fn start_polling(&self) -> Result<()> { + loop { + match self.poll_requests().await { + Ok(requests) => { + for request in requests { + if let Err(e) = self.request_tx.send(request).await { + tracing::error!("Failed to send request to keeper: {}", e); + self.metrics.polling_errors.inc(); + } else { + self.metrics.requests_found.inc(); + } + } + } + Err(e) => { + tracing::error!("Error polling requests: {}", e); + self.metrics.polling_errors.inc(); + } + } + + time::sleep(self.poll_interval).await; + } + } + + async fn poll_requests(&self) -> Result> { + let mut requests = Vec::new(); + + // The Pulse contract has a fixed array of 32 requests and an overflow mapping + // First read the fixed array (slot 2 in the contract) + for i in 0..NUM_REQUESTS { + let slot = self.calculate_request_slot(i); + let request = self.read_request_at_slot(slot).await?; + + // sequence_number == 0 means empty/inactive request + if request.sequence_number.as_u64() != 0 { + requests.push(request); + } + } + + // TODO: Read overflow mapping if needed + // The overflow mapping is used when there's a hash collision in the fixed array + // We'll need to read slot keccak256(key, OVERFLOW_SLOT) where key is keccak256(provider, sequence) + + Ok(requests) + } + + fn calculate_request_slot(&self, index: u8) -> U256 { + // In the Pulse contract, the requests array is at slot 2 + // For arrays, Solidity stores data at: keccak256(slot) + index + const REQUESTS_SLOT: u8 = 2; + + // Calculate base slot for requests array + let base_slot = U256::from(REQUESTS_SLOT); + + // Calculate actual slot: keccak256(slot) + index + let array_slot = keccak256(&base_slot.to_be_bytes::<32>()); + U256::from_be_bytes(array_slot) + U256::from(index) + } + + async fn read_request_at_slot(&self, slot: U256) -> Result { + // Each Request struct takes multiple slots: + // slot + 0: provider (address) and sequence_number (uint64) packed together + // slot + 1: publish_time (uint256) + // slot + 2: priceIds array length + // slot + 3: callback_gas_limit (uint256) + // slot + 4: requester (address) + // priceIds array is stored starting at keccak256(slot + 2) + + let slot_0 = self.provider.get_storage_at(self.contract_addr, slot).await?; + let slot_1 = self.provider.get_storage_at(self.contract_addr, slot + 1).await?; + let slot_2 = self.provider.get_storage_at(self.contract_addr, slot + 2).await?; + let slot_3 = self.provider.get_storage_at(self.contract_addr, slot + 3).await?; + let slot_4 = self.provider.get_storage_at(self.contract_addr, slot + 4).await?; + + // Parse provider (20 bytes) and sequence_number (8 bytes) from slot_0 + let provider = Address::from_slice(&slot_0[0..20]); + let sequence_number = U64::from_be_bytes(slot_0[20..28].try_into()?); + + // Parse publish_time + let publish_time = U256::from_be_bytes(slot_1); + + // Parse price IDs array + let price_ids_length = U256::from_be_bytes(slot_2).as_usize(); + let mut price_ids = Vec::with_capacity(price_ids_length); + + if price_ids_length > 0 { + let price_ids_slot = keccak256(&(slot + 2).to_be_bytes::<32>()); + for i in 0..price_ids_length { + let price_id_slot = U256::from_be_bytes(price_ids_slot) + U256::from(i); + let price_id_data = self.provider.get_storage_at(self.contract_addr, price_id_slot).await?; + price_ids.push(price_id_data.try_into()?); + } + } + + // Parse callback gas limit and requester + let callback_gas_limit = U256::from_be_bytes(slot_3); + let requester = Address::from_slice(&slot_4[0..20]); + + Ok(PriceUpdateRequest { + provider, + sequence_number, + publish_time, + price_ids, + callback_gas_limit, + requester, + }) + } +} + +// Helper function to calculate keccak256 hash +fn keccak256(data: &[u8]) -> [u8; 32] { + let mut hasher = Keccak256::new(); + hasher.update(data); + hasher.finalize().into() +} diff --git a/apps/argus/src/types.rs b/apps/argus/src/types.rs new file mode 100644 index 0000000000..2c923d1da2 --- /dev/null +++ b/apps/argus/src/types.rs @@ -0,0 +1,32 @@ +use { + alloy::{ + primitives::{Address, U256, U64}, + vec::Vec, + }, + serde::{Deserialize, Serialize}, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PriceUpdateRequest { + pub provider: Address, + pub sequence_number: U64, + pub publish_time: U256, + pub price_ids: Vec<[u8; 32]>, + pub callback_gas_limit: U256, + pub requester: Address, +} + +#[derive(Debug, Clone)] +pub struct PriceData { + pub price: i64, + pub conf: u64, + pub expo: i32, + pub publish_time: u64, +} + +#[derive(Debug, Clone)] +pub struct UpdateBatch { + pub requests: Vec, + pub price_data: Vec, + pub update_data: Vec>, +}