From e319a2b692e5a9ec1a04500288527ef72e7f61df Mon Sep 17 00:00:00 2001 From: tomyrd Date: Mon, 23 Dec 2024 12:22:24 -0300 Subject: [PATCH 1/5] feat: give interior mutability to `NodeRpcClient` --- bin/miden-cli/src/lib.rs | 12 ++- bin/miden-cli/src/tests.rs | 6 +- crates/rust-client/Cargo.toml | 2 +- crates/rust-client/src/mock.rs | 19 ++-- crates/rust-client/src/rpc/mod.rs | 37 ++++---- .../rust-client/src/rpc/tonic_client/mod.rs | 86 +++++++------------ .../src/rpc/web_tonic_client/mod.rs | 19 ++-- crates/rust-client/src/tests.rs | 2 +- tests/integration/common.rs | 2 +- tests/integration/main.rs | 2 +- 10 files changed, 81 insertions(+), 106 deletions(-) diff --git a/bin/miden-cli/src/lib.rs b/bin/miden-cli/src/lib.rs index 42107c36b..7e02a57ad 100644 --- a/bin/miden-cli/src/lib.rs +++ b/bin/miden-cli/src/lib.rs @@ -111,10 +111,14 @@ impl Cli { let authenticator = StoreAuthenticator::new_with_rng(store.clone() as Arc, rng); let client = Client::new( - Box::new(TonicRpcClient::new( - cli_config.rpc.endpoint.clone().into(), - cli_config.rpc.timeout_ms, - )), + Box::new( + TonicRpcClient::new( + cli_config.rpc.endpoint.clone().into(), + cli_config.rpc.timeout_ms, + ) + .await + .map_err(ClientError::RpcError)?, + ), rng, store as Arc, Arc::new(authenticator), diff --git a/bin/miden-cli/src/tests.rs b/bin/miden-cli/src/tests.rs index ed3fce17f..469d49082 100644 --- a/bin/miden-cli/src/tests.rs +++ b/bin/miden-cli/src/tests.rs @@ -631,7 +631,11 @@ async fn create_test_client_with_store_path(store_path: &Path) -> TestClient { let authenticator = StoreAuthenticator::new_with_rng(store.clone(), rng); TestClient::new( - Box::new(TonicRpcClient::new(rpc_config.endpoint.into(), rpc_config.timeout_ms)), + Box::new( + TonicRpcClient::new(rpc_config.endpoint.into(), rpc_config.timeout_ms) + .await + .unwrap(), + ), rng, store, std::sync::Arc::new(authenticator), diff --git a/crates/rust-client/Cargo.toml b/crates/rust-client/Cargo.toml index 69038a635..b927c8c4e 100644 --- a/crates/rust-client/Cargo.toml +++ b/crates/rust-client/Cargo.toml @@ -22,7 +22,7 @@ idxdb = ["dep:base64", "dep:serde-wasm-bindgen", "dep:wasm-bindgen", "dep:wasm-b sqlite = ["dep:rusqlite", "dep:deadpool-sqlite", "std"] std = ["miden-objects/std"] testing = ["miden-objects/testing", "miden-lib/testing", "miden-tx/testing"] -tonic = ["dep:hex", "dep:prost", "dep:tonic", "std", "tonic/transport"] +tonic = ["dep:hex", "dep:prost", "dep:tonic", "std", "tonic/transport", "tokio"] web-tonic = ["dep:hex", "dep:prost", "dep:tonic", "dep:tonic-web-wasm-client"] [dependencies] diff --git a/crates/rust-client/src/mock.rs b/crates/rust-client/src/mock.rs index cce2bbf97..35af2ddaf 100644 --- a/crates/rust-client/src/mock.rs +++ b/crates/rust-client/src/mock.rs @@ -206,7 +206,7 @@ use alloc::boxed::Box; #[async_trait(?Send)] impl NodeRpcClient for MockRpcApi { async fn sync_notes( - &mut self, + &self, _block_num: u32, _note_tags: &[NoteTag], ) -> Result { @@ -222,7 +222,7 @@ impl NodeRpcClient for MockRpcApi { /// Executes the specified sync state request and returns the response. async fn sync_state( - &mut self, + &self, block_num: u32, _account_ids: &[AccountId], _note_tags: &[NoteTag], @@ -237,7 +237,7 @@ impl NodeRpcClient for MockRpcApi { /// Creates and executes a [GetBlockHeaderByNumberRequest]. /// Only used for retrieving genesis block right now so that's the only case we need to cover. async fn get_block_header_by_number( - &mut self, + &self, block_num: Option, include_mmr_proof: bool, ) -> Result<(BlockHeader, Option), RpcError> { @@ -259,7 +259,7 @@ impl NodeRpcClient for MockRpcApi { Ok((block.header(), mmr_proof)) } - async fn get_notes_by_id(&mut self, note_ids: &[NoteId]) -> Result, RpcError> { + async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result, RpcError> { // assume all off-chain notes for now let hit_notes = note_ids.iter().filter_map(|id| self.notes.get(id)); let mut return_notes = vec![]; @@ -285,22 +285,19 @@ impl NodeRpcClient for MockRpcApi { } async fn submit_proven_transaction( - &mut self, + &self, _proven_transaction: ProvenTransaction, ) -> std::result::Result<(), RpcError> { // TODO: add some basic validations to test error cases Ok(()) } - async fn get_account_update( - &mut self, - _account_id: AccountId, - ) -> Result { + async fn get_account_update(&self, _account_id: AccountId) -> Result { panic!("shouldn't be used for now") } async fn get_account_proofs( - &mut self, + &self, _account_ids: &BTreeSet, _code_commitments: Vec, _include_headers: bool, @@ -310,7 +307,7 @@ impl NodeRpcClient for MockRpcApi { } async fn check_nullifiers_by_prefix( - &mut self, + &self, _prefix: &[u16], ) -> Result, RpcError> { // Always return an empty list for now since it's only used when importing diff --git a/crates/rust-client/src/rpc/mod.rs b/crates/rust-client/src/rpc/mod.rs index fa79ddb15..754c70907 100644 --- a/crates/rust-client/src/rpc/mod.rs +++ b/crates/rust-client/src/rpc/mod.rs @@ -57,43 +57,43 @@ pub trait NodeRpcClient { /// Given a Proven Transaction, send it to the node for it to be included in a future block /// using the `/SubmitProvenTransaction` RPC endpoint. async fn submit_proven_transaction( - &mut self, + &self, proven_transaction: ProvenTransaction, ) -> Result<(), RpcError>; /// Given a block number, fetches the block header corresponding to that height from the node /// using the `/GetBlockHeaderByNumber` endpoint. /// If `include_mmr_proof` is set to true and the function returns an `Ok`, the second value - /// of the return tuple should always be Some(MmrProof). + /// of the return tuple should always be Some(MmrProof) /// /// When `None` is provided, returns info regarding the latest block. async fn get_block_header_by_number( - &mut self, + &self, block_num: Option, include_mmr_proof: bool, ) -> Result<(BlockHeader, Option), RpcError>; - /// Fetches note-related data for a list of [NoteId] using the `/GetNotesById` rpc endpoint. + /// Fetches note-related data for a list of [NoteId] using the `/GetNotesById` rpc endpoint /// /// For any NoteType::Private note, the return data is only the /// [miden_objects::notes::NoteMetadata], whereas for NoteType::Onchain notes, the return /// data includes all details. - async fn get_notes_by_id(&mut self, note_ids: &[NoteId]) -> Result, RpcError>; + async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result, RpcError>; /// Fetches info from the node necessary to perform a state sync using the - /// `/SyncState` RPC endpoint. + /// `/SyncState` RPC endpoint /// /// - `block_num` is the last block number known by the client. The returned [StateSyncInfo] /// should contain data starting from the next block, until the first block which contains a /// note of matching the requested tag, or the chain tip if there are no notes. - /// - `account_ids` is a list of account IDs and determines the accounts the client is + /// - `account_ids` is a list of account ids and determines the accounts the client is /// interested in and should receive account updates of. /// - `note_tags` is a list of tags used to filter the notes the client is interested in, which - /// serves as a "note group" filter. Notice that you can't filter by a specific note ID. + /// serves as a "note group" filter. Notice that you can't filter by a specific note id /// - `nullifiers_tags` similar to `note_tags`, is a list of tags used to filter the nullifiers /// corresponding to some notes the client is interested in. async fn sync_state( - &mut self, + &self, block_num: u32, account_ids: &[AccountId], note_tags: &[NoteTag], @@ -101,16 +101,13 @@ pub trait NodeRpcClient { ) -> Result; /// Fetches the current state of an account from the node using the `/GetAccountDetails` RPC - /// endpoint. + /// endpoint /// - /// - `account_id` is the ID of the wanted account. - async fn get_account_update( - &mut self, - account_id: AccountId, - ) -> Result; + /// - `account_id` is the id of the wanted account. + async fn get_account_update(&self, account_id: AccountId) -> Result; async fn sync_notes( - &mut self, + &self, block_num: u32, note_tags: &[NoteTag], ) -> Result; @@ -118,24 +115,24 @@ pub trait NodeRpcClient { /// Fetches the nullifiers corresponding to a list of prefixes using the /// `/CheckNullifiersByPrefix` RPC endpoint. async fn check_nullifiers_by_prefix( - &mut self, + &self, prefix: &[u16], ) -> Result, RpcError>; /// Fetches the current account state, using th `/GetAccountProofs` RPC endpoint. async fn get_account_proofs( - &mut self, + &self, account_ids: &BTreeSet, known_account_codes: Vec, include_headers: bool, ) -> Result; - /// Fetches the commit height where the nullifier was consumed. If the nullifier isn't found, + /// Fetches the commit height where the nullifier was consumed. If the nullifier is not found, /// then `None` is returned. /// /// The default implementation of this method uses [NodeRpcClient::check_nullifiers_by_prefix]. async fn get_nullifier_commit_height( - &mut self, + &self, nullifier: &Nullifier, ) -> Result, RpcError> { let nullifiers = diff --git a/crates/rust-client/src/rpc/tonic_client/mod.rs b/crates/rust-client/src/rpc/tonic_client/mod.rs index be54f111a..1c2138f1e 100644 --- a/crates/rust-client/src/rpc/tonic_client/mod.rs +++ b/crates/rust-client/src/rpc/tonic_client/mod.rs @@ -1,9 +1,4 @@ -use alloc::{ - boxed::Box, - collections::BTreeSet, - string::{String, ToString}, - vec::Vec, -}; +use alloc::{boxed::Box, collections::BTreeSet, string::ToString, vec::Vec}; use std::{collections::BTreeMap, time::Duration}; use async_trait::async_trait; @@ -16,6 +11,7 @@ use miden_objects::{ BlockHeader, Digest, }; use miden_tx::utils::Serializable; +use tokio::sync::RwLock; use tonic::transport::Channel; use tracing::info; @@ -40,52 +36,35 @@ use crate::rpc::generated::requests::GetBlockHeaderByNumberRequest; // ================================================================================================ /// Client for the Node RPC API using tonic. -/// -/// Wraps the ApiClient which defers establishing a connection with a node until necessary. pub struct TonicRpcClient { - rpc_api: Option>, - endpoint: String, - timeout_ms: u64, + rpc_api: RwLock>, } impl TonicRpcClient { /// Returns a new instance of [TonicRpcClient] that'll do calls to the provided [Endpoint] with - /// the given timeout in milliseconds. - pub fn new(endpoint: Endpoint, timeout_ms: u64) -> TonicRpcClient { - TonicRpcClient { - rpc_api: None, - endpoint: endpoint.to_string(), - timeout_ms, - } - } + /// the given timeout in milliseconds + pub async fn new(endpoint: Endpoint, timeout_ms: u64) -> Result { + let endpoint = tonic::transport::Endpoint::try_from(endpoint.to_string()) + .map_err(|err| RpcError::ConnectionError(err.to_string()))? + .timeout(Duration::from_millis(timeout_ms)); + let rpc_api = ApiClient::connect(endpoint) + .await + .map_err(|err| RpcError::ConnectionError(err.to_string()))?; - /// Takes care of establishing the RPC connection if not connected yet and returns a reference - /// to the inner ApiClient. - async fn rpc_api(&mut self) -> Result<&mut ApiClient, RpcError> { - if self.rpc_api.is_some() { - Ok(self.rpc_api.as_mut().unwrap()) - } else { - let endpoint = tonic::transport::Endpoint::try_from(self.endpoint.clone()) - .map_err(|err| RpcError::ConnectionError(err.to_string()))? - .timeout(Duration::from_millis(self.timeout_ms)); - let rpc_api = ApiClient::connect(endpoint) - .await - .map_err(|err| RpcError::ConnectionError(err.to_string()))?; - Ok(self.rpc_api.insert(rpc_api)) - } + Ok(TonicRpcClient { rpc_api: RwLock::new(rpc_api) }) } } #[async_trait(?Send)] impl NodeRpcClient for TonicRpcClient { async fn submit_proven_transaction( - &mut self, + &self, proven_transaction: ProvenTransaction, ) -> Result<(), RpcError> { let request = SubmitProvenTransactionRequest { transaction: proven_transaction.to_bytes(), }; - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; rpc_api.submit_proven_transaction(request).await.map_err(|err| { RpcError::RequestError( NodeRpcClientEndpoint::SubmitProvenTx.to_string(), @@ -97,7 +76,7 @@ impl NodeRpcClient for TonicRpcClient { } async fn get_block_header_by_number( - &mut self, + &self, block_num: Option, include_mmr_proof: bool, ) -> Result<(BlockHeader, Option), RpcError> { @@ -108,7 +87,7 @@ impl NodeRpcClient for TonicRpcClient { info!("Calling GetBlockHeaderByNumber: {:?}", request); - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; let api_response = rpc_api.get_block_header_by_number(request).await.map_err(|err| { RpcError::RequestError( NodeRpcClientEndpoint::GetBlockHeaderByNumber.to_string(), @@ -144,11 +123,11 @@ impl NodeRpcClient for TonicRpcClient { Ok((block_header, mmr_proof)) } - async fn get_notes_by_id(&mut self, note_ids: &[NoteId]) -> Result, RpcError> { + async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result, RpcError> { let request = GetNotesByIdRequest { note_ids: note_ids.iter().map(|id| id.inner().into()).collect(), }; - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; let api_response = rpc_api.get_notes_by_id(request).await.map_err(|err| { RpcError::RequestError( NodeRpcClientEndpoint::GetBlockHeaderByNumber.to_string(), @@ -198,7 +177,7 @@ impl NodeRpcClient for TonicRpcClient { /// Sends a sync state request to the Miden node, validates and converts the response /// into a [StateSyncInfo] struct. async fn sync_state( - &mut self, + &self, block_num: u32, account_ids: &[AccountId], note_tags: &[NoteTag], @@ -217,7 +196,7 @@ impl NodeRpcClient for TonicRpcClient { nullifiers, }; - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; let response = rpc_api.sync_state(request).await.map_err(|err| { RpcError::RequestError(NodeRpcClientEndpoint::SyncState.to_string(), err.to_string()) })?; @@ -231,17 +210,14 @@ impl NodeRpcClient for TonicRpcClient { /// /// This function will return an error if: /// - /// - There was an error sending the request to the node. + /// - There was an error sending the request to the node /// - The answer had a `None` for one of the expected fields (account, summary, account_hash, /// details). - /// - There is an error during [Account] deserialization. - async fn get_account_update( - &mut self, - account_id: AccountId, - ) -> Result { + /// - There is an error during [Account] deserialization + async fn get_account_update(&self, account_id: AccountId) -> Result { let request = GetAccountDetailsRequest { account_id: Some(account_id.into()) }; - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; let response = rpc_api.get_account_details(request).await.map_err(|err| { RpcError::RequestError( @@ -285,12 +261,12 @@ impl NodeRpcClient for TonicRpcClient { /// /// This function will return an error if: /// - /// - One of the requested Accounts isn't public, or isn't returned by the node. + /// - One of the requested Accounts is not public, or is not returned by the node. /// - There was an error sending the request to the node. /// - The answer had a `None` for one of the expected fields. /// - There is an error during storage deserialization. async fn get_account_proofs( - &mut self, + &self, account_ids: &BTreeSet, known_account_codes: Vec, include_headers: bool, @@ -306,7 +282,7 @@ impl NodeRpcClient for TonicRpcClient { code_commitments: known_account_codes.keys().map(|c| c.into()).collect(), }; - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; let response = rpc_api .get_account_proofs(request) .await @@ -362,7 +338,7 @@ impl NodeRpcClient for TonicRpcClient { } async fn sync_notes( - &mut self, + &self, block_num: u32, note_tags: &[NoteTag], ) -> Result { @@ -370,7 +346,7 @@ impl NodeRpcClient for TonicRpcClient { let request = SyncNoteRequest { block_num, note_tags }; - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; let response = rpc_api.sync_notes(request).await.map_err(|err| { RpcError::RequestError(NodeRpcClientEndpoint::SyncNotes.to_string(), err.to_string()) @@ -380,14 +356,14 @@ impl NodeRpcClient for TonicRpcClient { } async fn check_nullifiers_by_prefix( - &mut self, + &self, prefixes: &[u16], ) -> Result, RpcError> { let request = CheckNullifiersByPrefixRequest { nullifiers: prefixes.iter().map(|&x| x as u32).collect(), prefix_len: 16, }; - let rpc_api = self.rpc_api().await?; + let mut rpc_api = self.rpc_api.write().await; let response = rpc_api.check_nullifiers_by_prefix(request).await.map_err(|err| { RpcError::RequestError( NodeRpcClientEndpoint::CheckNullifiersByPrefix.to_string(), diff --git a/crates/rust-client/src/rpc/web_tonic_client/mod.rs b/crates/rust-client/src/rpc/web_tonic_client/mod.rs index a16825964..632a8d594 100644 --- a/crates/rust-client/src/rpc/web_tonic_client/mod.rs +++ b/crates/rust-client/src/rpc/web_tonic_client/mod.rs @@ -52,7 +52,7 @@ impl WebTonicRpcClient { #[async_trait(?Send)] impl NodeRpcClient for WebTonicRpcClient { async fn submit_proven_transaction( - &mut self, + &self, proven_transaction: ProvenTransaction, ) -> Result<(), RpcError> { let mut query_client = self.build_api_client(); @@ -72,7 +72,7 @@ impl NodeRpcClient for WebTonicRpcClient { } async fn get_block_header_by_number( - &mut self, + &self, block_num: Option, include_mmr_proof: bool, ) -> Result<(BlockHeader, Option), RpcError> { @@ -121,7 +121,7 @@ impl NodeRpcClient for WebTonicRpcClient { Ok((block_header, mmr_proof)) } - async fn get_notes_by_id(&mut self, note_ids: &[NoteId]) -> Result, RpcError> { + async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result, RpcError> { let mut query_client = self.build_api_client(); let request = GetNotesByIdRequest { @@ -176,7 +176,7 @@ impl NodeRpcClient for WebTonicRpcClient { /// Sends a sync state request to the Miden node, validates and converts the response /// into a [StateSyncInfo] struct. async fn sync_state( - &mut self, + &self, block_num: u32, account_ids: &[AccountId], note_tags: &[NoteTag], @@ -202,7 +202,7 @@ impl NodeRpcClient for WebTonicRpcClient { } async fn sync_notes( - &mut self, + &self, block_num: u32, note_tags: &[NoteTag], ) -> Result { @@ -230,7 +230,7 @@ impl NodeRpcClient for WebTonicRpcClient { /// - The answer had a `None` for one of the expected fields. /// - There is an error during storage deserialization. async fn get_account_proofs( - &mut self, + &self, account_ids: &BTreeSet, known_account_codes: Vec, include_headers: bool, @@ -315,10 +315,7 @@ impl NodeRpcClient for WebTonicRpcClient { /// - The answer had a `None` for its account, or the account had a `None` at the `details` /// field. /// - There is an error during [Account] deserialization. - async fn get_account_update( - &mut self, - account_id: AccountId, - ) -> Result { + async fn get_account_update(&self, account_id: AccountId) -> Result { let mut query_client = self.build_api_client(); let request = GetAccountDetailsRequest { account_id: Some(account_id.into()) }; @@ -360,7 +357,7 @@ impl NodeRpcClient for WebTonicRpcClient { } async fn check_nullifiers_by_prefix( - &mut self, + &self, prefixes: &[u16], ) -> Result, RpcError> { let mut query_client = self.build_api_client(); diff --git a/crates/rust-client/src/tests.rs b/crates/rust-client/src/tests.rs index d435ffcc0..374eba4b7 100644 --- a/crates/rust-client/src/tests.rs +++ b/crates/rust-client/src/tests.rs @@ -332,7 +332,7 @@ async fn test_sync_state() { #[tokio::test] async fn test_sync_state_mmr() { // generate test client with a random store name - let (mut client, mut rpc_api) = create_test_client().await; + let (mut client, rpc_api) = create_test_client().await; // Import note and create wallet so that synced notes do not get discarded (due to being // irrelevant) insert_new_wallet(&mut client, AccountStorageMode::Private).await.unwrap(); diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 6fad6ca66..244b8c2fb 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -60,7 +60,7 @@ pub async fn create_test_client() -> TestClient { let authenticator = StoreAuthenticator::new_with_rng(store.clone(), rng); TestClient::new( - Box::new(TonicRpcClient::new(rpc_endpoint, rpc_timeout)), + Box::new(TonicRpcClient::new(rpc_endpoint, rpc_timeout).await.unwrap()), rng, store, Arc::new(authenticator), diff --git a/tests/integration/main.rs b/tests/integration/main.rs index 36e6bd135..560b7c6a3 100644 --- a/tests/integration/main.rs +++ b/tests/integration/main.rs @@ -797,7 +797,7 @@ async fn test_get_account_update() { // [AccountDetails] should be received. // TODO: should we expose the `get_account_update` endpoint from the Client? let (endpoint, timeout, _) = get_client_config(); - let mut rpc_api = TonicRpcClient::new(endpoint, timeout); + let rpc_api = TonicRpcClient::new(endpoint, timeout).await.unwrap(); let details1 = rpc_api.get_account_update(basic_wallet_1.id()).await.unwrap(); let details2 = rpc_api.get_account_update(basic_wallet_2.id()).await.unwrap(); From c4bf4eaa0bd8fa1f8b6de8a4585e732a0ed7f52f Mon Sep 17 00:00:00 2001 From: tomyrd Date: Mon, 23 Dec 2024 13:43:49 -0300 Subject: [PATCH 2/5] refactor: move client's rpc_api to Arc --- bin/miden-cli/src/lib.rs | 2 +- bin/miden-cli/src/tests.rs | 7 ++++--- crates/rust-client/src/lib.rs | 6 +++--- crates/rust-client/src/mock.rs | 4 ++-- crates/web-client/src/lib.rs | 2 +- tests/integration/common.rs | 2 +- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/bin/miden-cli/src/lib.rs b/bin/miden-cli/src/lib.rs index 7e02a57ad..916b6474a 100644 --- a/bin/miden-cli/src/lib.rs +++ b/bin/miden-cli/src/lib.rs @@ -111,7 +111,7 @@ impl Cli { let authenticator = StoreAuthenticator::new_with_rng(store.clone() as Arc, rng); let client = Client::new( - Box::new( + Arc::new( TonicRpcClient::new( cli_config.rpc.endpoint.clone().into(), cli_config.rpc.timeout_ms, diff --git a/bin/miden-cli/src/tests.rs b/bin/miden-cli/src/tests.rs index 469d49082..6c4e8c902 100644 --- a/bin/miden-cli/src/tests.rs +++ b/bin/miden-cli/src/tests.rs @@ -3,6 +3,7 @@ use std::{ fs::File, io::Read, path::{Path, PathBuf}, + sync::Arc, }; use assert_cmd::Command; @@ -621,7 +622,7 @@ async fn create_test_client_with_store_path(store_path: &Path) -> TestClient { let store = { let sqlite_store = SqliteStore::new(PathBuf::from(store_path)).await.unwrap(); - std::sync::Arc::new(sqlite_store) + Arc::new(sqlite_store) }; let mut rng = rand::thread_rng(); @@ -631,14 +632,14 @@ async fn create_test_client_with_store_path(store_path: &Path) -> TestClient { let authenticator = StoreAuthenticator::new_with_rng(store.clone(), rng); TestClient::new( - Box::new( + Arc::new( TonicRpcClient::new(rpc_config.endpoint.into(), rpc_config.timeout_ms) .await .unwrap(), ), rng, store, - std::sync::Arc::new(authenticator), + Arc::new(authenticator), true, ) } diff --git a/crates/rust-client/src/lib.rs b/crates/rust-client/src/lib.rs index 09811317b..4e4315f44 100644 --- a/crates/rust-client/src/lib.rs +++ b/crates/rust-client/src/lib.rs @@ -115,7 +115,7 @@ pub struct Client { rng: R, /// An instance of [NodeRpcClient] which provides a way for the client to connect to the /// Miden node. - rpc_api: Box, + rpc_api: Arc, /// An instance of a [LocalTransactionProver] which will be the default prover for the client. tx_prover: Arc, tx_executor: TransactionExecutor, @@ -147,7 +147,7 @@ impl Client { /// /// Returns an error if the client couldn't be instantiated. pub fn new( - rpc_api: Box, + rpc_api: Arc, rng: R, store: Arc, authenticator: Arc, @@ -182,7 +182,7 @@ impl Client { // -------------------------------------------------------------------------------------------- #[cfg(any(test, feature = "testing"))] - pub fn test_rpc_api(&mut self) -> &mut Box { + pub fn test_rpc_api(&mut self) -> &mut Arc { &mut self.rpc_api } diff --git a/crates/rust-client/src/mock.rs b/crates/rust-client/src/mock.rs index 35af2ddaf..e52c6ac19 100644 --- a/crates/rust-client/src/mock.rs +++ b/crates/rust-client/src/mock.rs @@ -329,9 +329,9 @@ pub async fn create_test_client() -> (MockClient, MockRpcApi) { let authenticator = StoreAuthenticator::new_with_rng(store.clone(), rng); let rpc_api = MockRpcApi::new(); - let boxed_rpc_api = Box::new(rpc_api.clone()); + let arc_rpc_api = Arc::new(rpc_api.clone()); - let client = MockClient::new(boxed_rpc_api, rng, store, Arc::new(authenticator), true); + let client = MockClient::new(arc_rpc_api, rng, store, Arc::new(authenticator), true); (client, rpc_api) } diff --git a/crates/web-client/src/lib.rs b/crates/web-client/src/lib.rs index 7acf4dd91..f16051a5e 100644 --- a/crates/web-client/src/lib.rs +++ b/crates/web-client/src/lib.rs @@ -66,7 +66,7 @@ impl WebClient { .map_err(|_| JsValue::from_str("Failed to initialize WebStore"))?; let web_store = Arc::new(web_store); let authenticator = Arc::new(StoreAuthenticator::new_with_rng(web_store.clone(), rng)); - let web_rpc_client = Box::new(WebTonicRpcClient::new( + let web_rpc_client = Arc::new(WebTonicRpcClient::new( &node_url.unwrap_or_else(|| "http://18.203.155.106:57291".to_string()), )); diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 244b8c2fb..33e5dd7ef 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -60,7 +60,7 @@ pub async fn create_test_client() -> TestClient { let authenticator = StoreAuthenticator::new_with_rng(store.clone(), rng); TestClient::new( - Box::new(TonicRpcClient::new(rpc_endpoint, rpc_timeout).await.unwrap()), + Arc::new(TonicRpcClient::new(rpc_endpoint, rpc_timeout).await.unwrap()), rng, store, Arc::new(authenticator), From 3596f3811be4d847cb8e9c7482cc3f27838115a1 Mon Sep 17 00:00:00 2001 From: tomyrd Date: Mon, 23 Dec 2024 14:41:04 -0300 Subject: [PATCH 3/5] feat: add `StateSync` component --- crates/rust-client/src/components/mod.rs | 1 + .../rust-client/src/components/sync_state.rs | 581 ++++++++++++++++++ crates/rust-client/src/lib.rs | 1 + crates/rust-client/src/notes/import.rs | 4 +- crates/rust-client/src/store/mod.rs | 43 +- .../rust-client/src/store/sqlite_store/mod.rs | 3 +- .../src/store/sqlite_store/sync.rs | 3 +- crates/rust-client/src/store/web_store/mod.rs | 3 +- .../src/store/web_store/sync/mod.rs | 3 +- crates/rust-client/src/sync/block_headers.rs | 101 +-- crates/rust-client/src/sync/mod.rs | 541 +--------------- crates/rust-client/src/tests.rs | 2 +- crates/rust-client/src/transactions/mod.rs | 2 +- 13 files changed, 673 insertions(+), 615 deletions(-) create mode 100644 crates/rust-client/src/components/mod.rs create mode 100644 crates/rust-client/src/components/sync_state.rs diff --git a/crates/rust-client/src/components/mod.rs b/crates/rust-client/src/components/mod.rs new file mode 100644 index 000000000..6d98b3c2a --- /dev/null +++ b/crates/rust-client/src/components/mod.rs @@ -0,0 +1 @@ +pub mod sync_state; diff --git a/crates/rust-client/src/components/sync_state.rs b/crates/rust-client/src/components/sync_state.rs new file mode 100644 index 000000000..2f625c9bd --- /dev/null +++ b/crates/rust-client/src/components/sync_state.rs @@ -0,0 +1,581 @@ +use alloc::{collections::BTreeMap, sync::Arc, vec::Vec}; + +use miden_objects::{ + accounts::{Account, AccountHeader, AccountId}, + crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr}, + notes::{NoteId, NoteInclusionProof, NoteTag, Nullifier}, + transaction::TransactionId, + BlockHeader, Digest, +}; +use tracing::*; + +use crate::{ + accounts::AccountUpdates, + notes::{NoteScreener, NoteUpdates}, + rpc::{ + domain::{ + accounts::AccountDetails, + notes::{CommittedNote, NoteDetails}, + nullifiers::NullifierUpdate, + transactions::TransactionUpdate, + }, + NodeRpcClient, RpcError, + }, + store::{ + input_note_states::CommittedNoteState, InputNoteRecord, NoteFilter, OutputNoteRecord, + Store, StoreError, TransactionFilter, + }, + sync::{get_nullifier_prefix, NoteTagRecord, SyncSummary}, + ClientError, +}; + +/// Contains all information needed to apply the update in the store after syncing with the node. +pub struct StateSyncUpdate { + /// The new block header, returned as part of the + /// [StateSyncInfo](crate::rpc::domain::sync::StateSyncInfo) + pub block_header: BlockHeader, + /// Information about note changes after the sync. + pub note_updates: NoteUpdates, + /// Transaction updates for any transaction that was committed between the sync request's + /// block number and the response's block number. + pub transactions_to_commit: Vec, + /// Transaction IDs for any transactions that were discarded in the sync. + pub transactions_to_discard: Vec, + /// New MMR peaks for the locally tracked MMR of the blockchain. + pub new_mmr_peaks: MmrPeaks, + /// New authentications nodes that are meant to be stored in order to authenticate block + /// headers. + pub new_authentication_nodes: Vec<(InOrderIndex, Digest)>, + /// Information abount account changes after the sync. + pub updated_accounts: AccountUpdates, + /// Whether the block header has notes relevant to the client. + pub block_has_relevant_notes: bool, + /// Tag records that are no longer relevant. + pub tags_to_remove: Vec, +} + +impl From<&StateSyncUpdate> for SyncSummary { + fn from(value: &StateSyncUpdate) -> Self { + SyncSummary::new( + value.block_header.block_num(), + value.note_updates.new_input_notes().iter().map(|n| n.id()).collect(), + value.note_updates.committed_note_ids().into_iter().collect(), + value.note_updates.consumed_note_ids().into_iter().collect(), + value + .updated_accounts + .updated_onchain_accounts() + .iter() + .map(|acc| acc.id()) + .collect(), + value + .updated_accounts + .mismatched_offchain_accounts() + .iter() + .map(|(acc_id, _)| *acc_id) + .collect(), + value.transactions_to_commit.iter().map(|tx| tx.transaction_id).collect(), + ) + } +} + +pub enum SyncStatus { + SyncedToLastBlock(StateSyncUpdate), + SyncedToBlock(StateSyncUpdate), +} + +impl SyncStatus { + pub fn into_state_sync_update(self) -> StateSyncUpdate { + match self { + SyncStatus::SyncedToLastBlock(update) => update, + SyncStatus::SyncedToBlock(update) => update, + } + } +} + +pub struct SyncState { + /// The client's store, which provides a way to write and read entities to provide persistence. + store: Arc, + /// An instance of [NodeRpcClient] which provides a way for the component to connect to the + /// Miden node. + rpc_api: Arc, +} + +impl SyncState { + /// Creates a new instance of [SyncState]. + pub fn new(store: Arc, rpc_api: Arc) -> Self { + Self { store, rpc_api } + } + + pub async fn step_sync_state( + &mut self, + current_block_num: u32, + tracked_accounts: Vec, + note_tags: &[NoteTag], + nullifiers: &[Nullifier], + ) -> Result, ClientError> { + let account_ids: Vec = + tracked_accounts.iter().map(|account| account.id()).collect(); + + // To receive information about added nullifiers, we reduce them to the higher 16 bits + // Note that besides filtering by nullifier prefixes, the node also filters by block number + // (it only returns nullifiers from current_block_num until + // response.block_header.block_num()) + let nullifier_tags: Vec = nullifiers.iter().map(get_nullifier_prefix).collect(); + + let response = self + .rpc_api + .sync_state(current_block_num, &account_ids, note_tags, &nullifier_tags) + .await?; + + // We don't need to continue if the chain has not advanced, there are no new changes + if response.block_header.block_num() == current_block_num { + return Ok(None); + } + + let (committed_note_updates, tags_to_remove) = self + .committed_note_updates(response.note_inclusions, &response.block_header) + .await?; + + let incoming_block_has_relevant_notes = + self.check_block_relevance(&committed_note_updates).await?; + + let transactions_to_commit = self.get_transactions_to_commit(response.transactions).await?; + + let (consumed_note_updates, transactions_to_discard) = + self.consumed_note_updates(response.nullifiers, &transactions_to_commit).await?; + + let note_updates = committed_note_updates.combine_with(consumed_note_updates); + + let (onchain_accounts, offchain_accounts): (Vec<_>, Vec<_>) = tracked_accounts + .into_iter() + .partition(|account_header| account_header.id().is_public()); + + let updated_onchain_accounts = self + .get_updated_onchain_accounts(&response.account_hash_updates, &onchain_accounts) + .await?; + + let mismatched_offchain_accounts = self + .validate_local_account_hashes(&response.account_hash_updates, &offchain_accounts) + .await?; + + // Build PartialMmr with current data and apply updates + let (new_peaks, new_authentication_nodes) = { + let current_partial_mmr = self.store.build_current_partial_mmr(false).await?; + + let (current_block, has_relevant_notes) = + self.store.get_block_header_by_num(current_block_num).await?; + + apply_mmr_changes( + current_partial_mmr, + response.mmr_delta, + current_block, + has_relevant_notes, + )? + }; + + let sync_update = StateSyncUpdate { + block_header: response.block_header, + note_updates, + transactions_to_commit, + new_mmr_peaks: new_peaks, + new_authentication_nodes, + updated_accounts: AccountUpdates::new( + updated_onchain_accounts, + mismatched_offchain_accounts, + ), + block_has_relevant_notes: incoming_block_has_relevant_notes, + transactions_to_discard, + tags_to_remove, + }; + + if response.chain_tip == response.block_header.block_num() { + Ok(Some(SyncStatus::SyncedToLastBlock(sync_update))) + } else { + Ok(Some(SyncStatus::SyncedToBlock(sync_update))) + } + } + // HELPERS + // -------------------------------------------------------------------------------------------- + + /// Returns the [NoteUpdates] containing new public note and committed input/output notes and a + /// list or note tag records to be removed from the store. + async fn committed_note_updates( + &mut self, + committed_notes: Vec, + block_header: &BlockHeader, + ) -> Result<(NoteUpdates, Vec), ClientError> { + // We'll only pick committed notes that we are tracking as input/output notes. Since the + // sync response contains notes matching either the provided accounts or the provided tag + // we might get many notes when we only care about a few of those. + let relevant_note_filter = + NoteFilter::List(committed_notes.iter().map(|note| note.note_id()).cloned().collect()); + + let mut committed_input_notes: BTreeMap = self + .store + .get_input_notes(relevant_note_filter.clone()) + .await? + .into_iter() + .map(|n| (n.id(), n)) + .collect(); + + let mut committed_output_notes: BTreeMap = self + .store + .get_output_notes(relevant_note_filter) + .await? + .into_iter() + .map(|n| (n.id(), n)) + .collect(); + + let mut new_public_notes = vec![]; + let mut committed_tracked_input_notes = vec![]; + let mut committed_tracked_output_notes = vec![]; + let mut removed_tags = vec![]; + + for committed_note in committed_notes { + let inclusion_proof = NoteInclusionProof::new( + block_header.block_num(), + committed_note.note_index(), + committed_note.merkle_path().clone(), + )?; + + if let Some(mut note_record) = committed_input_notes.remove(committed_note.note_id()) { + // The note belongs to our locally tracked set of input notes + + let inclusion_proof_received = note_record + .inclusion_proof_received(inclusion_proof.clone(), committed_note.metadata())?; + let block_header_received = note_record.block_header_received(*block_header)?; + + removed_tags.push((¬e_record).try_into()?); + + if inclusion_proof_received || block_header_received { + committed_tracked_input_notes.push(note_record); + } + } + + if let Some(mut note_record) = committed_output_notes.remove(committed_note.note_id()) { + // The note belongs to our locally tracked set of output notes + + if note_record.inclusion_proof_received(inclusion_proof.clone())? { + committed_tracked_output_notes.push(note_record); + } + } + + if !committed_input_notes.contains_key(committed_note.note_id()) + && !committed_output_notes.contains_key(committed_note.note_id()) + { + // The note is public and we are not tracking it, push to the list of IDs to query + new_public_notes.push(*committed_note.note_id()); + } + } + + // Query the node for input note data and build the entities + let new_public_notes = + self.fetch_public_note_details(&new_public_notes, block_header).await?; + + Ok(( + NoteUpdates::new( + new_public_notes, + vec![], + committed_tracked_input_notes, + committed_tracked_output_notes, + ), + removed_tags, + )) + } + + /// Returns the [NoteUpdates] containing consumed input/output notes and a list of IDs of the + /// transactions that were discarded. + async fn consumed_note_updates( + &mut self, + nullifiers: Vec, + committed_transactions: &[TransactionUpdate], + ) -> Result<(NoteUpdates, Vec), ClientError> { + let nullifier_filter = NoteFilter::Nullifiers( + nullifiers.iter().map(|nullifier_update| nullifier_update.nullifier).collect(), + ); + + let mut consumed_input_notes: BTreeMap = self + .store + .get_input_notes(nullifier_filter.clone()) + .await? + .into_iter() + .map(|n| (n.nullifier(), n)) + .collect(); + + let mut consumed_output_notes: BTreeMap = self + .store + .get_output_notes(nullifier_filter) + .await? + .into_iter() + .map(|n| { + ( + n.nullifier() + .expect("Output notes returned by this query should have nullifiers"), + n, + ) + }) + .collect(); + + let mut consumed_tracked_input_notes = vec![]; + let mut consumed_tracked_output_notes = vec![]; + + // Committed transactions + for transaction_update in committed_transactions { + let transaction_nullifiers: Vec = consumed_input_notes + .iter() + .filter_map(|(nullifier, note_record)| { + if note_record.is_processing() + && note_record.consumer_transaction_id() + == Some(&transaction_update.transaction_id) + { + Some(nullifier) + } else { + None + } + }) + .cloned() + .collect(); + + for nullifier in transaction_nullifiers { + if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) { + if input_note_record.transaction_committed( + transaction_update.transaction_id, + transaction_update.block_num, + )? { + consumed_tracked_input_notes.push(input_note_record); + } + } + } + } + + // Nullified notes + let mut discarded_transactions = vec![]; + for nullifier_update in nullifiers { + let nullifier = nullifier_update.nullifier; + let block_num = nullifier_update.block_num; + + if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) { + if input_note_record.is_processing() { + discarded_transactions.push( + *input_note_record + .consumer_transaction_id() + .expect("Processing note should have consumer transaction id"), + ); + } + + if input_note_record.consumed_externally(nullifier, block_num)? { + consumed_tracked_input_notes.push(input_note_record); + } + } + + if let Some(mut output_note_record) = consumed_output_notes.remove(&nullifier) { + if output_note_record.nullifier_received(nullifier, block_num)? { + consumed_tracked_output_notes.push(output_note_record); + } + } + } + + Ok(( + NoteUpdates::new( + vec![], + vec![], + consumed_tracked_input_notes, + consumed_tracked_output_notes, + ), + discarded_transactions, + )) + } + + /// Queries the node for all received notes that are not being locally tracked in the client + /// + /// The client can receive metadata for private notes that it's not tracking. In this case, + /// notes are ignored for now as they become useless until details are imported. + async fn fetch_public_note_details( + &mut self, + query_notes: &[NoteId], + block_header: &BlockHeader, + ) -> Result, ClientError> { + if query_notes.is_empty() { + return Ok(vec![]); + } + info!("Getting note details for notes that are not being tracked."); + + let notes_data = self.rpc_api.get_notes_by_id(query_notes).await?; + let mut return_notes = Vec::with_capacity(query_notes.len()); + for note_data in notes_data { + match note_data { + NoteDetails::Private(id, ..) => { + // TODO: Is there any benefit to not ignoring these? In any case we do not have + // the recipient which is mandatory right now. + info!("Note {} is private but the client is not tracking it, ignoring.", id); + }, + NoteDetails::Public(note, inclusion_proof) => { + info!("Retrieved details for Note ID {}.", note.id()); + let inclusion_proof = NoteInclusionProof::new( + block_header.block_num(), + inclusion_proof.note_index, + inclusion_proof.merkle_path, + ) + .map_err(ClientError::NoteError)?; + let metadata = *note.metadata(); + + return_notes.push(InputNoteRecord::new( + note.into(), + self.store.get_current_timestamp(), + CommittedNoteState { + metadata, + inclusion_proof, + block_note_root: block_header.note_root(), + } + .into(), + )) + }, + } + } + Ok(return_notes) + } + + /// Extracts information about transactions for uncommitted transactions that the client is + /// tracking from the received [SyncStateResponse] + async fn get_transactions_to_commit( + &self, + mut transactions: Vec, + ) -> Result, ClientError> { + // Get current uncommitted transactions + let uncommitted_transaction_ids = self + .store + .get_transactions(TransactionFilter::Uncomitted) + .await? + .into_iter() + .map(|tx| tx.id) + .collect::>(); + + transactions.retain(|transaction_update| { + uncommitted_transaction_ids.contains(&transaction_update.transaction_id) + }); + + Ok(transactions) + } + + async fn get_updated_onchain_accounts( + &mut self, + account_updates: &[(AccountId, Digest)], + current_onchain_accounts: &[AccountHeader], + ) -> Result, ClientError> { + let mut accounts_to_update: Vec = Vec::new(); + for (remote_account_id, remote_account_hash) in account_updates { + // check if this updated account is tracked by the client + let current_account = current_onchain_accounts + .iter() + .find(|acc| *remote_account_id == acc.id() && *remote_account_hash != acc.hash()); + + if let Some(tracked_account) = current_account { + info!("Public account hash difference detected for account with ID: {}. Fetching node for updates...", tracked_account.id()); + let account_details = self.rpc_api.get_account_update(tracked_account.id()).await?; + if let AccountDetails::Public(account, _) = account_details { + // We should only do the update if it's newer, otherwise we ignore it + if account.nonce().as_int() > tracked_account.nonce().as_int() { + accounts_to_update.push(account); + } + } else { + return Err(RpcError::AccountUpdateForPrivateAccountReceived( + account_details.account_id(), + ) + .into()); + } + } + } + Ok(accounts_to_update) + } + + /// Validates account hash updates and returns a vector with all the offchain account + /// mismatches. + /// + /// Offchain account mismatches happen when the hash account of the local tracked account + /// doesn't match the hash account of the account in the node. This would be an anomaly and may + /// happen for two main reasons: + /// - A different client made a transaction with the account, changing its state. + /// - The local transaction that modified the local state didn't go through, rendering the local + /// account state outdated. + async fn validate_local_account_hashes( + &mut self, + account_updates: &[(AccountId, Digest)], + current_offchain_accounts: &[AccountHeader], + ) -> Result, ClientError> { + let mut mismatched_accounts = vec![]; + + for (remote_account_id, remote_account_hash) in account_updates { + // ensure that if we track that account, it has the same hash + let mismatched_account = current_offchain_accounts + .iter() + .find(|acc| *remote_account_id == acc.id() && *remote_account_hash != acc.hash()); + + // OffChain accounts should always have the latest known state. If we receive a stale + // update we ignore it. + if mismatched_account.is_some() { + let account_by_hash = + self.store.get_account_header_by_hash(*remote_account_hash).await?; + + if account_by_hash.is_none() { + mismatched_accounts.push((*remote_account_id, *remote_account_hash)); + } + } + } + Ok(mismatched_accounts) + } + + /// Checks the relevance of the block by verifying if any of the input notes in the block are + /// relevant to the client. If any of the notes are relevant, the function returns `true`. + pub(crate) async fn check_block_relevance( + &mut self, + committed_notes: &NoteUpdates, + ) -> Result { + // We'll only do the check for either incoming public notes or expected input notes as + // output notes are not really candidates to be consumed here. + + let note_screener = NoteScreener::new(self.store.clone()); + + // Find all relevant Input Notes using the note checker + for input_note in committed_notes + .updated_input_notes() + .iter() + .chain(committed_notes.new_input_notes().iter()) + { + if !note_screener + .check_relevance(&input_note.try_into().map_err(ClientError::NoteRecordError)?) + .await? + .is_empty() + { + return Ok(true); + } + } + + Ok(false) + } +} + +/// Applies changes to the Mmr structure, storing authentication nodes for leaves we track +/// and returns the updated [PartialMmr]. +pub(crate) fn apply_mmr_changes( + current_partial_mmr: PartialMmr, + mmr_delta: MmrDelta, + current_block_header: BlockHeader, + current_block_has_relevant_notes: bool, +) -> Result<(MmrPeaks, Vec<(InOrderIndex, Digest)>), StoreError> { + let mut partial_mmr: PartialMmr = current_partial_mmr; + + // First, apply curent_block to the Mmr + let new_authentication_nodes = partial_mmr + .add(current_block_header.hash(), current_block_has_relevant_notes) + .into_iter(); + + // Apply the Mmr delta to bring Mmr to forest equal to chain tip + let new_authentication_nodes: Vec<(InOrderIndex, Digest)> = partial_mmr + .apply(mmr_delta) + .map_err(StoreError::MmrError)? + .into_iter() + .chain(new_authentication_nodes) + .collect(); + + Ok((partial_mmr.peaks(), new_authentication_nodes)) +} diff --git a/crates/rust-client/src/lib.rs b/crates/rust-client/src/lib.rs index 4e4315f44..6aff26814 100644 --- a/crates/rust-client/src/lib.rs +++ b/crates/rust-client/src/lib.rs @@ -9,6 +9,7 @@ pub use alloc::boxed::Box; extern crate std; pub mod accounts; +pub mod components; pub mod notes; pub mod rpc; pub mod store; diff --git a/crates/rust-client/src/notes/import.rs b/crates/rust-client/src/notes/import.rs index b1d2f35ea..58694d144 100644 --- a/crates/rust-client/src/notes/import.rs +++ b/crates/rust-client/src/notes/import.rs @@ -170,7 +170,7 @@ impl Client { note_record.inclusion_proof_received(inclusion_proof, metadata)?; if block_height < current_block_num { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.store.build_current_partial_mmr(true).await?; let block_header = self .get_and_store_authenticated_block(block_height, &mut current_partial_mmr) @@ -214,7 +214,7 @@ impl Client { match committed_note_data { Some((metadata, inclusion_proof)) => { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.store.build_current_partial_mmr(true).await?; let block_header = self .get_and_store_authenticated_block( inclusion_proof.location().block_num(), diff --git a/crates/rust-client/src/store/mod.rs b/crates/rust-client/src/store/mod.rs index 974e4480c..3cb9f02ae 100644 --- a/crates/rust-client/src/store/mod.rs +++ b/crates/rust-client/src/store/mod.rs @@ -10,13 +10,14 @@ use core::fmt::Debug; use async_trait::async_trait; use miden_objects::{ accounts::{Account, AccountCode, AccountHeader, AccountId, AuthSecretKey}, - crypto::merkle::{InOrderIndex, MmrPeaks}, + crypto::merkle::{InOrderIndex, MmrPeaks, PartialMmr}, notes::{NoteId, NoteTag, Nullifier}, BlockHeader, Digest, Word, }; use crate::{ - sync::{NoteTagRecord, StateSyncUpdate}, + components::sync_state::StateSyncUpdate, + sync::NoteTagRecord, transactions::{TransactionRecord, TransactionStoreUpdate}, }; @@ -208,6 +209,44 @@ pub trait Store: Send + Sync { has_client_notes: bool, ) -> Result<(), StoreError>; + /// Builds the current view of the chain's [PartialMmr]. Because we want to add all new + /// authentication nodes that could come from applying the MMR updates, we need to track all + /// known leaves thus far. + /// + /// As part of the syncing process, we add the current block number so we don't need to + /// track it here. + async fn build_current_partial_mmr( + &self, + include_current_block: bool, + ) -> Result { + let current_block_num = self.get_sync_height().await?; + + let tracked_nodes = self.get_chain_mmr_nodes(ChainMmrNodeFilter::All).await?; + let current_peaks = self.get_chain_mmr_peaks_by_block_num(current_block_num).await?; + + let track_latest = if current_block_num != 0 { + match self.get_block_header_by_num(current_block_num - 1).await { + Ok((_, previous_block_had_notes)) => Ok(previous_block_had_notes), + Err(StoreError::BlockHeaderNotFound(_)) => Ok(false), + Err(err) => Err(err), + }? + } else { + false + }; + + let mut current_partial_mmr = + PartialMmr::from_parts(current_peaks, tracked_nodes, track_latest); + + if include_current_block { + let (current_block, has_client_notes) = + self.get_block_header_by_num(current_block_num).await?; + + current_partial_mmr.add(current_block.hash(), has_client_notes); + } + + Ok(current_partial_mmr) + } + // ACCOUNT // -------------------------------------------------------------------------------------------- diff --git a/crates/rust-client/src/store/sqlite_store/mod.rs b/crates/rust-client/src/store/sqlite_store/mod.rs index c70294b98..1c58f846b 100644 --- a/crates/rust-client/src/store/sqlite_store/mod.rs +++ b/crates/rust-client/src/store/sqlite_store/mod.rs @@ -20,8 +20,9 @@ use super::{ OutputNoteRecord, Store, TransactionFilter, }; use crate::{ + components::sync_state::StateSyncUpdate, store::StoreError, - sync::{NoteTagRecord, StateSyncUpdate}, + sync::NoteTagRecord, transactions::{TransactionRecord, TransactionStoreUpdate}, }; diff --git a/crates/rust-client/src/store/sqlite_store/sync.rs b/crates/rust-client/src/store/sqlite_store/sync.rs index dab152f13..d9e626fd1 100644 --- a/crates/rust-client/src/store/sqlite_store/sync.rs +++ b/crates/rust-client/src/store/sqlite_store/sync.rs @@ -6,6 +6,7 @@ use rusqlite::{params, Connection, Transaction}; use super::SqliteStore; use crate::{ + components::sync_state::StateSyncUpdate, store::{ sqlite_store::{ accounts::{lock_account, update_account}, @@ -13,7 +14,7 @@ use crate::{ }, StoreError, }, - sync::{NoteTagRecord, NoteTagSource, StateSyncUpdate}, + sync::{NoteTagRecord, NoteTagSource}, }; impl SqliteStore { diff --git a/crates/rust-client/src/store/web_store/mod.rs b/crates/rust-client/src/store/web_store/mod.rs index ee4498413..99054449a 100644 --- a/crates/rust-client/src/store/web_store/mod.rs +++ b/crates/rust-client/src/store/web_store/mod.rs @@ -15,7 +15,8 @@ use super::{ OutputNoteRecord, Store, StoreError, TransactionFilter, }; use crate::{ - sync::{NoteTagRecord, StateSyncUpdate}, + components::sync_state::StateSyncUpdate, + sync::NoteTagRecord, transactions::{TransactionRecord, TransactionStoreUpdate}, }; diff --git a/crates/rust-client/src/store/web_store/sync/mod.rs b/crates/rust-client/src/store/web_store/sync/mod.rs index b7a83aca4..7f1977061 100644 --- a/crates/rust-client/src/store/web_store/sync/mod.rs +++ b/crates/rust-client/src/store/web_store/sync/mod.rs @@ -18,8 +18,9 @@ use super::{ WebStore, }; use crate::{ + components::sync_state::StateSyncUpdate, store::StoreError, - sync::{NoteTagRecord, NoteTagSource, StateSyncUpdate}, + sync::{NoteTagRecord, NoteTagSource}, }; mod js_bindings; diff --git a/crates/rust-client/src/sync/block_headers.rs b/crates/rust-client/src/sync/block_headers.rs index 338944687..a2e3fbc53 100644 --- a/crates/rust-client/src/sync/block_headers.rs +++ b/crates/rust-client/src/sync/block_headers.rs @@ -1,16 +1,14 @@ use alloc::vec::Vec; -use crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr}; +use crypto::merkle::{InOrderIndex, MmrPeaks, PartialMmr}; use miden_objects::{ crypto::{self, merkle::MerklePath, rand::FeltRng}, BlockHeader, Digest, }; use tracing::warn; -use super::NoteUpdates; use crate::{ - notes::NoteScreener, - store::{ChainMmrNodeFilter, NoteFilter, StoreError}, + store::{NoteFilter, StoreError}, Client, ClientError, }; @@ -19,7 +17,7 @@ impl Client { /// Updates committed notes with no MMR data. These could be notes that were /// imported with an inclusion proof, but its block header isn't tracked. pub(crate) async fn update_mmr_data(&mut self) -> Result<(), ClientError> { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.store.build_current_partial_mmr(true).await?; let mut changed_notes = vec![]; for mut note in self.store.get_input_notes(NoteFilter::Unverified).await? { @@ -70,73 +68,6 @@ impl Client { // HELPERS // -------------------------------------------------------------------------------------------- - /// Checks the relevance of the block by verifying if any of the input notes in the block are - /// relevant to the client. If any of the notes are relevant, the function returns `true`. - pub(crate) async fn check_block_relevance( - &mut self, - committed_notes: &NoteUpdates, - ) -> Result { - // We'll only do the check for either incoming public notes or expected input notes as - // output notes are not really candidates to be consumed here. - - let note_screener = NoteScreener::new(self.store.clone()); - - // Find all relevant Input Notes using the note checker - for input_note in committed_notes - .updated_input_notes() - .iter() - .chain(committed_notes.new_input_notes().iter()) - { - if !note_screener - .check_relevance(&input_note.try_into().map_err(ClientError::NoteRecordError)?) - .await? - .is_empty() - { - return Ok(true); - } - } - - Ok(false) - } - - /// Builds the current view of the chain's [PartialMmr]. Because we want to add all new - /// authentication nodes that could come from applying the MMR updates, we need to track all - /// known leaves thus far. - /// - /// As part of the syncing process, we add the current block number so we don't need to - /// track it here. - pub(crate) async fn build_current_partial_mmr( - &self, - include_current_block: bool, - ) -> Result { - let current_block_num = self.store.get_sync_height().await?; - - let tracked_nodes = self.store.get_chain_mmr_nodes(ChainMmrNodeFilter::All).await?; - let current_peaks = self.store.get_chain_mmr_peaks_by_block_num(current_block_num).await?; - - let track_latest = if current_block_num != 0 { - match self.store.get_block_header_by_num(current_block_num - 1).await { - Ok((_, previous_block_had_notes)) => Ok(previous_block_had_notes), - Err(StoreError::BlockHeaderNotFound(_)) => Ok(false), - Err(err) => Err(ClientError::StoreError(err)), - }? - } else { - false - }; - - let mut current_partial_mmr = - PartialMmr::from_parts(current_peaks, tracked_nodes, track_latest); - - if include_current_block { - let (current_block, has_client_notes) = - self.store.get_block_header_by_num(current_block_num).await?; - - current_partial_mmr.add(current_block.hash(), has_client_notes); - } - - Ok(current_partial_mmr) - } - /// Retrieves and stores a [BlockHeader] by number, and stores its authentication data as well. /// /// If the store already contains MMR data for the requested block number, the request isn't @@ -216,29 +147,3 @@ fn adjust_merkle_path_for_forest( path_nodes } - -/// Applies changes to the Mmr structure, storing authentication nodes for leaves we track -/// and returns the updated [PartialMmr]. -pub(crate) fn apply_mmr_changes( - current_partial_mmr: PartialMmr, - mmr_delta: MmrDelta, - current_block_header: BlockHeader, - current_block_has_relevant_notes: bool, -) -> Result<(MmrPeaks, Vec<(InOrderIndex, Digest)>), StoreError> { - let mut partial_mmr: PartialMmr = current_partial_mmr; - - // First, apply curent_block to the Mmr - let new_authentication_nodes = partial_mmr - .add(current_block_header.hash(), current_block_has_relevant_notes) - .into_iter(); - - // Apply the Mmr delta to bring Mmr to forest equal to chain tip - let new_authentication_nodes: Vec<(InOrderIndex, Digest)> = partial_mmr - .apply(mmr_delta) - .map_err(StoreError::MmrError)? - .into_iter() - .chain(new_authentication_nodes) - .collect(); - - Ok((partial_mmr.peaks(), new_authentication_nodes)) -} diff --git a/crates/rust-client/src/sync/mod.rs b/crates/rust-client/src/sync/mod.rs index a42a5e601..9a5b5ceb6 100644 --- a/crates/rust-client/src/sync/mod.rs +++ b/crates/rust-client/src/sync/mod.rs @@ -1,40 +1,22 @@ //! Provides the client APIs for synchronizing the client's local state with the Miden //! rollup network. It ensures that the client maintains a valid, up-to-date view of the chain. -use alloc::{collections::BTreeMap, vec::Vec}; +use alloc::vec::Vec; use core::cmp::max; -use crypto::merkle::{InOrderIndex, MmrPeaks}; use miden_objects::{ - accounts::{Account, AccountHeader, AccountId}, - crypto::{self, rand::FeltRng}, - notes::{NoteId, NoteInclusionProof, NoteTag, Nullifier}, + accounts::{AccountHeader, AccountId}, + crypto::rand::FeltRng, + notes::{NoteId, NoteTag, Nullifier}, transaction::TransactionId, - BlockHeader, Digest, }; -use tracing::info; use crate::{ - accounts::AccountUpdates, - notes::NoteUpdates, - rpc::{ - domain::{ - accounts::AccountDetails, - notes::{CommittedNote, NoteDetails}, - nullifiers::NullifierUpdate, - transactions::TransactionUpdate, - }, - RpcError, - }, - store::{ - input_note_states::CommittedNoteState, InputNoteRecord, NoteFilter, OutputNoteRecord, - TransactionFilter, - }, + components::sync_state::{SyncState, SyncStatus}, Client, ClientError, }; mod block_headers; -use block_headers::apply_mmr_changes; mod tags; pub use tags::{NoteTagRecord, NoteTagSource}; @@ -108,45 +90,6 @@ impl SyncSummary { } } -enum SyncStatus { - SyncedToLastBlock(SyncSummary), - SyncedToBlock(SyncSummary), -} - -impl SyncStatus { - pub fn into_sync_summary(self) -> SyncSummary { - match self { - SyncStatus::SyncedToLastBlock(summary) => summary, - SyncStatus::SyncedToBlock(summary) => summary, - } - } -} - -/// Contains all information needed to apply the update in the store after syncing with the node. -pub struct StateSyncUpdate { - /// The new block header, returned as part of the - /// [StateSyncInfo](crate::rpc::domain::sync::StateSyncInfo) - pub block_header: BlockHeader, - /// Information about note changes after the sync. - pub note_updates: NoteUpdates, - /// Transaction updates for any transaction that was committed between the sync request's - /// block number and the response's block number. - pub transactions_to_commit: Vec, - /// Transaction IDs for any transactions that were discarded in the sync. - pub transactions_to_discard: Vec, - /// New MMR peaks for the locally tracked MMR of the blockchain. - pub new_mmr_peaks: MmrPeaks, - /// New authentications nodes that are meant to be stored in order to authenticate block - /// headers. - pub new_authentication_nodes: Vec<(InOrderIndex, Digest)>, - /// Information abount account changes after the sync. - pub updated_accounts: AccountUpdates, - /// Whether the block header has notes relevant to the client. - pub block_has_relevant_notes: bool, - /// Tag records that are no longer relevant. - pub tags_to_remove: Vec, -} - // CONSTANTS // ================================================================================================ @@ -186,464 +129,48 @@ impl Client { self.ensure_genesis_in_place().await?; let mut total_sync_summary = SyncSummary::new_empty(0); loop { - let response = self.sync_state_once().await?; - let is_last_block = matches!(response, SyncStatus::SyncedToLastBlock(_)); - total_sync_summary.combine_with(response.into_sync_summary()); - - if is_last_block { - break; - } - } - self.update_mmr_data().await?; - - Ok(total_sync_summary) - } - - async fn sync_state_once(&mut self) -> Result { - let current_block_num = self.store.get_sync_height().await?; - - let accounts: Vec = self - .store - .get_account_headers() - .await? - .into_iter() - .map(|(acc_header, _)| acc_header) - .collect(); - - let note_tags: Vec = - self.store.get_unique_note_tags().await?.into_iter().collect(); - - // To receive information about added nullifiers, we reduce them to the higher 16 bits - // Note that besides filtering by nullifier prefixes, the node also filters by block number - // (it only returns nullifiers from current_block_num until - // response.block_header.block_num()) - let nullifiers_tags: Vec = self - .store - .get_unspent_input_note_nullifiers() - .await? - .iter() - .map(get_nullifier_prefix) - .collect(); - - // Send request - let account_ids: Vec = accounts.iter().map(|acc| acc.id()).collect(); - let response = self - .rpc_api - .sync_state(current_block_num, &account_ids, ¬e_tags, &nullifiers_tags) - .await?; - - // We don't need to continue if the chain has not advanced, there are no new changes - if response.block_header.block_num() == current_block_num { - return Ok(SyncStatus::SyncedToLastBlock(SyncSummary::new_empty(current_block_num))); - } - - let (committed_note_updates, tags_to_remove) = self - .committed_note_updates(response.note_inclusions, &response.block_header) - .await?; - - let incoming_block_has_relevant_notes = - self.check_block_relevance(&committed_note_updates).await?; - - let transactions_to_commit = self.get_transactions_to_commit(response.transactions).await?; - - let (consumed_note_updates, transactions_to_discard) = - self.consumed_note_updates(response.nullifiers, &transactions_to_commit).await?; - - let note_updates = committed_note_updates.combine_with(consumed_note_updates); - - let (onchain_accounts, offchain_accounts): (Vec<_>, Vec<_>) = - accounts.into_iter().partition(|account_header| account_header.id().is_public()); - - let updated_onchain_accounts = self - .get_updated_onchain_accounts(&response.account_hash_updates, &onchain_accounts) - .await?; - - let mismatched_offchain_accounts = self - .validate_local_account_hashes(&response.account_hash_updates, &offchain_accounts) - .await?; - - // Build PartialMmr with current data and apply updates - let (new_peaks, new_authentication_nodes) = { - let current_partial_mmr = self.build_current_partial_mmr(false).await?; - - let (current_block, has_relevant_notes) = - self.store.get_block_header_by_num(current_block_num).await?; - - apply_mmr_changes( - current_partial_mmr, - response.mmr_delta, - current_block, - has_relevant_notes, - )? - }; - - // Store summary to return later - let sync_summary = SyncSummary::new( - response.block_header.block_num(), - note_updates.new_input_notes().iter().map(|n| n.id()).collect(), - note_updates.committed_note_ids().into_iter().collect(), - note_updates.consumed_note_ids().into_iter().collect(), - updated_onchain_accounts.iter().map(|acc| acc.id()).collect(), - mismatched_offchain_accounts.iter().map(|(acc_id, _)| *acc_id).collect(), - transactions_to_commit.iter().map(|tx| tx.transaction_id).collect(), - ); - - let state_sync_update = StateSyncUpdate { - block_header: response.block_header, - note_updates, - transactions_to_commit, - new_mmr_peaks: new_peaks, - new_authentication_nodes, - updated_accounts: AccountUpdates::new( - updated_onchain_accounts, - mismatched_offchain_accounts, - ), - block_has_relevant_notes: incoming_block_has_relevant_notes, - transactions_to_discard, - tags_to_remove, - }; - - // Apply received and computed updates to the store - self.store - .apply_state_sync(state_sync_update) - .await - .map_err(ClientError::StoreError)?; - - if response.chain_tip == response.block_header.block_num() { - Ok(SyncStatus::SyncedToLastBlock(sync_summary)) - } else { - Ok(SyncStatus::SyncedToBlock(sync_summary)) - } - } - - // HELPERS - // -------------------------------------------------------------------------------------------- - - /// Returns the [NoteUpdates] containing new public note and committed input/output notes and a - /// list or note tag records to be removed from the store. - async fn committed_note_updates( - &mut self, - committed_notes: Vec, - block_header: &BlockHeader, - ) -> Result<(NoteUpdates, Vec), ClientError> { - // We'll only pick committed notes that we are tracking as input/output notes. Since the - // sync response contains notes matching either the provided accounts or the provided tag - // we might get many notes when we only care about a few of those. - let relevant_note_filter = - NoteFilter::List(committed_notes.iter().map(|note| note.note_id()).cloned().collect()); - - let mut committed_input_notes: BTreeMap = self - .store - .get_input_notes(relevant_note_filter.clone()) - .await? - .into_iter() - .map(|n| (n.id(), n)) - .collect(); - - let mut committed_output_notes: BTreeMap = self - .store - .get_output_notes(relevant_note_filter) - .await? - .into_iter() - .map(|n| (n.id(), n)) - .collect(); - - let mut new_public_notes = vec![]; - let mut committed_tracked_input_notes = vec![]; - let mut committed_tracked_output_notes = vec![]; - let mut removed_tags = vec![]; - - for committed_note in committed_notes { - let inclusion_proof = NoteInclusionProof::new( - block_header.block_num(), - committed_note.note_index(), - committed_note.merkle_path().clone(), - )?; - - if let Some(mut note_record) = committed_input_notes.remove(committed_note.note_id()) { - // The note belongs to our locally tracked set of input notes - - let inclusion_proof_received = note_record - .inclusion_proof_received(inclusion_proof.clone(), committed_note.metadata())?; - let block_header_received = note_record.block_header_received(*block_header)?; - - removed_tags.push((¬e_record).try_into()?); - - if inclusion_proof_received || block_header_received { - committed_tracked_input_notes.push(note_record); - } - } - - if let Some(mut note_record) = committed_output_notes.remove(committed_note.note_id()) { - // The note belongs to our locally tracked set of output notes - - if note_record.inclusion_proof_received(inclusion_proof.clone())? { - committed_tracked_output_notes.push(note_record); - } - } - - if !committed_input_notes.contains_key(committed_note.note_id()) - && !committed_output_notes.contains_key(committed_note.note_id()) - { - // The note is public and we are not tracking it, push to the list of IDs to query - new_public_notes.push(*committed_note.note_id()); - } - } - - // Query the node for input note data and build the entities - let new_public_notes = - self.fetch_public_note_details(&new_public_notes, block_header).await?; - - Ok(( - NoteUpdates::new( - new_public_notes, - vec![], - committed_tracked_input_notes, - committed_tracked_output_notes, - ), - removed_tags, - )) - } - - /// Returns the [NoteUpdates] containing consumed input/output notes and a list of IDs of the - /// transactions that were discarded. - async fn consumed_note_updates( - &mut self, - nullifiers: Vec, - committed_transactions: &[TransactionUpdate], - ) -> Result<(NoteUpdates, Vec), ClientError> { - let nullifier_filter = NoteFilter::Nullifiers( - nullifiers.iter().map(|nullifier_update| nullifier_update.nullifier).collect(), - ); - - let mut consumed_input_notes: BTreeMap = self - .store - .get_input_notes(nullifier_filter.clone()) - .await? - .into_iter() - .map(|n| (n.nullifier(), n)) - .collect(); - - let mut consumed_output_notes: BTreeMap = self - .store - .get_output_notes(nullifier_filter) - .await? - .into_iter() - .map(|n| { - ( - n.nullifier() - .expect("Output notes returned by this query should have nullifiers"), - n, - ) - }) - .collect(); - - let mut consumed_tracked_input_notes = vec![]; - let mut consumed_tracked_output_notes = vec![]; - - // Committed transactions - for transaction_update in committed_transactions { - let transaction_nullifiers: Vec = consumed_input_notes - .iter() - .filter_map(|(nullifier, note_record)| { - if note_record.is_processing() - && note_record.consumer_transaction_id() - == Some(&transaction_update.transaction_id) - { - Some(nullifier) - } else { - None - } - }) - .cloned() + // Get relevant data for the sync request + let current_block_num = self.store.get_sync_height().await?; + + let accounts: Vec = self + .store + .get_account_headers() + .await? + .into_iter() + .map(|(acc_header, _)| acc_header) .collect(); - for nullifier in transaction_nullifiers { - if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) { - if input_note_record.transaction_committed( - transaction_update.transaction_id, - transaction_update.block_num, - )? { - consumed_tracked_input_notes.push(input_note_record); - } - } - } - } + let note_tags: Vec = + self.store.get_unique_note_tags().await?.into_iter().collect(); - // Nullified notes - let mut discarded_transactions = vec![]; - for nullifier_update in nullifiers { - let nullifier = nullifier_update.nullifier; - let block_num = nullifier_update.block_num; + let nullifiers = self.store.get_unspent_input_note_nullifiers().await?; - if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) { - if input_note_record.is_processing() { - discarded_transactions.push( - *input_note_record - .consumer_transaction_id() - .expect("Processing note should have consumer transaction id"), - ); - } + // Sync the state with the network + let response = SyncState::new(self.store.clone(), self.rpc_api.clone()) + .step_sync_state(current_block_num, accounts, ¬e_tags, &nullifiers) + .await?; - if input_note_record.consumed_externally(nullifier, block_num)? { - consumed_tracked_input_notes.push(input_note_record); - } - } + let is_last_block = matches!(response, Some(SyncStatus::SyncedToLastBlock(_)) | None); - if let Some(mut output_note_record) = consumed_output_notes.remove(&nullifier) { - if output_note_record.nullifier_received(nullifier, block_num)? { - consumed_tracked_output_notes.push(output_note_record); - } - } - } + if let Some(status) = response { + let sync_update = status.into_state_sync_update(); - Ok(( - NoteUpdates::new( - vec![], - vec![], - consumed_tracked_input_notes, - consumed_tracked_output_notes, - ), - discarded_transactions, - )) - } - - /// Queries the node for all received notes that aren't being locally tracked in the client. - /// - /// The client can receive metadata for private notes that it's not tracking. In this case, - /// notes are ignored for now as they become useless until details are imported. - async fn fetch_public_note_details( - &mut self, - query_notes: &[NoteId], - block_header: &BlockHeader, - ) -> Result, ClientError> { - if query_notes.is_empty() { - return Ok(vec![]); - } - info!("Getting note details for notes that are not being tracked."); - - let notes_data = self.rpc_api.get_notes_by_id(query_notes).await?; - let mut return_notes = Vec::with_capacity(query_notes.len()); - for note_data in notes_data { - match note_data { - NoteDetails::Private(id, ..) => { - // TODO: Is there any benefit to not ignoring these? In any case we do not have - // the recipient which is mandatory right now. - info!("Note {} is private but the client is not tracking it, ignoring.", id); - }, - NoteDetails::Public(note, inclusion_proof) => { - info!("Retrieved details for Note ID {}.", note.id()); - let inclusion_proof = NoteInclusionProof::new( - block_header.block_num(), - inclusion_proof.note_index, - inclusion_proof.merkle_path, - ) - .map_err(ClientError::NoteError)?; - let metadata = *note.metadata(); + total_sync_summary.combine_with((&sync_update).into()); - return_notes.push(InputNoteRecord::new( - note.into(), - self.store.get_current_timestamp(), - CommittedNoteState { - metadata, - inclusion_proof, - block_note_root: block_header.note_root(), - } - .into(), - )) - }, + // Apply received and computed updates to the store + self.store + .apply_state_sync(sync_update) + .await + .map_err(ClientError::StoreError)?; } - } - Ok(return_notes) - } - - /// Extracts information about transactions for uncommitted transactions that the client is - /// tracking from the received [SyncStateResponse]. - async fn get_transactions_to_commit( - &self, - mut transactions: Vec, - ) -> Result, ClientError> { - // Get current uncommitted transactions - let uncommitted_transaction_ids = self - .store - .get_transactions(TransactionFilter::Uncomitted) - .await? - .into_iter() - .map(|tx| tx.id) - .collect::>(); - - transactions.retain(|transaction_update| { - uncommitted_transaction_ids.contains(&transaction_update.transaction_id) - }); - - Ok(transactions) - } - - async fn get_updated_onchain_accounts( - &mut self, - account_updates: &[(AccountId, Digest)], - current_onchain_accounts: &[AccountHeader], - ) -> Result, ClientError> { - let mut accounts_to_update: Vec = Vec::new(); - for (remote_account_id, remote_account_hash) in account_updates { - // check if this updated account is tracked by the client - let current_account = current_onchain_accounts - .iter() - .find(|acc| *remote_account_id == acc.id() && *remote_account_hash != acc.hash()); - if let Some(tracked_account) = current_account { - info!("Public account hash difference detected for account with ID: {}. Fetching node for updates...", tracked_account.id()); - let account_details = self.rpc_api.get_account_update(tracked_account.id()).await?; - if let AccountDetails::Public(account, _) = account_details { - // We should only do the update if it's newer, otherwise we ignore it - if account.nonce().as_int() > tracked_account.nonce().as_int() { - accounts_to_update.push(account); - } - } else { - return Err(RpcError::AccountUpdateForPrivateAccountReceived( - account_details.account_id(), - ) - .into()); - } + if is_last_block { + break; } } - Ok(accounts_to_update) - } - - /// Validates account hash updates and returns a vector with all the offchain account - /// mismatches. - /// - /// Offchain account mismatches happen when the hash account of the local tracked account - /// doesn't match the hash account of the account in the node. This would be an anomaly and may - /// happen for two main reasons: - /// - A different client made a transaction with the account, changing its state. - /// - The local transaction that modified the local state didn't go through, rendering the local - /// account state outdated. - async fn validate_local_account_hashes( - &mut self, - account_updates: &[(AccountId, Digest)], - current_offchain_accounts: &[AccountHeader], - ) -> Result, ClientError> { - let mut mismatched_accounts = vec![]; - - for (remote_account_id, remote_account_hash) in account_updates { - // ensure that if we track that account, it has the same hash - let mismatched_account = current_offchain_accounts - .iter() - .find(|acc| *remote_account_id == acc.id() && *remote_account_hash != acc.hash()); - - // OffChain accounts should always have the latest known state. If we receive a stale - // update we ignore it. - if mismatched_account.is_some() { - let account_by_hash = - self.store.get_account_header_by_hash(*remote_account_hash).await?; + self.update_mmr_data().await?; - if account_by_hash.is_none() { - mismatched_accounts.push((*remote_account_id, *remote_account_hash)); - } - } - } - Ok(mismatched_accounts) + Ok(total_sync_summary) } } diff --git a/crates/rust-client/src/tests.rs b/crates/rust-client/src/tests.rs index 374eba4b7..5470c14ab 100644 --- a/crates/rust-client/src/tests.rs +++ b/crates/rust-client/src/tests.rs @@ -363,7 +363,7 @@ async fn test_sync_state_mmr() { ); // Try reconstructing the chain_mmr from what's in the database - let partial_mmr = client.build_current_partial_mmr(true).await.unwrap(); + let partial_mmr = client.test_store().build_current_partial_mmr(true).await.unwrap(); assert_eq!(partial_mmr.forest(), 6); assert!(partial_mmr.open(0).unwrap().is_none()); assert!(partial_mmr.open(1).unwrap().is_some()); diff --git a/crates/rust-client/src/transactions/mod.rs b/crates/rust-client/src/transactions/mod.rs index 44c14e6b6..7f7398f24 100644 --- a/crates/rust-client/src/transactions/mod.rs +++ b/crates/rust-client/src/transactions/mod.rs @@ -800,7 +800,7 @@ impl Client { let summary = self.sync_state().await?; if summary.block_num != block_num { - let mut current_partial_mmr = self.build_current_partial_mmr(true).await?; + let mut current_partial_mmr = self.store.build_current_partial_mmr(true).await?; self.get_and_store_authenticated_block(block_num, &mut current_partial_mmr) .await?; } From f61b79325a8ae897eb419269d7649ae1b0b88fb9 Mon Sep 17 00:00:00 2001 From: tomyrd Date: Mon, 23 Dec 2024 18:22:03 -0300 Subject: [PATCH 4/5] feat: create `StateSync` trait --- .../rust-client/src/components/sync_state.rs | 66 +++++++++++++++---- crates/rust-client/src/lib.rs | 4 ++ crates/rust-client/src/sync/mod.rs | 25 +------ 3 files changed, 62 insertions(+), 33 deletions(-) diff --git a/crates/rust-client/src/components/sync_state.rs b/crates/rust-client/src/components/sync_state.rs index 2f625c9bd..55e8a4a4a 100644 --- a/crates/rust-client/src/components/sync_state.rs +++ b/crates/rust-client/src/components/sync_state.rs @@ -1,4 +1,4 @@ -use alloc::{collections::BTreeMap, sync::Arc, vec::Vec}; +use alloc::{boxed::Box, collections::BTreeMap, sync::Arc, vec::Vec}; use miden_objects::{ accounts::{Account, AccountHeader, AccountId}, @@ -7,6 +7,7 @@ use miden_objects::{ transaction::TransactionId, BlockHeader, Digest, }; +use tonic::async_trait; use tracing::*; use crate::{ @@ -92,7 +93,34 @@ impl SyncStatus { } } -pub struct SyncState { +/// The [SyncState] trait defines the interface for the state sync process. The sync process is +/// responsible for updating the client's store with the latest state of the network. +/// +/// Methods in this trait shouldn't change the state of the client itself, but rather return the +/// updated state as a result. +#[async_trait(?Send)] +pub trait SyncState { + /// Performs a state sync to the next block on the specified elements. The function returns a + /// [SyncStatus] with the new state if the sync was successful, where the variant depends on + /// whether the client has synced to the last block in the chain or not. `None` will be returned + /// if the elements are already up to date. + /// + /// This method should query the store to get the current states of each element and then + /// create the updates to be applied with the new information from the node. + /// + /// The [Client] will call this method on the sync process until it reaches the last block in + /// the chain. + async fn step_sync_state( + &mut self, + current_block_num: u32, + tracked_accounts: Vec, + note_tags: &[NoteTag], + nullifiers: &[Nullifier], + ) -> Result, ClientError>; +} + +/// Implements the [SyncState] trait for a standard sync process. +pub struct ClientSyncState { /// The client's store, which provides a way to write and read entities to provide persistence. store: Arc, /// An instance of [NodeRpcClient] which provides a way for the component to connect to the @@ -100,13 +128,24 @@ pub struct SyncState { rpc_api: Arc, } -impl SyncState { - /// Creates a new instance of [SyncState]. - pub fn new(store: Arc, rpc_api: Arc) -> Self { - Self { store, rpc_api } - } - - pub async fn step_sync_state( +#[async_trait(?Send)] +impl SyncState for ClientSyncState { + /// The sync process is done in multiple steps: + /// 1. A request is sent to the node to get the state updates. This request includes tracked + /// account IDs and the tags of notes that might have changed or that might be of interest to + /// the client. + /// 2. A response is received with the current state of the network. The response includes + /// information about new/committed/consumed notes, updated accounts, and committed + /// transactions. + /// 3. Tracked notes are updated with their new states. + /// 4. New notes are checked, and only relevant ones are stored. Relevant notes are those that + /// can be consumed by accounts the client is tracking (this is checked by the + /// [crate::notes::NoteScreener]) + /// 5. Transactions are updated with their new states. + /// 6. Tracked public accounts are updated and off-chain accounts are validated against the node + /// state. + /// 7. The MMR is updated with the new peaks and authentication nodes. + async fn step_sync_state( &mut self, current_block_num: u32, tracked_accounts: Vec, @@ -194,8 +233,13 @@ impl SyncState { Ok(Some(SyncStatus::SyncedToBlock(sync_update))) } } - // HELPERS - // -------------------------------------------------------------------------------------------- +} + +impl ClientSyncState { + /// Creates a new instance of [SyncState]. + pub fn new(store: Arc, rpc_api: Arc) -> Self { + Self { store, rpc_api } + } /// Returns the [NoteUpdates] containing new public note and committed input/output notes and a /// list or note tag records to be removed from the store. diff --git a/crates/rust-client/src/lib.rs b/crates/rust-client/src/lib.rs index 6aff26814..9ae0eb763 100644 --- a/crates/rust-client/src/lib.rs +++ b/crates/rust-client/src/lib.rs @@ -67,6 +67,7 @@ pub mod crypto { }; } +use components::sync_state::{ClientSyncState, SyncState}; pub use errors::{ClientError, IdPrefixFetchError}; pub use miden_objects::{Felt, StarkField, Word, ONE, ZERO}; @@ -120,6 +121,7 @@ pub struct Client { /// An instance of a [LocalTransactionProver] which will be the default prover for the client. tx_prover: Arc, tx_executor: TransactionExecutor, + sync_state: Box, } /// Construction and access methods. @@ -163,6 +165,7 @@ impl Client { let tx_executor = TransactionExecutor::new(data_store, authenticator).with_debug_mode(in_debug_mode); let tx_prover = Arc::new(LocalTransactionProver::default()); + let sync_state = Box::new(ClientSyncState::new(store.clone(), rpc_api.clone())); // TODO: Component should be a parameter of the constructor Self { store, @@ -170,6 +173,7 @@ impl Client { rpc_api, tx_executor, tx_prover, + sync_state, } } diff --git a/crates/rust-client/src/sync/mod.rs b/crates/rust-client/src/sync/mod.rs index 9a5b5ceb6..7b010788d 100644 --- a/crates/rust-client/src/sync/mod.rs +++ b/crates/rust-client/src/sync/mod.rs @@ -11,10 +11,7 @@ use miden_objects::{ transaction::TransactionId, }; -use crate::{ - components::sync_state::{SyncState, SyncStatus}, - Client, ClientError, -}; +use crate::{components::sync_state::SyncStatus, Client, ClientError}; mod block_headers; @@ -108,23 +105,6 @@ impl Client { /// Syncs the client's state with the current state of the Miden network. Returns the block /// number the client has been synced to. - /// - /// The sync process is done in multiple steps: - /// 1. A request is sent to the node to get the state updates. This request includes tracked - /// account IDs and the tags of notes that might have changed or that might be of interest to - /// the client. - /// 2. A response is received with the current state of the network. The response includes - /// information about new/committed/consumed notes, updated accounts, and committed - /// transactions. - /// 3. Tracked notes are updated with their new states. - /// 4. New notes are checked, and only relevant ones are stored. Relevant notes are those that - /// can be consumed by accounts the client is tracking (this is checked by the - /// [crate::notes::NoteScreener]) - /// 5. Transactions are updated with their new states. - /// 6. Tracked public accounts are updated and off-chain accounts are validated against the node - /// state. - /// 7. The MMR is updated with the new peaks and authentication nodes. - /// 8. All updates are applied to the store to be persisted. pub async fn sync_state(&mut self) -> Result { self.ensure_genesis_in_place().await?; let mut total_sync_summary = SyncSummary::new_empty(0); @@ -146,7 +126,8 @@ impl Client { let nullifiers = self.store.get_unspent_input_note_nullifiers().await?; // Sync the state with the network - let response = SyncState::new(self.store.clone(), self.rpc_api.clone()) + let response = self + .sync_state .step_sync_state(current_block_num, accounts, ¬e_tags, &nullifiers) .await?; From 8eb341fe9d3b7ada5bb19ae056320588be3b2d99 Mon Sep 17 00:00:00 2001 From: tomyrd Date: Mon, 23 Dec 2024 19:52:12 -0300 Subject: [PATCH 5/5] feat: use `SyncState` for light note sync --- .../rust-client/src/components/sync_state.rs | 17 ++ crates/rust-client/src/notes/import.rs | 157 +++++++++--------- 2 files changed, 98 insertions(+), 76 deletions(-) diff --git a/crates/rust-client/src/components/sync_state.rs b/crates/rust-client/src/components/sync_state.rs index 55e8a4a4a..895e9c916 100644 --- a/crates/rust-client/src/components/sync_state.rs +++ b/crates/rust-client/src/components/sync_state.rs @@ -55,6 +55,23 @@ pub struct StateSyncUpdate { pub tags_to_remove: Vec, } +impl StateSyncUpdate { + pub fn new_empty(block_header: BlockHeader) -> Self { + Self { + block_header, + note_updates: NoteUpdates::new(vec![], vec![], vec![], vec![]), + transactions_to_commit: vec![], + transactions_to_discard: vec![], + new_mmr_peaks: MmrPeaks::new(0, vec![]) + .expect("Blank MmrPeaks should not fail to instantiate"), + new_authentication_nodes: vec![], + updated_accounts: AccountUpdates::new(vec![], vec![]), + block_has_relevant_notes: false, + tags_to_remove: vec![], + } + } +} + impl From<&StateSyncUpdate> for SyncSummary { fn from(value: &StateSyncUpdate) -> Self { SyncSummary::new( diff --git a/crates/rust-client/src/notes/import.rs b/crates/rust-client/src/notes/import.rs index 58694d144..a1e786537 100644 --- a/crates/rust-client/src/notes/import.rs +++ b/crates/rust-client/src/notes/import.rs @@ -1,12 +1,16 @@ -use alloc::string::ToString; +use alloc::{boxed::Box, string::ToString, sync::Arc, vec::Vec}; use miden_objects::{ + accounts::AccountHeader, crypto::rand::FeltRng, - notes::{Note, NoteDetails, NoteFile, NoteId, NoteInclusionProof, NoteMetadata, NoteTag}, + notes::{Note, NoteDetails, NoteFile, NoteId, NoteInclusionProof, NoteTag, Nullifier}, }; +use tonic::async_trait; +use super::NoteUpdates; use crate::{ - rpc::domain::notes::NoteDetails as RpcNoteDetails, + components::sync_state::{StateSyncUpdate, SyncState, SyncStatus}, + rpc::{domain::notes::NoteDetails as RpcNoteDetails, NodeRpcClient}, store::{input_note_states::ExpectedNoteState, InputNoteRecord, InputNoteState}, sync::NoteTagRecord, Client, ClientError, @@ -198,7 +202,7 @@ impl Client { after_block_num: u32, tag: Option, ) -> Result, ClientError> { - let mut note_record = previous_note.unwrap_or({ + let note_record = previous_note.unwrap_or({ InputNoteRecord::new( details, self.store.get_current_timestamp(), @@ -206,91 +210,92 @@ impl Client { ) }); - let committed_note_data = if let Some(tag) = tag { - self.check_expected_note(after_block_num, tag, note_record.details()).await? - } else { - None - }; + if tag.is_none() { + return Ok(Some(note_record)); + } - match committed_note_data { - Some((metadata, inclusion_proof)) => { - let mut current_partial_mmr = self.store.build_current_partial_mmr(true).await?; - let block_header = self - .get_and_store_authenticated_block( - inclusion_proof.location().block_num(), - &mut current_partial_mmr, - ) - .await?; + let tag = tag.expect("tag should be Some"); + let mut sync_block_num = after_block_num; + let mut sync_state = SingleNoteSync::new(note_record.clone(), self.rpc_api.clone()); - let note_changed = - note_record.inclusion_proof_received(inclusion_proof, metadata)?; + loop { + let response = sync_state.step_sync_state(sync_block_num, vec![], &[tag], &[]).await?; + + match response { + None => return Ok(Some(note_record)), + Some(SyncStatus::SyncedToLastBlock(update)) + | Some(SyncStatus::SyncedToBlock(update)) => { + if let Some(new_note_record) = update.note_updates.updated_input_notes().first() + { + return Ok(Some(new_note_record.clone())); + } else { + sync_block_num = update.block_header.block_num(); + } + }, + } + } + } +} - if note_record.block_header_received(block_header)? | note_changed { - self.store - .remove_note_tag(NoteTagRecord::with_note_source( - metadata.tag(), - note_record.id(), - )) - .await?; +struct SingleNoteSync { + rpc_api: Arc, + note_record: InputNoteRecord, +} - Ok(Some(note_record)) - } else { - Ok(None) - } - }, - None => Ok(Some(note_record)), - } +impl SingleNoteSync { + fn new(note_record: InputNoteRecord, rpc_api: Arc) -> Self { + Self { note_record, rpc_api } } +} - /// Checks if a note with the given note_tag and ID is present in the chain between the - /// `request_block_num` and the current block. If found it returns its metadata and inclusion - /// proof. - async fn check_expected_note( +#[async_trait(?Send)] +impl SyncState for SingleNoteSync { + async fn step_sync_state( &mut self, - mut request_block_num: u32, - tag: NoteTag, - expected_note: &miden_objects::notes::NoteDetails, - ) -> Result, ClientError> { - let current_block_num = self.get_sync_height().await?; - loop { - if request_block_num > current_block_num { - return Ok(None); - }; + current_block_num: u32, + _tracked_accounts: Vec, + note_tags: &[NoteTag], + _nullifiers: &[Nullifier], + ) -> Result, ClientError> { + let tag = note_tags.first().expect("note_tags should have at least one element"); + let sync_notes = self.rpc_api.sync_notes(current_block_num, &[*tag]).await?; + + if sync_notes.block_header.block_num() == current_block_num { + return Ok(None); + } - let sync_notes = self.rpc_api.sync_notes(request_block_num, &[tag]).await?; + // This means that notes with that note_tag were found. + // Therefore, we should check if a note with the same id was found. + let committed_note = + sync_notes.notes.iter().find(|note| note.note_id() == &self.note_record.id()); + + let mut update = StateSyncUpdate::new_empty(sync_notes.block_header); + if let Some(note) = committed_note { + // This means that a note with the same id was found. + // Therefore, we should update it. + let note_changed = self.note_record.inclusion_proof_received( + NoteInclusionProof::new( + sync_notes.block_header.block_num(), + note.note_index(), + note.merkle_path().clone(), + )?, + note.metadata(), + )?; - if sync_notes.block_header.block_num() == sync_notes.chain_tip { + if !(self.note_record.block_header_received(sync_notes.block_header)? | note_changed) { + // If note was found but didn't change, we return None (as there is no state change) return Ok(None); } - // This means that notes with that note_tag were found. - // Therefore, we should check if a note with the same id was found. - let committed_note = - sync_notes.notes.iter().find(|note| note.note_id() == &expected_note.id()); - - if let Some(note) = committed_note { - // This means that a note with the same id was found. - // Therefore, we should mark the note as committed. - let note_block_num = sync_notes.block_header.block_num(); - - if note_block_num > current_block_num { - return Ok(None); - }; - - let note_inclusion_proof = NoteInclusionProof::new( - note_block_num, - note.note_index(), - note.merkle_path().clone(), - )?; + let single_note_update = + NoteUpdates::new(vec![], vec![], vec![self.note_record.clone()], vec![]); + update.note_updates = single_note_update; + } - return Ok(Some((note.metadata(), note_inclusion_proof))); - } else { - // This means that a note with the same id was not found. - // Therefore, we should request again for sync_notes with the same note_tag - // and with the block_num of the last block header - // (sync_notes.block_header.unwrap()). - request_block_num = sync_notes.block_header.block_num(); - } + if sync_notes.chain_tip == sync_notes.block_header.block_num() { + Ok(Some(SyncStatus::SyncedToLastBlock(update))) + } else { + Ok(Some(SyncStatus::SyncedToBlock(update))) } } }