Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[redis-rs][core] Move connection refresh to the background #2915

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use telemetrylib::Telemetry;

use tracing::debug;

use tokio::sync::Notify;
use tokio::task::JoinHandle;

/// Count the number of connections in a connections_map object
macro_rules! count_connections {
($conn_map:expr) => {{
Expand Down Expand Up @@ -134,11 +140,213 @@ impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
}
}

#[derive(Clone, Debug)]
pub(crate) struct RefreshTaskNotifier {
notify: Arc<Notify>,
}

impl RefreshTaskNotifier {
fn new() -> Self {
RefreshTaskNotifier {
notify: Arc::new(Notify::new()),
}
}

pub fn get_notifier(&self) -> Arc<Notify> {
self.notify.clone()
}

pub fn notify(&self) {
self.notify.notify_waiters();
}
}

// Enum representing the task status during a connection refresh.
//
// - **Reconnecting**:
// Indicates that a refresh task is in progress. This status includes a dedicated
// notifier (`RefreshTaskNotifier`) so that other tasks can wait for the connection
// to be refreshed before proceeding.
//
// - **ReconnectingTooLong**:
// Represents a situation where a refresh task has taken too long to complete.
// The status transitions from `Reconnecting` to `ReconnectingTooLong` under specific
// conditions (e.g., after one attempt of reconnecting inside the task or after a timeout).
//
// The transition from `Reconnecting` to `ReconnectingTooLong` is managed exclusively
// within the `update_refreshed_connection` function in `poll_flush`. This ensures that
// all requests maintain a consistent view of the connections.
//
// When transitioning from `Reconnecting` to `ReconnectingTooLong`, the associated
// notifier is triggered to unblock all awaiting tasks.
#[derive(Clone, Debug)]
pub(crate) enum RefreshTaskStatus {
// The task is actively reconnecting. Includes a notifier for tasks to wait on.
Reconnecting(RefreshTaskNotifier),
// The task has exceeded the allowed reconnection time.
#[allow(dead_code)]
ReconnectingTooLong,
}

impl Drop for RefreshTaskStatus {
fn drop(&mut self) {
if let RefreshTaskStatus::Reconnecting(notifier) = self {
debug!("RefreshTaskStatus: Dropped while in Reconnecting status. Notifying tasks.");
notifier.notify();
}
}
}

impl RefreshTaskStatus {
/// Creates a new `RefreshTaskStatus` in the `Reconnecting` status with a fresh `RefreshTaskNotifier`.
pub fn new() -> Self {
debug!("RefreshTaskStatus: Initialized in Reconnecting status with a new notifier.");
RefreshTaskStatus::Reconnecting(RefreshTaskNotifier::new())
}

// Transitions the current status from `Reconnecting` to `ReconnectingTooLong` in place.
//
// If the current status is `Reconnecting`, this method notifies all waiting tasks
// using the embedded `RefreshTaskNotifier` and updates the status to `ReconnectingTooLong`.
//
// If the status is already `ReconnectingTooLong`, this method does nothing.
#[allow(dead_code)]
pub fn flip_status_to_too_long(&mut self) {
if let RefreshTaskStatus::Reconnecting(notifier) = self {
debug!(
"RefreshTaskStatus: Notifying tasks before transitioning to ReconnectingTooLong."
);
notifier.notify();
*self = RefreshTaskStatus::ReconnectingTooLong;
} else {
debug!("RefreshTaskStatus: Already in ReconnectingTooLong status.");
}
}

pub fn notify_waiting_requests(&mut self) {
if let RefreshTaskStatus::Reconnecting(notifier) = self {
debug!("RefreshTaskStatus::notify_waiting_requests notify");
notifier.notify();
} else {
debug!("RefreshTaskStatus::notify_waiting_requests - ReconnectingTooLong status.");
}
}
}

// Struct combining the task handle and its status
#[derive(Debug)]
pub(crate) struct RefreshTaskState {
pub handle: JoinHandle<()>,
pub status: RefreshTaskStatus,
}

impl RefreshTaskState {
// Creates a new `RefreshTaskState` with a `Reconnecting` state and a new notifier.
pub fn new(handle: JoinHandle<()>) -> Self {
debug!("RefreshTaskState: Creating a new instance with a Reconnecting state.");
RefreshTaskState {
handle,
status: RefreshTaskStatus::new(),
}
}
}

impl Drop for RefreshTaskState {
fn drop(&mut self) {
if let RefreshTaskStatus::Reconnecting(ref notifier) = self.status {
debug!("RefreshTaskState: Dropped while in Reconnecting status. Notifying tasks.");
notifier.notify();
} else {
debug!("RefreshTaskState: Dropped while in ReconnectingTooLong status.");
}

// Abort the task handle if it's not yet finished
if !self.handle.is_finished() {
debug!("RefreshTaskState: Aborting unfinished task.");
self.handle.abort();
} else {
debug!("RefreshTaskState: Task already finished, no abort necessary.");
}
}
}

// This struct is used to track the status of each address refresh
pub(crate) struct RefreshConnectionStates<Connection> {
// Holds all the failed addresses that started a refresh task.
pub(crate) refresh_addresses_started: HashSet<String>,
// Follow the refresh ops on the connections
pub(crate) refresh_address_in_progress: HashMap<String, RefreshTaskState>,
// Holds all the refreshed addresses that are ready to be inserted into the connection_map
pub(crate) refresh_addresses_done: HashMap<String, Option<ClusterNode<Connection>>>,
}

impl<Connection> RefreshConnectionStates<Connection> {
// Clears all ongoing refresh connection tasks and resets associated state tracking.
//
// - This method removes all entries in the `refresh_address_in_progress` map.
// - The `Drop` trait is responsible for notifying the associated notifiers and aborting any unfinished refresh tasks.
// - Additionally, this method clears `refresh_addresses_started` and `refresh_addresses_done`
// to ensure no stale data remains in the refresh state tracking.
pub(crate) fn clear_refresh_state(&mut self) {
debug!(
"clear_refresh_state: removing all in-progress refresh connection tasks for addresses: {:?}",
self.refresh_address_in_progress.keys().collect::<Vec<_>>()
);

// Clear the entire map; Drop handles the cleanup
self.refresh_address_in_progress.clear();

// Clear other state tracking
self.refresh_addresses_started.clear();
self.refresh_addresses_done.clear();
}

// Collects the notifiers for the given addresses and returns them as a vector.
//
// This function retrieves the notifiers for the provided addresses from the `refresh_address_in_progress`
// map and returns them, so they can be awaited outside of the lock.
//
// # Arguments
// * `addresses` - A list of addresses for which notifiers are required.
//
// # Returns
// A vector of `futures::future::Notified` that can be awaited.
pub(crate) fn collect_refresh_notifiers(
&self,
addresses: &HashSet<String>,
) -> Vec<Arc<Notify>> {
addresses
.iter()
.filter_map(|address| {
self.refresh_address_in_progress
.get(address)
.and_then(|refresh_state| match &refresh_state.status {
RefreshTaskStatus::Reconnecting(notifier) => {
Some(notifier.get_notifier().clone())
}
_ => None,
})
})
.collect()
}
}

impl<Connection> Default for RefreshConnectionStates<Connection> {
fn default() -> Self {
Self {
refresh_addresses_started: HashSet::new(),
refresh_address_in_progress: HashMap::new(),
refresh_addresses_done: HashMap::new(),
}
}
}

pub(crate) struct ConnectionsContainer<Connection> {
connection_map: DashMap<String, ClusterNode<Connection>>,
pub(crate) slot_map: SlotMap,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,
pub(crate) refresh_conn_state: RefreshConnectionStates<Connection>,
}

impl<Connection> Drop for ConnectionsContainer<Connection> {
Expand All @@ -155,6 +363,7 @@ impl<Connection> Default for ConnectionsContainer<Connection> {
slot_map: Default::default(),
read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary,
topology_hash: 0,
refresh_conn_state: Default::default(),
}
}
}
Expand Down Expand Up @@ -182,6 +391,7 @@ where
slot_map,
read_from_replica_strategy,
topology_hash,
refresh_conn_state: Default::default(),
}
}

Expand Down Expand Up @@ -337,6 +547,51 @@ where
})
}

// Fetches the master address for a given route.
// Returns `None` if no master address can be resolved.
pub(crate) fn address_for_route(&self, route: &Route) -> Option<String> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
Some(slot_map_value.addrs.primary().clone().to_string())
}

// Retrieves the notifier for a reconnect task associated with a given route.
// Returns `Some(Arc<Notify>)` if a reconnect task is in the `Reconnecting` state.
// Returns `None` if:
// - There is no refresh task for the route's address.
// - The reconnect task is in `ReconnectingTooLong` state, with a debug log for clarity.
pub(crate) fn notifier_for_route(&self, route: &Route) -> Option<Arc<Notify>> {
let address = self.address_for_route(route)?;

if let Some(task_state) = self
.refresh_conn_state
.refresh_address_in_progress
.get(&address)
{
match &task_state.status {
RefreshTaskStatus::Reconnecting(notifier) => {
debug!(
"notifier_for_route: Found reconnect notifier for address: {}",
address
);
Some(notifier.get_notifier())
}
RefreshTaskStatus::ReconnectingTooLong => {
debug!(
"notifier_for_route: Address {} is in ReconnectingTooLong state. No notifier will be returned.",
address
);
None
}
}
} else {
debug!(
"notifier_for_route: No refresh task exists for address: {}. No notifier will be returned.",
address
);
None
}
}

pub(crate) fn all_node_connections(
&self,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
Expand Down Expand Up @@ -572,6 +827,7 @@ mod tests {
connection_map,
read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
topology_hash: 0,
refresh_conn_state: Default::default(),
}
}

Expand Down Expand Up @@ -628,6 +884,7 @@ mod tests {
connection_map,
read_from_replica_strategy: strategy,
topology_hash: 0,
refresh_conn_state: Default::default(),
}
}

Expand Down
Loading
Loading