Skip to content

Commit

Permalink
Merge pull request #3395 from ProvableHQ/cache_transmissions
Browse files Browse the repository at this point in the history
Cache transmissions for faster processing
  • Loading branch information
zosorock authored Oct 26, 2024
2 parents b544b7a + 9e9d3b1 commit 0e42776
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 43 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions node/bft/storage-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ test = [ "memory" ]
[dependencies.aleo-std]
workspace = true

[dependencies.anyhow]
version = "1.0.79"

[dependencies.indexmap]
version = "2.1"
features = [ "serde", "rayon" ]

[dependencies.lru]
version = "0.12.1"

[dependencies.parking_lot]
version = "0.12"
optional = true
Expand Down
94 changes: 59 additions & 35 deletions node/bft/storage-service/src/persistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use crate::StorageService;
use snarkvm::{
ledger::{
committee::Committee,
narwhal::{BatchHeader, Transmission, TransmissionID},
store::{
cow_to_cloned,
Expand All @@ -33,10 +34,14 @@ use snarkvm::{
};

use aleo_std::StorageMode;
use anyhow::anyhow;
use indexmap::{IndexSet, indexset};
use lru::LruCache;
use parking_lot::Mutex;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
num::NonZeroUsize,
};
use tracing::error;

Expand All @@ -47,24 +52,40 @@ pub struct BFTPersistentStorage<N: Network> {
transmissions: DataMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
/// The map of `aborted transmission ID` to `certificate IDs` entries.
aborted_transmission_ids: DataMap<TransmissionID<N>, IndexSet<Field<N>>>,
/// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries that are part of the persistent storage.
cache_transmissions: Mutex<LruCache<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>>,
/// The LRU cache for `aborted transmission ID` to `certificate IDs` entries that are part of the persistent storage.
cache_aborted_transmission_ids: Mutex<LruCache<TransmissionID<N>, IndexSet<Field<N>>>>,
}

impl<N: Network> BFTPersistentStorage<N> {
/// Initializes a new BFT persistent storage service.
pub fn open(storage_mode: StorageMode) -> Result<Self> {
let capacity = NonZeroUsize::new(
(Committee::<N>::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2,
)
.ok_or_else(|| anyhow!("Could not construct NonZeroUsize"))?;

Ok(Self {
transmissions: internal::RocksDB::open_map(N::ID, storage_mode.clone(), MapID::BFT(BFTMap::Transmissions))?,
aborted_transmission_ids: internal::RocksDB::open_map(
N::ID,
storage_mode,
MapID::BFT(BFTMap::AbortedTransmissionIDs),
)?,
cache_transmissions: Mutex::new(LruCache::new(capacity)),
cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)),
})
}

/// Initializes a new BFT persistent storage service.
/// Initializes a new BFT persistent storage service for testing.
#[cfg(any(test, feature = "test"))]
pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option<u16>) -> Result<Self> {
let capacity = NonZeroUsize::new(
(Committee::<N>::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2,
)
.ok_or_else(|| anyhow!("Could not construct NonZeroUsize"))?;

Ok(Self {
transmissions: internal::RocksDB::open_map_testing(
temp_dir.clone(),
Expand All @@ -76,6 +97,8 @@ impl<N: Network> BFTPersistentStorage<N> {
dev,
MapID::BFT(BFTMap::AbortedTransmissionIDs),
)?,
cache_transmissions: Mutex::new(LruCache::new(capacity)),
cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)),
})
}
}
Expand All @@ -102,7 +125,12 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
/// Returns the transmission for the given `transmission ID`.
/// If the transmission ID does not exist in storage, `None` is returned.
fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
// Get the transmission.
// Try to get the transmission from the cache first.
if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) {
return Some(transmission.clone());
}

// If not found in cache, check persistent storage.
match self.transmissions.get_confirmed(&transmission_id) {
Ok(Some(Cow::Owned((transmission, _)))) => Some(transmission),
Ok(Some(Cow::Borrowed((transmission, _)))) => Some(transmission.clone()),
Expand Down Expand Up @@ -153,24 +181,19 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
aborted_transmission_ids: HashSet<TransmissionID<N>>,
mut missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
) {
// Inserts the following:
// - Inserts **only the missing** transmissions from storage.
// - Inserts the certificate ID into the corresponding set for **all** transmissions.
// First, handle the non-aborted transmissions.
'outer: for transmission_id in transmission_ids {
// Retrieve the transmission entry.
match self.transmissions.get_confirmed(&transmission_id) {
// Try to fetch from the persistent storage.
let (transmission, certificate_ids) = match self.transmissions.get_confirmed(&transmission_id) {
Ok(Some(entry)) => {
// The transmission exists in storage; update its certificate IDs.
let (transmission, mut certificate_ids) = cow_to_cloned!(entry);
// Insert the certificate ID into the set.
certificate_ids.insert(certificate_id);
// Update the transmission entry.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) {
error!("Failed to insert transmission {transmission_id} into storage - {e}");
continue 'outer;
}
(transmission, certificate_ids)
}
Ok(None) => {
// Retrieve the missing transmission.
// The transmission is missing from persistent storage.
// Check if it exists in the `missing_transmissions` map provided.
let Some(transmission) = missing_transmissions.remove(&transmission_id) else {
if !aborted_transmission_ids.contains(&transmission_id)
&& !self.contains_transmission(transmission_id)
Expand All @@ -181,45 +204,46 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
};
// Prepare the set of certificate IDs.
let certificate_ids = indexset! { certificate_id };
// Insert the transmission and a new set with the certificate ID.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) {
error!("Failed to insert transmission {transmission_id} into storage - {e}");
continue 'outer;
}
(transmission, certificate_ids)
}
Err(e) => {
// Handle any errors during the retrieval.
error!("Failed to process the 'insert' for transmission {transmission_id} into storage - {e}");
continue 'outer;
continue;
}
};
// Insert the transmission into persistent storage.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone()))
{
error!("Failed to insert transmission {transmission_id} into storage - {e}");
}
// Insert the transmission into the cache.
self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids));
}
// Inserts the aborted transmission IDs.

// Next, handle the aborted transmission IDs.
for aborted_transmission_id in aborted_transmission_ids {
// Retrieve the transmission entry.
match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) {
let certificate_ids = match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) {
Ok(Some(entry)) => {
let mut certificate_ids = cow_to_cloned!(entry);
// Insert the certificate ID into the set.
certificate_ids.insert(certificate_id);
// Update the transmission entry.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
}
Ok(None) => {
// Prepare the set of certificate IDs.
let certificate_ids = indexset! { certificate_id };
// Insert the transmission and a new set with the certificate ID.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
certificate_ids
}
Ok(None) => indexset! { certificate_id },
Err(e) => {
error!(
"Failed to process the 'insert' for aborted transmission ID {aborted_transmission_id} into storage - {e}"
);
continue;
}
};
// Insert the certificate IDs into the persistent storage.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
// Insert the certificate IDs into the cache.
self.cache_aborted_transmission_ids.lock().put(aborted_transmission_id, certificate_ids);
}
}

Expand Down
27 changes: 19 additions & 8 deletions node/router/tests/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common::*;
use snarkos_node_tcp::{P2P, protocols::Handshake};

use core::time::Duration;
use deadline::deadline;

#[tokio::test]
async fn test_disconnect_without_handshake() {
Expand All @@ -34,8 +35,12 @@ async fn test_disconnect_without_handshake() {

// Connect node0 to node1.
node0.connect(node1.local_ip());
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(200)).await;
// Await both nodes being connected.
let node0_ = node0.clone();
let node1_ = node1.clone();
deadline!(Duration::from_secs(1), move || {
node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1
});

print_tcp!(node0);
print_tcp!(node1);
Expand All @@ -50,8 +55,9 @@ async fn test_disconnect_without_handshake() {
// collection of connected peers is only altered during the handshake,
// as well as the address resolver needed for the higher-level calls
node0.tcp().disconnect(node1.local_ip()).await;
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(100)).await;
// Await disconnection.
let node0_ = node0.clone();
deadline!(Duration::from_secs(1), move || { node0_.tcp().num_connected() == 0 });

print_tcp!(node0);
print_tcp!(node1);
Expand Down Expand Up @@ -80,8 +86,12 @@ async fn test_disconnect_with_handshake() {

// Connect node0 to node1.
node0.connect(node1.local_ip());
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(1000)).await;
// Await for the nodes to be connected.
let node0_ = node0.clone();
let node1_ = node1.clone();
deadline!(Duration::from_secs(1), move || {
node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1
});

print_tcp!(node0);
print_tcp!(node1);
Expand All @@ -98,8 +108,9 @@ async fn test_disconnect_with_handshake() {

// Disconnect node0 from node1.
node0.disconnect(node1.local_ip());
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(100)).await;
// Await nodes being disconnected.
let node0_ = node0.clone();
deadline!(Duration::from_secs(1), move || { node0_.tcp().num_connected() == 0 });

print_tcp!(node0);
print_tcp!(node1);
Expand Down

0 comments on commit 0e42776

Please sign in to comment.