From 177cc928498eb6a3a98521c7725587856163e487 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 30 Sep 2024 16:20:24 +0300 Subject: [PATCH 01/12] transport-service/tests: Test assumptions for dropping connections on dropped substreams Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 93 ++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 1 deletion(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index c2149283..3b3c768e 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -599,7 +599,7 @@ impl Stream for TransportService { mod tests { use super::*; use crate::{ - protocol::TransportService, + protocol::{ProtocolCommand, TransportService}, transport::{ manager::{handle::InnerTransportManagerCommand, TransportManagerHandle}, KEEP_ALIVE_TIMEOUT, @@ -1254,4 +1254,95 @@ mod tests { None => panic!("expected {peer} to exist"), } } + + #[tokio::test] + async fn downgraded_connection_without_substreams_is_closed() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut service, sender, _) = transport_service(); + let peer = PeerId::random(); + + // register first connection + let (cmd_tx1, mut cmd_rx1) = channel(64); + sender + .send(InnerTransportEvent::ConnectionEstablished { + peer, + connection: ConnectionId::from(1337usize), + endpoint: Endpoint::dialer(Multiaddr::empty(), ConnectionId::from(1337usize)), + sender: ConnectionHandle::new(ConnectionId::from(1337usize), cmd_tx1), + }) + .await + .unwrap(); + + if let Some(TransportEvent::ConnectionEstablished { + peer: connected_peer, + endpoint, + }) = service.next().await + { + assert_eq!(connected_peer, peer); + assert_eq!(endpoint.address(), &Multiaddr::empty()); + } else { + panic!("expected event from `TransportService`"); + }; + + // verify the first connection state is correct + assert_eq!( + service.keep_alive_tracker.pending_keep_alive_timeouts.len(), + 1 + ); + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // Check the connection is still active. + assert!(context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + + // Open a substream to the peer. + let substream_id = service.open_substream(peer).unwrap(); + + // Simulate keep-alive timeout expiration. + service + .connections + .get_mut(&peer) + .unwrap() + .downgrade(&ConnectionId::from(1337usize)); + + let protocol_command = cmd_rx1.recv().await.unwrap(); + // ProtocolCommand + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(substream_id, opened_substream_id); + + // Individual transports like TCP will open a substream + // and then will generate a `SubstreamOpened` event via + // the protocol-set handler. + // + // The substream is used by individual protocols and then + // is closed. This simulates the substream being closed. + drop(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Open a substream to the peer. + // However, the connection was downgraded and all substreams are closed. 
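+        // With the connection downgraded and its only substream permit dropped,
+        // the service reports `SubstreamError::ConnectionClosed`.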
+ assert_eq!( + service.open_substream(peer), + Err(SubstreamError::ConnectionClosed) + ); + } } From 52cd146ea2a84e444f5eb1e79e572f38b8f9a3a1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 30 Sep 2024 16:26:00 +0300 Subject: [PATCH 02/12] transport-service/tests: Extend test for multiple substreams being dropped Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 73 ++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 11 deletions(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 3b3c768e..bb1b0b0b 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -1305,8 +1305,9 @@ mod tests { None => panic!("expected {peer} to exist"), } - // Open a substream to the peer. + // Open substreams to the peer. let substream_id = service.open_substream(peer).unwrap(); + let second_substream_id = service.open_substream(peer).unwrap(); // Simulate keep-alive timeout expiration. service @@ -1315,8 +1316,59 @@ mod tests { .unwrap() .downgrade(&ConnectionId::from(1337usize)); + let mut permits = Vec::new(); + + // First substream. + let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Second substream. + let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(second_substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Drop one permit. + let permit = permits.pop(); + // Individual transports like TCP will open a substream + // and then will generate a `SubstreamOpened` event via + // the protocol-set handler. + // + // The substream is used by individual protocols and then + // is closed. This simulates the substream being closed. + drop(permit); + + // Open a new substream to the peer. This will succeed as long as we still have + // one substream open. + let substream_id = service.open_substream(peer).unwrap(); + // Handle the substream. let protocol_command = cmd_rx1.recv().await.unwrap(); - // ProtocolCommand match protocol_command { ProtocolCommand::OpenSubstream { protocol, @@ -1327,19 +1379,18 @@ mod tests { assert_eq!(protocol, ProtocolName::from("/notif/1")); assert_eq!(substream_id, opened_substream_id); - // Individual transports like TCP will open a substream - // and then will generate a `SubstreamOpened` event via - // the protocol-set handler. - // - // The substream is used by individual protocols and then - // is closed. This simulates the substream being closed. - drop(permit); + // Save the substream permit for later. + permits.push(permit); } _ => panic!("expected `ProtocolCommand::OpenSubstream`"), } - // Open a substream to the peer. - // However, the connection was downgraded and all substreams are closed. + // Drop all substreams. + drop(permits); + + // Cannot open a new substream because: + // 1. connection was downgraded by keep-alive timeout + // 2. 
all substreams were dropped. assert_eq!( service.open_substream(peer), Err(SubstreamError::ConnectionClosed) From 35c67f0693ba0b996cdab973b2e5dfbaddd352b4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 30 Sep 2024 17:25:21 +0300 Subject: [PATCH 03/12] transport-service: Simplify keep alive hashmap tracking Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 41 +++++++++++-------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index bb1b0b0b..5e6fac9e 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -33,7 +33,7 @@ use multihash::Multihash; use tokio::sync::mpsc::{channel, Receiver, Sender}; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet}, fmt::Debug, pin::Pin, sync::{ @@ -113,7 +113,7 @@ struct KeepAliveTracker { pending_keep_alive_timeouts: FuturesUnordered>, /// Track substream last activity. - last_activity: HashMap>, + last_activity: HashMap<(PeerId, ConnectionId), Instant>, /// Saved waker. waker: Option, @@ -151,21 +151,13 @@ impl KeepAliveTracker { /// Called on connection closed event. pub fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId) { - if let Entry::Occupied(mut entry) = self.last_activity.entry(peer) { - entry.get_mut().remove(&connection_id); - if entry.get().is_empty() { - entry.remove(); - } - } + self.last_activity.remove(&(peer, connection_id)); } /// Called on substream opened event to track the last activity. pub fn substream_activity(&mut self, peer: PeerId, connection_id: ConnectionId) { // Keep track of the connection ID and the time the substream was opened. - self.last_activity - .entry(peer) - .or_default() - .insert(connection_id.clone(), Instant::now()); + self.last_activity.insert((peer, connection_id), Instant::now()); } } @@ -190,20 +182,17 @@ impl Stream for KeepAliveTracker { // activity since the timeout was started for this connection. 
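+            // With the flattened `(PeerId, ConnectionId)` key this is a single
+            // hashmap lookup instead of a scan over all of the peer's connections.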
let next_keep_alive = this .last_activity - .get(&peer) - .map(|activities| { - activities.iter().find_map(|(connection, when)| { - if connection_id == *connection && when.elapsed() < this.keep_alive_timeout - { - Some(( - peer, - connection_id, - this.keep_alive_timeout - when.elapsed(), - )) - } else { - None - } - }) + .get(&(peer, connection_id)) + .map(|when| { + if when.elapsed() < this.keep_alive_timeout { + Some(( + peer, + connection_id, + this.keep_alive_timeout - when.elapsed(), + )) + } else { + None + } }) .flatten(); From 38665fd5fd7590374eb5ff7ceca8c0ca1fd508da Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 30 Sep 2024 17:54:23 +0300 Subject: [PATCH 04/12] transport-service: Replace FuturesUnordered with more efficient polling Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 99 ++++++++++++------------------- 1 file changed, 37 insertions(+), 62 deletions(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 5e6fac9e..8e702b32 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -27,13 +27,13 @@ use crate::{ PeerId, DEFAULT_CHANNEL_SIZE, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use multiaddr::{Multiaddr, Protocol}; use multihash::Multihash; use tokio::sync::mpsc::{channel, Receiver, Sender}; use std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, fmt::Debug, pin::Pin, sync::{ @@ -109,11 +109,14 @@ struct KeepAliveTracker { /// Close the connection if no substreams are open within this time frame. keep_alive_timeout: Duration, - /// Pending keep-alive timeouts. - pending_keep_alive_timeouts: FuturesUnordered>, - /// Track substream last activity. last_activity: HashMap<(PeerId, ConnectionId), Instant>, + /// Insertion order of the last activity. + /// + /// Since the time added to the `last_activity` hashmap is always monotonically increasing, + /// we can use this to optimize the search for the next keep-alive timeout. This + /// results in fewer polls. + last_activity_order: VecDeque<(PeerId, ConnectionId)>, /// Saved waker. waker: Option, @@ -124,33 +127,22 @@ impl KeepAliveTracker { pub fn new(keep_alive_timeout: Duration) -> Self { Self { keep_alive_timeout, - pending_keep_alive_timeouts: FuturesUnordered::new(), last_activity: HashMap::new(), + last_activity_order: VecDeque::new(), waker: None, } } - fn inner_on_connection_established( - &mut self, - peer: PeerId, - connection_id: ConnectionId, - keep_alive_timeout: Duration, - ) { - self.pending_keep_alive_timeouts.push(Box::pin(async move { - tokio::time::sleep(keep_alive_timeout).await; - (peer, connection_id) - })); - - self.waker.take().map(|waker| waker.wake()); - } - /// Called on connection established event to add a new keep-alive timeout. pub fn on_connection_established(&mut self, peer: PeerId, connection_id: ConnectionId) { - self.inner_on_connection_established(peer, connection_id, self.keep_alive_timeout); + self.substream_activity(peer, connection_id); } /// Called on connection closed event. pub fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId) { + // The connection is closed, we'll pop the `last_activity_order` elements + // in the poll method. This ensures we leverage the `VecDeque` ordering + // instead of searching and allocating for a new vector here. 
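+        // Keys left behind in `last_activity_order` no longer have a matching
+        // `last_activity` entry and are skipped as stale when polled.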
self.last_activity.remove(&(peer, connection_id)); } @@ -158,6 +150,11 @@ impl KeepAliveTracker { pub fn substream_activity(&mut self, peer: PeerId, connection_id: ConnectionId) { // Keep track of the connection ID and the time the substream was opened. self.last_activity.insert((peer, connection_id), Instant::now()); + // Keep track of the order of the last activity insertions. + self.last_activity_order.push_back((peer, connection_id)); + + // Wake any pending poll. + self.waker.take().map(|waker| waker.wake()); } } @@ -167,60 +164,38 @@ impl Stream for KeepAliveTracker { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - if this.pending_keep_alive_timeouts.is_empty() { - // Save current waker. - this.waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - if let Poll::Ready(event) = this.pending_keep_alive_timeouts.poll_next_unpin(cx) { - let Some((peer, connection_id)) = event else { - return Poll::Ready(None); + loop { + let Some(key) = this.last_activity_order.iter().next() else { + // No elements are tracked in the last activity order. Wait for connections. + this.waker = Some(cx.waker().clone()); + return Poll::Pending; }; - // Keep-alive timeout triggered for this connection. Double check if there is any - // activity since the timeout was started for this connection. - let next_keep_alive = this - .last_activity - .get(&(peer, connection_id)) - .map(|when| { - if when.elapsed() < this.keep_alive_timeout { - Some(( - peer, - connection_id, - this.keep_alive_timeout - when.elapsed(), - )) - } else { - None - } - }) - .flatten(); + let Some(when) = this.last_activity.get(key) else { + // The key does not correspond to an active connection. Remove it from the order. + this.last_activity_order.pop_front(); + continue; + }; - if let Some((peer, connection_id, keep_alive_timeout)) = next_keep_alive { - tracing::trace!( - target: LOG_TARGET, - ?peer, - ?connection_id, - ?keep_alive_timeout, - "keep-alive timeout extended", - ); - this.inner_on_connection_established(peer, connection_id, keep_alive_timeout); + if when.elapsed() < this.keep_alive_timeout { + // No keep-alive timeout reached yet. this.waker = Some(cx.waker().clone()); return Poll::Pending; } + // Keep-alive timeout reached. tracing::trace!( target: LOG_TARGET, - ?peer, - ?connection_id, + peer = ?key.0, + connection_id = ?key.1, "keep-alive timeout triggered", ); + let key = *key; + this.last_activity_order.pop_front(); + this.last_activity.remove(&key); - return Poll::Ready(Some((peer, connection_id))); + return Poll::Ready(Some(key)); } - - this.waker = Some(cx.waker().clone()); - return Poll::Pending; } } From 71571a887afb4a465fd52f5d1551bc058c6d6595 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 30 Sep 2024 18:01:29 +0300 Subject: [PATCH 05/12] transport-service: Upgrade connection on substream activity Signed-off-by: Alexandru Vasile --- src/protocol/connection.rs | 9 +++++++++ src/protocol/transport_service.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs index 788f770c..035befaa 100644 --- a/src/protocol/connection.rs +++ b/src/protocol/connection.rs @@ -94,6 +94,15 @@ impl ConnectionHandle { } } + /// Try to upgrade the connection to active state. 
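+    ///
+    /// This is a no-op if the connection is already active or if the inactive
+    /// handle can no longer be upgraded.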
+ pub fn try_open(&mut self) { + if let ConnectionType::Inactive(inactive) = &self.connection { + if let Some(active) = inactive.upgrade() { + self.connection = ConnectionType::Active(active); + } + } + } + /// Attempt to acquire permit which will keep the connection open for indefinite time. pub fn try_get_permit(&self) -> Option { match &self.connection { diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 8e702b32..e8d81527 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -96,6 +96,29 @@ impl ConnectionContext { "connection doesn't exist, cannot downgrade", ); } + + /// Try to upgrade the connection to active state. + fn try_upgrade(&mut self, connection_id: &ConnectionId) { + if self.primary.connection_id() == connection_id { + self.primary.try_open(); + return; + } + + if let Some(handle) = &mut self.secondary { + if handle.connection_id() == connection_id { + handle.try_open(); + return; + } + } + + tracing::debug!( + target: LOG_TARGET, + primary = ?self.primary.connection_id(), + secondary = ?self.secondary.as_ref().map(|handle| handle.connection_id()), + ?connection_id, + "connection doesn't exist, cannot upgrade", + ); + } } /// Tracks connection keep-alive timeouts. @@ -524,6 +547,9 @@ impl Stream for TransportService { }) => { if protocol == self.protocol { self.keep_alive_tracker.substream_activity(peer, connection_id); + if let Some(context) = self.connections.get_mut(&peer) { + context.try_upgrade(&connection_id); + } } return Poll::Ready(Some(TransportEvent::SubstreamOpened { From b28aa67129ee83967728030741c4a53a2970fa5f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 1 Oct 2024 14:48:01 +0300 Subject: [PATCH 06/12] transport_service: Ensure proper ordering Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 54 +++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index e8d81527..24abfe59 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -139,6 +139,25 @@ struct KeepAliveTracker { /// Since the time added to the `last_activity` hashmap is always monotonically increasing, /// we can use this to optimize the search for the next keep-alive timeout. This /// results in fewer polls. + /// + /// # Notes + /// + /// We might have a situation where the connection is updated multiple times: + /// + /// - (peerA, id0): SubstreamA, instant T0 + /// - (peerA, id0): SubstreamC, instant T2 + /// + /// // Peer B opens in between the updates the following substream: + /// - (peerB, id1): SubstreamZ, instant T3 + /// + /// - (peerA, id0): SubstreamB, instant T4 + /// + /// For this example, the insertion order contains the following keys: + /// `[(peerA, id0), (peerA, id0), (peerB, id1), (peerA, id0)]`. + /// + /// To ensure a correct ordering, we'll discard elements that expire after the next element in + /// the order. This ensures that we always have the most recent activity at the front of the + /// queue. last_activity_order: VecDeque<(PeerId, ConnectionId)>, /// Saved waker. @@ -188,20 +207,44 @@ impl Stream for KeepAliveTracker { let this = self.get_mut(); loop { - let Some(key) = this.last_activity_order.iter().next() else { + let Some(key) = this.last_activity_order.pop_front() else { // No elements are tracked in the last activity order. Wait for connections. 
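+                // `substream_activity` wakes this waker as soon as new activity
+                // is recorded, so the stream is polled again.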
this.waker = Some(cx.waker().clone()); return Poll::Pending; }; - - let Some(when) = this.last_activity.get(key) else { - // The key does not correspond to an active connection. Remove it from the order. - this.last_activity_order.pop_front(); + let Some(when) = this.last_activity.get(&key) else { + // The key does not correspond to an active connection. continue; }; + // Compare with the next key to ensure: + // - (1). identical keys are discarded + // - (2). the oldest instant is at the front of the queue (preserve ordering) + // - (3). stale connections are ignored + if let Some(next_key) = this.last_activity_order.front() { + // (1). Handle multiple inserts for the same connection. + if &key == next_key { + continue; + } + + if let Some(next_when) = this.last_activity.get(next_key) { + // (2). If the next key is older, we'll discard the current key. + // The current key corresponds to a more recent entry that will be polled later. + if next_when < when { + continue; + } + } else { + // (3). Preserve the current key if the next key corresponds to stale + // connection. We can't ensure (1) or (2) at this point if the connection + // is stale. We'll handle this in the next iteration. + this.last_activity_order.push_front(key); + continue; + } + } + if when.elapsed() < this.keep_alive_timeout { // No keep-alive timeout reached yet. + this.last_activity_order.push_front(key); this.waker = Some(cx.waker().clone()); return Poll::Pending; } @@ -213,7 +256,6 @@ impl Stream for KeepAliveTracker { connection_id = ?key.1, "keep-alive timeout triggered", ); - let key = *key; this.last_activity_order.pop_front(); this.last_activity.remove(&key); From d3058eb3d55b2e180d824a4b90ce490790c99c1a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 1 Oct 2024 15:02:00 +0300 Subject: [PATCH 07/12] transport_service: Break down logic to multiple fns Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 51 +++++++++++++++++++------------ 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 24abfe59..8285c57b 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -198,22 +198,17 @@ impl KeepAliveTracker { // Wake any pending poll. self.waker.take().map(|waker| waker.wake()); } -} - -impl Stream for KeepAliveTracker { - type Item = (PeerId, ConnectionId); - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); + /// Get the next ordered key element. + /// + /// The returned element is no longer part of the `last_activity_order` queue. + /// The method ensures that the oldest instant is at the front of the queue and + /// that the returned element corresponds to a connection that is still active. + fn get_next_ordered(&mut self) -> Option<(PeerId, ConnectionId)> { loop { - let Some(key) = this.last_activity_order.pop_front() else { - // No elements are tracked in the last activity order. Wait for connections. - this.waker = Some(cx.waker().clone()); - return Poll::Pending; - }; - let Some(when) = this.last_activity.get(&key) else { - // The key does not correspond to an active connection. + let key = self.last_activity_order.pop_front()?; + // Key corresponds to a connection that is no longer active. + let Some(when) = self.last_activity.get(&key) else { continue; }; @@ -221,13 +216,13 @@ impl Stream for KeepAliveTracker { // - (1). identical keys are discarded // - (2). 
the oldest instant is at the front of the queue (preserve ordering) // - (3). stale connections are ignored - if let Some(next_key) = this.last_activity_order.front() { + if let Some(next_key) = self.last_activity_order.front() { // (1). Handle multiple inserts for the same connection. if &key == next_key { continue; } - if let Some(next_when) = this.last_activity.get(next_key) { + if let Some(next_when) = self.last_activity.get(next_key) { // (2). If the next key is older, we'll discard the current key. // The current key corresponds to a more recent entry that will be polled later. if next_when < when { @@ -237,13 +232,32 @@ impl Stream for KeepAliveTracker { // (3). Preserve the current key if the next key corresponds to stale // connection. We can't ensure (1) or (2) at this point if the connection // is stale. We'll handle this in the next iteration. - this.last_activity_order.push_front(key); + self.last_activity_order.push_front(key); continue; } } + return Some(key); + } + } +} + +impl Stream for KeepAliveTracker { + type Item = (PeerId, ConnectionId); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + let Some(key) = this.get_next_ordered() else { + // No more keys to process. + this.waker = Some(cx.waker().clone()); + return Poll::Pending; + }; + + let when = this.last_activity.get(&key).expect("key exists; qed by get_next_ordered"); + // No keep-alive timeout reached yet. if when.elapsed() < this.keep_alive_timeout { - // No keep-alive timeout reached yet. this.last_activity_order.push_front(key); this.waker = Some(cx.waker().clone()); return Poll::Pending; @@ -256,7 +270,6 @@ impl Stream for KeepAliveTracker { connection_id = ?key.1, "keep-alive timeout triggered", ); - this.last_activity_order.pop_front(); this.last_activity.remove(&key); return Poll::Ready(Some(key)); From fa9ab265b67082e9618904b815e078a5e31e356a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Oct 2024 15:03:27 +0300 Subject: [PATCH 08/12] transport-service/tests: Keep track of connection ordering Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 76 +++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 8285c57b..43950fa1 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -1441,4 +1441,80 @@ mod tests { Err(SubstreamError::ConnectionClosed) ); } + + #[tokio::test] + async fn keep_alive_pop_elements() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); + + tracker.substream_activity(peer1, connection1); + tracker.substream_activity(peer2, connection2); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer2, connection2)); + + // No more elements. + assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); + } + + #[tokio::test] + async fn keep_alive_pop_multiple_elements_in_order() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); + + // T0. 
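+        // The T0..T3 markers denote the relative order of the `substream_activity`
+        // calls below, not absolute timestamps.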
+ tracker.substream_activity(peer1, connection1); + // T1 update. + tracker.substream_activity(peer1, connection1); + // Different peer on T2. + tracker.substream_activity(peer2, connection2); + // T3 update for peer1. + tracker.substream_activity(peer1, connection1); + + // This setup effectively tracks: + // [ (peer2, connection2) -> T2; (peer1, connection1) -> T3 ] + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer2, connection2)); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + // No more elements. + assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); + } + + #[tokio::test] + async fn keep_alive_pop_multiple_elements_in_order_last_element() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); + + // T0. + tracker.substream_activity(peer1, connection1); + // T1 update. + tracker.substream_activity(peer1, connection1); + // T2 update. + tracker.substream_activity(peer1, connection1); + + // This setup effectively tracks: + // [ (peer1, connection1) -> T3 ] + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + // No more elements. + assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); + } } From eedad302dd4953b847076bd48a2370e17be48e27 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Oct 2024 15:32:19 +0300 Subject: [PATCH 09/12] transport-service: Remove next stale connections Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 70 +++++++++++-------------------- 1 file changed, 24 insertions(+), 46 deletions(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 43950fa1..958ffcf4 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -229,9 +229,11 @@ impl KeepAliveTracker { continue; } } else { - // (3). Preserve the current key if the next key corresponds to stale - // connection. We can't ensure (1) or (2) at this point if the connection + // (3). Remove the next key if it corresponds to a stale connection + // and preserve the current key. + // We can't ensure (1) or (2) at this point if the next connection // is stale. We'll handle this in the next iteration. 
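+                    // Dropping the stale next key here keeps `last_activity_order`
+                    // from accumulating keys of closed connections.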
+ self.last_activity_order.pop_front(); self.last_activity_order.push_front(key); continue; } @@ -819,7 +821,11 @@ mod tests { } #[tokio::test] - async fn secondary_closing_doesnt_emit_event() { + async fn secondary_closing_does_not_emit_event() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + let (mut service, sender, _) = transport_service(); let peer = PeerId::random(); @@ -1024,10 +1030,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1058,18 +1061,12 @@ mod tests { panic!("expected event from `TransportService`"); } - // verify that the keep-alive timeout still exists for the peer but the peer itself - // doesn't exist anymore - // - // the peer is removed because there is no connection to them - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + // Because the connection was closed, the peer is no longer tracked for keep-alive. + // This leads to better tracking overall since we don't have to track stale connections. + assert!(service.keep_alive_tracker.last_activity.is_empty()); assert!(service.connections.get(&peer).is_none()); - // register new primary connection but verify that there are now two pending keep-alive - // timeouts + // Register new primary connection. let (cmd_tx1, _cmd_rx1) = channel(64); sender .send(InnerTransportEvent::ConnectionEstablished { @@ -1092,11 +1089,7 @@ mod tests { panic!("expected event from `TransportService`"); }; - // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 2 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1155,10 +1148,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1189,6 +1179,8 @@ mod tests { } None => panic!("expected {peer} to exist"), } + + assert_eq!(service.keep_alive_tracker.last_activity.len(), 0); } #[tokio::test] @@ -1224,10 +1216,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1247,20 +1236,15 @@ mod tests { // This ensures we reset the keep-alive timer when other protocols // want to open a substream. + // We are still tracking the same peer. service.open_substream(peer).unwrap(); - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); poll_service(&mut service).await; // The keep alive timeout should be advanced. 
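+        // `open_substream` above recorded fresh activity, so a 3 second sleep
+        // alone must not downgrade the connection.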
tokio::time::sleep(std::time::Duration::from_secs(3)).await; poll_service(&mut service).await; - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); // If the `service.open_substream` wasn't called, the connection would have been downgraded. // Instead the keep-alive was forwarded `KEEP_ALIVE_TIMEOUT` seconds into the future. // Verify the connection is still active. @@ -1280,10 +1264,7 @@ mod tests { tokio::time::sleep(KEEP_ALIVE_TIMEOUT).await; poll_service(&mut service).await; - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 0 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 0); // The connection had no substream activity for `KEEP_ALIVE_TIMEOUT` seconds. // Verify the connection is downgraded. @@ -1333,10 +1314,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( From 4aebc0da5d73b0f39684cec3a8209156c57a2f73 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Oct 2024 15:45:22 +0300 Subject: [PATCH 10/12] transport-service/tests: Ensure the connection is upgraded Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 193 +++++++++++++++++++++++++++++- 1 file changed, 192 insertions(+), 1 deletion(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 958ffcf4..02b03740 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -527,6 +527,7 @@ impl TransportService { ); self.keep_alive_tracker.substream_activity(peer, connection_id); + connection.try_open(); connection .open_substream( @@ -1420,6 +1421,197 @@ mod tests { ); } + #[tokio::test] + async fn substream_opening_upgrades_connection_and_resets_keep_alive() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut service, sender, _) = transport_service(); + let peer = PeerId::random(); + + // register first connection + let (cmd_tx1, mut cmd_rx1) = channel(64); + sender + .send(InnerTransportEvent::ConnectionEstablished { + peer, + connection: ConnectionId::from(1337usize), + endpoint: Endpoint::dialer(Multiaddr::empty(), ConnectionId::from(1337usize)), + sender: ConnectionHandle::new(ConnectionId::from(1337usize), cmd_tx1), + }) + .await + .unwrap(); + + if let Some(TransportEvent::ConnectionEstablished { + peer: connected_peer, + endpoint, + }) = service.next().await + { + assert_eq!(connected_peer, peer); + assert_eq!(endpoint.address(), &Multiaddr::empty()); + } else { + panic!("expected event from `TransportService`"); + }; + + // verify the first connection state is correct + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // Check the connection is still active. + assert!(context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + + // Open substreams to the peer. + let substream_id = service.open_substream(peer).unwrap(); + let second_substream_id = service.open_substream(peer).unwrap(); + + let mut permits = Vec::new(); + // First substream. 
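+        // Each `open_substream` call above is delivered to the connection as a
+        // `ProtocolCommand::OpenSubstream` carrying a substream permit.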
+        let protocol_command = cmd_rx1.recv().await.unwrap();
+        match protocol_command {
+            ProtocolCommand::OpenSubstream {
+                protocol,
+                substream_id: opened_substream_id,
+                permit,
+                ..
+            } => {
+                assert_eq!(protocol, ProtocolName::from("/notif/1"));
+                assert_eq!(substream_id, opened_substream_id);
+
+                // Save the substream permit for later.
+                permits.push(permit);
+            }
+            _ => panic!("expected `ProtocolCommand::OpenSubstream`"),
+        }
+
+        // Second substream.
+        let protocol_command = cmd_rx1.recv().await.unwrap();
+        match protocol_command {
+            ProtocolCommand::OpenSubstream {
+                protocol,
+                substream_id: opened_substream_id,
+                permit,
+                ..
+            } => {
+                assert_eq!(protocol, ProtocolName::from("/notif/1"));
+                assert_eq!(second_substream_id, opened_substream_id);
+
+                // Save the substream permit for later.
+                permits.push(permit);
+            }
+            _ => panic!("expected `ProtocolCommand::OpenSubstream`"),
+        }
+
+        // Sleep to trigger keep-alive timeout.
+        poll_service(&mut service).await;
+        tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await;
+        poll_service(&mut service).await;
+
+        // Verify the connection is downgraded.
+        match service.connections.get(&peer) {
+            Some(context) => {
+                assert_eq!(
+                    context.primary.connection_id(),
+                    &ConnectionId::from(1337usize)
+                );
+                // Check the connection is not active.
+                assert!(!context.primary.is_active());
+                assert!(context.secondary.is_none());
+            }
+            None => panic!("expected {peer} to exist"),
+        }
+        assert_eq!(service.keep_alive_tracker.last_activity.len(), 0);
+
+        // Open a new substream to the peer. This will succeed as long as we still have
+        // at least one substream permit.
+        let substream_id = service.open_substream(peer).unwrap();
+        let protocol_command = cmd_rx1.recv().await.unwrap();
+        match protocol_command {
+            ProtocolCommand::OpenSubstream {
+                protocol,
+                substream_id: opened_substream_id,
+                permit,
+                ..
+            } => {
+                assert_eq!(protocol, ProtocolName::from("/notif/1"));
+                assert_eq!(substream_id, opened_substream_id);
+
+                // Save the substream permit for later.
+                permits.push(permit);
+            }
+            _ => panic!("expected `ProtocolCommand::OpenSubstream`"),
+        }
+
+        poll_service(&mut service).await;
+
+        // Verify the connection is upgraded and keep-alive is tracked.
+        match service.connections.get(&peer) {
+            Some(context) => {
+                assert_eq!(
+                    context.primary.connection_id(),
+                    &ConnectionId::from(1337usize)
+                );
+                // Check the connection is active, because it was upgraded by the last substream.
+                assert!(context.primary.is_active());
+                assert!(context.secondary.is_none());
+            }
+            None => panic!("expected {peer} to exist"),
+        }
+        assert_eq!(service.keep_alive_tracker.last_activity.len(), 1);
+
+        // Drop all substreams.
+        drop(permits);
+
+        // The connection is still active, because it was upgraded by the last `open_substream` call.
+        match service.connections.get(&peer) {
+            Some(context) => {
+                assert_eq!(
+                    context.primary.connection_id(),
+                    &ConnectionId::from(1337usize)
+                );
+                // Check the connection is active, because it was upgraded by the last substream.
+                assert!(context.primary.is_active());
+                assert!(context.secondary.is_none());
+            }
+            None => panic!("expected {peer} to exist"),
+        }
+        assert_eq!(service.keep_alive_tracker.last_activity.len(), 1);
+
+        // Sleep to trigger keep-alive timeout.
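+        // With all permits dropped, the elapsed timeout downgrades the connection
+        // for good, so the final `open_substream` below must fail.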
+ poll_service(&mut service).await; + tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await; + poll_service(&mut service).await; + + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // No longer active because it was downgraded by keep-alive and no + // substream opens were made. + assert!(!context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + + // Cannot open a new substream because: + // 1. connection was downgraded by keep-alive timeout + // 2. all substreams were dropped. + assert_eq!( + service.open_substream(peer), + Err(SubstreamError::ConnectionClosed) + ); + } + #[tokio::test] async fn keep_alive_pop_elements() { let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); @@ -1476,7 +1668,6 @@ mod tests { let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); - let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); // T0. tracker.substream_activity(peer1, connection1); From ff1325296498b387af95f8f357e0a802618c427e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Oct 2024 15:53:59 +0300 Subject: [PATCH 11/12] transport-service/tests: Ensure next stale connections are removed Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 02b03740..f68def15 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -1686,4 +1686,26 @@ mod tests { assert!(tracker.get_next_ordered().is_none()); assert!(tracker.last_activity_order.is_empty()); } + + #[tokio::test] + async fn keep_alive_remove_next_stale_key() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); + + // [ (peer1, connection1) -> T0; (peer2, connection2) -> T1 ] + // However, the connection2 is closed. + tracker.substream_activity(peer1, connection1); + tracker.substream_activity(peer2, connection2); + + tracker.on_connection_closed(peer2, connection2); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + // No more elements. 
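+        // The stale `(peer2, connection2)` entry was removed from the order queue
+        // while `(peer1, connection1)` was returned, leaving the queue empty.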
+ assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); + } } From 7ff145ac7ca5ba3030c12c1bca1405f54a39a525 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Oct 2024 16:27:51 +0300 Subject: [PATCH 12/12] transport-service/tests: Ensure downgrading works after timeout Signed-off-by: Alexandru Vasile --- src/protocol/transport_service.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index f68def15..1dd8eb6d 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -367,7 +367,7 @@ impl TransportService { match self.connections.get_mut(&peer) { Some(context) => match context.secondary { Some(_) => { - tracing::error!( + tracing::debug!( target: LOG_TARGET, ?peer, ?connection_id, @@ -1334,11 +1334,9 @@ mod tests { let second_substream_id = service.open_substream(peer).unwrap(); // Simulate keep-alive timeout expiration. - service - .connections - .get_mut(&peer) - .unwrap() - .downgrade(&ConnectionId::from(1337usize)); + poll_service(&mut service).await; + tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await; + poll_service(&mut service).await; let mut permits = Vec::new(); @@ -1412,6 +1410,10 @@ mod tests { // Drop all substreams. drop(permits); + poll_service(&mut service).await; + tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await; + poll_service(&mut service).await; + // Cannot open a new substream because: // 1. connection was downgraded by keep-alive timeout // 2. all substreams were dropped.