diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs index 788f770c..035befaa 100644 --- a/src/protocol/connection.rs +++ b/src/protocol/connection.rs @@ -94,6 +94,15 @@ impl ConnectionHandle { } } + /// Try to upgrade the connection to active state. + pub fn try_open(&mut self) { + if let ConnectionType::Inactive(inactive) = &self.connection { + if let Some(active) = inactive.upgrade() { + self.connection = ConnectionType::Active(active); + } + } + } + /// Attempt to acquire permit which will keep the connection open for indefinite time. pub fn try_get_permit(&self) -> Option<Permit> { match &self.connection { diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index c2149283..1dd8eb6d 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -27,13 +27,13 @@ use crate::{ PeerId, DEFAULT_CHANNEL_SIZE, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use multiaddr::{Multiaddr, Protocol}; use multihash::Multihash; use tokio::sync::mpsc::{channel, Receiver, Sender}; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, fmt::Debug, pin::Pin, sync::{ @@ -96,6 +96,29 @@ impl ConnectionContext { "connection doesn't exist, cannot downgrade", ); } + + /// Try to upgrade the connection to active state. + fn try_upgrade(&mut self, connection_id: &ConnectionId) { + if self.primary.connection_id() == connection_id { + self.primary.try_open(); + return; + } + + if let Some(handle) = &mut self.secondary { + if handle.connection_id() == connection_id { + handle.try_open(); + return; + } + } + + tracing::debug!( + target: LOG_TARGET, + primary = ?self.primary.connection_id(), + secondary = ?self.secondary.as_ref().map(|handle| handle.connection_id()), + ?connection_id, + "connection doesn't exist, cannot upgrade", + ); + } } /// Tracks connection keep-alive timeouts. @@ -109,11 +132,33 @@ struct KeepAliveTracker { /// Close the connection if no substreams are open within this time frame. keep_alive_timeout: Duration, - /// Pending keep-alive timeouts. - pending_keep_alive_timeouts: FuturesUnordered<BoxFuture<'static, (PeerId, ConnectionId)>>, - /// Track substream last activity. - last_activity: HashMap<PeerId, HashMap<ConnectionId, Instant>>, + last_activity: HashMap<(PeerId, ConnectionId), Instant>, + /// Insertion order of the last activity. + /// + /// Since the instants added to the `last_activity` hashmap are always monotonically increasing, + /// we can use the insertion order to optimize the search for the next keep-alive timeout. This + /// results in fewer polls. + /// + /// # Notes + /// + /// We might have a situation where the connection is updated multiple times: + /// + /// - (peerA, id0): SubstreamA, instant T0 + /// - (peerA, id0): SubstreamC, instant T2 + /// + /// // In between these updates, peer B opens the following substream: + /// - (peerB, id1): SubstreamZ, instant T3 + /// + /// - (peerA, id0): SubstreamB, instant T4 + /// + /// For this example, the insertion order contains the following keys: + /// `[(peerA, id0), (peerA, id0), (peerB, id1), (peerA, id0)]`. + /// + /// To ensure a correct ordering, we discard elements whose recorded activity is newer than + /// that of the next element in the order. This keeps the entry with the oldest activity, the + /// one that expires first, at the front of the queue. + last_activity_order: VecDeque<(PeerId, ConnectionId)>, /// Saved waker.
waker: Option<Waker>, @@ -124,48 +169,78 @@ impl KeepAliveTracker { pub fn new(keep_alive_timeout: Duration) -> Self { Self { keep_alive_timeout, - pending_keep_alive_timeouts: FuturesUnordered::new(), last_activity: HashMap::new(), + last_activity_order: VecDeque::new(), waker: None, } } - fn inner_on_connection_established( - &mut self, - peer: PeerId, - connection_id: ConnectionId, - keep_alive_timeout: Duration, - ) { - self.pending_keep_alive_timeouts.push(Box::pin(async move { - tokio::time::sleep(keep_alive_timeout).await; - (peer, connection_id) - })); - - self.waker.take().map(|waker| waker.wake()); - } - /// Called on connection established event to add a new keep-alive timeout. pub fn on_connection_established(&mut self, peer: PeerId, connection_id: ConnectionId) { - self.inner_on_connection_established(peer, connection_id, self.keep_alive_timeout); + self.substream_activity(peer, connection_id); } /// Called on connection closed event. pub fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId) { - if let Entry::Occupied(mut entry) = self.last_activity.entry(peer) { - entry.get_mut().remove(&connection_id); - if entry.get().is_empty() { - entry.remove(); - } - } + // The connection is closed; the corresponding `last_activity_order` elements are popped + // lazily in the poll method. This leverages the `VecDeque` ordering + // instead of searching the queue and reallocating it here. + self.last_activity.remove(&(peer, connection_id)); } /// Called on substream opened event to track the last activity. pub fn substream_activity(&mut self, peer: PeerId, connection_id: ConnectionId) { // Keep track of the connection ID and the time the substream was opened. - self.last_activity - .entry(peer) - .or_default() - .insert(connection_id.clone(), Instant::now()); + self.last_activity.insert((peer, connection_id), Instant::now()); + // Keep track of the order of the last activity insertions. + self.last_activity_order.push_back((peer, connection_id)); + + // Wake any pending poll. + self.waker.take().map(|waker| waker.wake()); + } + + /// Get the next ordered key element. + /// + /// The returned element is no longer part of the `last_activity_order` queue. + /// The method ensures that the oldest instant is at the front of the queue and + /// that the returned element corresponds to a connection that is still active. + fn get_next_ordered(&mut self) -> Option<(PeerId, ConnectionId)> { + loop { + let key = self.last_activity_order.pop_front()?; + // Key corresponds to a connection that is no longer active. + let Some(when) = self.last_activity.get(&key) else { + continue; + }; + + // Compare with the next key to ensure: + // - (1). identical keys are discarded + // - (2). the oldest instant is at the front of the queue (preserve ordering) + // - (3). stale connections are ignored + if let Some(next_key) = self.last_activity_order.front() { + // (1). Handle multiple inserts for the same connection. + if &key == next_key { + continue; + } + + if let Some(next_when) = self.last_activity.get(next_key) { + // (2). If the next key is older, we'll discard the current key. + // The current key corresponds to a more recent entry that will be polled later. + if next_when < when { + continue; + } + } else { + // (3). Remove the next key if it corresponds to a stale connection + // and preserve the current key. + // We can't ensure (1) or (2) at this point if the next connection + // is stale. We'll handle this in the next iteration.
+ self.last_activity_order.pop_front(); + self.last_activity_order.push_front(key); + continue; + } + } + + return Some(key); + } } } @@ -175,63 +250,32 @@ impl Stream for KeepAliveTracker { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = self.get_mut(); - if this.pending_keep_alive_timeouts.is_empty() { - // Save current waker. - this.waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - if let Poll::Ready(event) = this.pending_keep_alive_timeouts.poll_next_unpin(cx) { - let Some((peer, connection_id)) = event else { - return Poll::Ready(None); + loop { + let Some(key) = this.get_next_ordered() else { + // No more keys to process. + this.waker = Some(cx.waker().clone()); + return Poll::Pending; }; - // Keep-alive timeout triggered for this connection. Double check if there is any - // activity since the timeout was started for this connection. - let next_keep_alive = this - .last_activity - .get(&peer) - .map(|activities| { - activities.iter().find_map(|(connection, when)| { - if connection_id == *connection && when.elapsed() < this.keep_alive_timeout - { - Some(( - peer, - connection_id, - this.keep_alive_timeout - when.elapsed(), - )) - } else { - None - } - }) - }) - .flatten(); - - if let Some((peer, connection_id, keep_alive_timeout)) = next_keep_alive { - tracing::trace!( - target: LOG_TARGET, - ?peer, - ?connection_id, - ?keep_alive_timeout, - "keep-alive timeout extended", - ); - this.inner_on_connection_established(peer, connection_id, keep_alive_timeout); + let when = this.last_activity.get(&key).expect("key exists; qed by get_next_ordered"); + // No keep-alive timeout reached yet. + if when.elapsed() < this.keep_alive_timeout { + this.last_activity_order.push_front(key); + this.waker = Some(cx.waker().clone()); + return Poll::Pending; } + // Keep-alive timeout reached.
tracing::trace!( target: LOG_TARGET, - ?peer, - ?connection_id, + peer = ?key.0, + connection_id = ?key.1, "keep-alive timeout triggered", ); + this.last_activity.remove(&key); - return Poll::Ready(Some((peer, connection_id))); + return Poll::Ready(Some(key)); } - - this.waker = Some(cx.waker().clone()); - return Poll::Pending; } } @@ -323,7 +367,7 @@ impl TransportService { match self.connections.get_mut(&peer) { Some(context) => match context.secondary { Some(_) => { - tracing::error!( + tracing::debug!( target: LOG_TARGET, ?peer, ?connection_id, @@ -483,6 +527,7 @@ impl TransportService { ); self.keep_alive_tracker.substream_activity(peer, connection_id); + connection.try_open(); connection .open_substream( @@ -560,6 +605,9 @@ impl Stream for TransportService { }) => { if protocol == self.protocol { self.keep_alive_tracker.substream_activity(peer, connection_id); + if let Some(context) = self.connections.get_mut(&peer) { + context.try_upgrade(&connection_id); + } } return Poll::Ready(Some(TransportEvent::SubstreamOpened { @@ -599,7 +647,7 @@ impl Stream for TransportService { mod tests { use super::*; use crate::{ - protocol::TransportService, + protocol::{ProtocolCommand, TransportService}, transport::{ manager::{handle::InnerTransportManagerCommand, TransportManagerHandle}, KEEP_ALIVE_TIMEOUT, @@ -774,7 +822,11 @@ mod tests { } #[tokio::test] - async fn secondary_closing_doesnt_emit_event() { + async fn secondary_closing_does_not_emit_event() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + let (mut service, sender, _) = transport_service(); let peer = PeerId::random(); @@ -979,10 +1031,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1013,18 +1062,12 @@ mod tests { panic!("expected event from `TransportService`"); } - // verify that the keep-alive timeout still exists for the peer but the peer itself - // doesn't exist anymore - // - // the peer is removed because there is no connection to them - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + // Because the connection was closed, the peer is no longer tracked for keep-alive. + // This leads to better tracking overall since we don't have to track stale connections. + assert!(service.keep_alive_tracker.last_activity.is_empty()); assert!(service.connections.get(&peer).is_none()); - // register new primary connection but verify that there are now two pending keep-alive - // timeouts + // Register new primary connection. 
let (cmd_tx1, _cmd_rx1) = channel(64); sender .send(InnerTransportEvent::ConnectionEstablished { @@ -1047,11 +1090,7 @@ mod tests { panic!("expected event from `TransportService`"); }; - // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 2 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1110,10 +1149,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1144,6 +1180,8 @@ mod tests { } None => panic!("expected {peer} to exist"), } + + assert_eq!(service.keep_alive_tracker.last_activity.len(), 0); } #[tokio::test] @@ -1179,10 +1217,7 @@ mod tests { }; // verify the first connection state is correct - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1202,20 +1237,15 @@ mod tests { // This ensures we reset the keep-alive timer when other protocols // want to open a substream. + // We are still tracking the same peer. service.open_substream(peer).unwrap(); - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); poll_service(&mut service).await; // The keep alive timeout should be advanced. tokio::time::sleep(std::time::Duration::from_secs(3)).await; poll_service(&mut service).await; - assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 1 - ); + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); // If the `service.open_substream` wasn't called, the connection would have been downgraded. // Instead the keep-alive was forwarded `KEEP_ALIVE_TIMEOUT` seconds into the future. // Verify the connection is still active. @@ -1235,12 +1265,257 @@ mod tests { tokio::time::sleep(KEEP_ALIVE_TIMEOUT).await; poll_service(&mut service).await; + assert_eq!(service.keep_alive_tracker.last_activity.len(), 0); + + // The connection had no substream activity for `KEEP_ALIVE_TIMEOUT` seconds. + // Verify the connection is downgraded. 
+ match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + assert!(!context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + } + + #[tokio::test] + async fn downgraded_connection_without_substreams_is_closed() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut service, sender, _) = transport_service(); + let peer = PeerId::random(); + + // register first connection + let (cmd_tx1, mut cmd_rx1) = channel(64); + sender + .send(InnerTransportEvent::ConnectionEstablished { + peer, + connection: ConnectionId::from(1337usize), + endpoint: Endpoint::dialer(Multiaddr::empty(), ConnectionId::from(1337usize)), + sender: ConnectionHandle::new(ConnectionId::from(1337usize), cmd_tx1), + }) + .await + .unwrap(); + + if let Some(TransportEvent::ConnectionEstablished { + peer: connected_peer, + endpoint, + }) = service.next().await + { + assert_eq!(connected_peer, peer); + assert_eq!(endpoint.address(), &Multiaddr::empty()); + } else { + panic!("expected event from `TransportService`"); + }; + + // verify the first connection state is correct + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // Check the connection is still active. + assert!(context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + + // Open substreams to the peer. + let substream_id = service.open_substream(peer).unwrap(); + let second_substream_id = service.open_substream(peer).unwrap(); + + // Simulate keep-alive timeout expiration. + poll_service(&mut service).await; + tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await; + poll_service(&mut service).await; + + let mut permits = Vec::new(); + + // First substream. + let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Second substream. + let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(second_substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Drop one permit. + let permit = permits.pop(); + // Individual transports like TCP will open a substream + // and then will generate a `SubstreamOpened` event via + // the protocol-set handler. + // + // The substream is used by individual protocols and then + // is closed. This simulates the substream being closed. + drop(permit); + + // Open a new substream to the peer. This will succeed as long as we still have + // one substream open. + let substream_id = service.open_substream(peer).unwrap(); + // Handle the substream. 
+ let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Drop all substreams. + drop(permits); + + poll_service(&mut service).await; + tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await; + poll_service(&mut service).await; + + // Cannot open a new substream because: + // 1. connection was downgraded by keep-alive timeout + // 2. all substreams were dropped. assert_eq!( - service.keep_alive_tracker.pending_keep_alive_timeouts.len(), - 0 + service.open_substream(peer), + Err(SubstreamError::ConnectionClosed) ); + } + + #[tokio::test] + async fn substream_opening_upgrades_connection_and_resets_keep_alive() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut service, sender, _) = transport_service(); + let peer = PeerId::random(); + + // register first connection + let (cmd_tx1, mut cmd_rx1) = channel(64); + sender + .send(InnerTransportEvent::ConnectionEstablished { + peer, + connection: ConnectionId::from(1337usize), + endpoint: Endpoint::dialer(Multiaddr::empty(), ConnectionId::from(1337usize)), + sender: ConnectionHandle::new(ConnectionId::from(1337usize), cmd_tx1), + }) + .await + .unwrap(); + + if let Some(TransportEvent::ConnectionEstablished { + peer: connected_peer, + endpoint, + }) = service.next().await + { + assert_eq!(connected_peer, peer); + assert_eq!(endpoint.address(), &Multiaddr::empty()); + } else { + panic!("expected event from `TransportService`"); + }; + + // verify the first connection state is correct + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // Check the connection is still active. + assert!(context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + + // Open substreams to the peer. + let substream_id = service.open_substream(peer).unwrap(); + let second_substream_id = service.open_substream(peer).unwrap(); + + let mut permits = Vec::new(); + // First substream. + let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Second substream. + let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(second_substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + // Sleep to trigger keep-alive timeout. 
+ poll_service(&mut service).await; + tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await; + poll_service(&mut service).await; - // The connection had no substream activity for `KEEP_ALIVE_TIMEOUT` seconds. // Verify the connection is downgraded. match service.connections.get(&peer) { Some(context) => { assert_eq!( @@ -1248,10 +1523,191 @@ mod tests { context.primary.connection_id(), &ConnectionId::from(1337usize) ); + // Check the connection is not active. assert!(!context.primary.is_active()); assert!(context.secondary.is_none()); } None => panic!("expected {peer} to exist"), } + assert_eq!(service.keep_alive_tracker.last_activity.len(), 0); + + // Open a new substream to the peer. This will succeed as long as we still have + // at least one substream permit. + let substream_id = service.open_substream(peer).unwrap(); + let protocol_command = cmd_rx1.recv().await.unwrap(); + match protocol_command { + ProtocolCommand::OpenSubstream { + protocol, + substream_id: opened_substream_id, + permit, + .. + } => { + assert_eq!(protocol, ProtocolName::from("/notif/1")); + assert_eq!(substream_id, opened_substream_id); + + // Save the substream permit for later. + permits.push(permit); + } + _ => panic!("expected `ProtocolCommand::OpenSubstream`"), + } + + poll_service(&mut service).await; + + // Verify the connection is upgraded and keep-alive is tracked. + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // Check the connection is active, because it was upgraded by the last substream. + assert!(context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); + + // Drop all substreams. + drop(permits); + + // The connection is still active, because it was upgraded by the last substream open. + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // Check the connection is active, because it was upgraded by the last substream. + assert!(context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + assert_eq!(service.keep_alive_tracker.last_activity.len(), 1); + + // Sleep to trigger keep-alive timeout. + poll_service(&mut service).await; + tokio::time::sleep(KEEP_ALIVE_TIMEOUT + std::time::Duration::from_secs(1)).await; + poll_service(&mut service).await; + + match service.connections.get(&peer) { + Some(context) => { + assert_eq!( + context.primary.connection_id(), + &ConnectionId::from(1337usize) + ); + // No longer active because it was downgraded by keep-alive and no + // substream opens were made. + assert!(!context.primary.is_active()); + assert!(context.secondary.is_none()); + } + None => panic!("expected {peer} to exist"), + } + + // Cannot open a new substream because: + // 1. connection was downgraded by keep-alive timeout + // 2. all substreams were dropped.
+ assert_eq!( + service.open_substream(peer), + Err(SubstreamError::ConnectionClosed) + ); + } + + #[tokio::test] + async fn keep_alive_pop_elements() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); + + tracker.substream_activity(peer1, connection1); + tracker.substream_activity(peer2, connection2); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer2, connection2)); + + // No more elements. + assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); + } + + #[tokio::test] + async fn keep_alive_pop_multiple_elements_in_order() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); + + // T0. + tracker.substream_activity(peer1, connection1); + // T1 update. + tracker.substream_activity(peer1, connection1); + // Different peer on T2. + tracker.substream_activity(peer2, connection2); + // T3 update for peer1. + tracker.substream_activity(peer1, connection1); + + // This setup effectively tracks: + // [ (peer2, connection2) -> T2; (peer1, connection1) -> T3 ] + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer2, connection2)); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + // No more elements. + assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); + } + + #[tokio::test] + async fn keep_alive_pop_multiple_elements_in_order_last_element() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + + // T0. + tracker.substream_activity(peer1, connection1); + // T1 update. + tracker.substream_activity(peer1, connection1); + // T2 update. + tracker.substream_activity(peer1, connection1); + + // This setup effectively tracks: + // [ (peer1, connection1) -> T2 ] + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + // No more elements. + assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); + } + + #[tokio::test] + async fn keep_alive_remove_next_stale_key() { + let mut tracker = KeepAliveTracker::new(Duration::from_secs(1)); + + let (peer1, connection1) = (PeerId::random(), ConnectionId::from(1usize)); + let (peer2, connection2) = (PeerId::random(), ConnectionId::from(2usize)); + + // [ (peer1, connection1) -> T0; (peer2, connection2) -> T1 ] + // However, connection2 is closed. + tracker.substream_activity(peer1, connection1); + tracker.substream_activity(peer2, connection2); + + tracker.on_connection_closed(peer2, connection2); + + let key = tracker.get_next_ordered().unwrap(); + assert_eq!(key, (peer1, connection1)); + + // No more elements. + assert!(tracker.get_next_ordered().is_none()); + assert!(tracker.last_activity_order.is_empty()); } }
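
To make the new bookkeeping easier to review, here is a minimal standalone sketch, outside the diff, of the invariant that `last_activity` plus `last_activity_order` maintain. All names in it (`ActivityQueue`, `Key`, `touch`, `remove`, `next_expiring`) are illustrative stand-ins rather than litep2p APIs: `touch` plays the role of `substream_activity`, `remove` of `on_connection_closed`, and `next_expiring` of `get_next_ordered`. The skip rules mirror the three commented cases in `get_next_ordered` above: drop superseded duplicates, defer a key whose queue neighbour has an older activity, and discard a stale neighbour before retrying.

```rust
// Standalone sketch (illustrative only, not part of the diff), using std types.
// `Key` stands in for `(PeerId, ConnectionId)`.
use std::collections::{HashMap, VecDeque};
use std::time::Instant;

type Key = u32;

#[derive(Default)]
struct ActivityQueue {
    last_activity: HashMap<Key, Instant>,
    order: VecDeque<Key>,
}

impl ActivityQueue {
    /// Record activity for `key`; instants are monotonically increasing
    /// because they are taken at insertion time.
    fn touch(&mut self, key: Key) {
        self.last_activity.insert(key, Instant::now());
        self.order.push_back(key);
    }

    /// Forget a key (e.g. connection closed); its queue entries are skipped
    /// lazily by `next_expiring`.
    fn remove(&mut self, key: Key) {
        self.last_activity.remove(&key);
    }

    /// Pop the key whose last activity is oldest, skipping duplicates and
    /// stale entries.
    fn next_expiring(&mut self) -> Option<(Key, Instant)> {
        loop {
            let key = self.order.pop_front()?;
            // Stale entry: the key was removed since it was queued.
            let Some(&when) = self.last_activity.get(&key) else { continue };

            if let Some(&next) = self.order.front() {
                // A later occurrence of the same key supersedes this one.
                if next == key {
                    continue;
                }
                match self.last_activity.get(&next) {
                    // The neighbour is older, so `key` has a newer duplicate
                    // further back in the queue; defer `key` until then.
                    Some(next_when) if *next_when < when => continue,
                    Some(_) => {}
                    // The neighbour is stale: drop it, put `key` back and retry.
                    None => {
                        self.order.pop_front();
                        self.order.push_front(key);
                        continue;
                    }
                }
            }

            return Some((key, when));
        }
    }
}

fn main() {
    let mut q = ActivityQueue::default();
    q.touch(1); // T0
    q.touch(1); // T1, supersedes T0
    q.touch(2); // T2
    q.touch(1); // T3, supersedes T1
    q.touch(3); // T4
    q.remove(3); // key 3 goes stale (e.g. connection closed)

    // Oldest surviving activity first: key 2 (T2), then key 1 (T3); key 3 is skipped.
    assert_eq!(q.next_expiring().map(|(k, _)| k), Some(2));
    assert_eq!(q.next_expiring().map(|(k, _)| k), Some(1));
    assert!(q.next_expiring().is_none());
}
```

Because instants only grow with push order, this per-key deduplication is enough to guarantee that whatever survives at the front of the queue is the entry that expires first, which is what `poll_next` relies on when it re-queues a not-yet-expired key and returns `Poll::Pending`.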