From 029aafc4bb7e3dd306c9008d387844cee656ccc4 Mon Sep 17 00:00:00 2001
From: GilboaAWS
Date: Wed, 15 Jan 2025 11:35:59 +0000
Subject: [PATCH] Updated get_connection for SpecificNode to await on refreshing connection before redirecting to a random node

Signed-off-by: GilboaAWS
---
 .../cluster_async/connections_container.rs   |  58 ++++-
 .../redis-rs/redis/src/cluster_async/mod.rs  | 225 +++++++++++-------
 2 files changed, 201 insertions(+), 82 deletions(-)

diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
index 5f588e6fb4..e9aa4e1201 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
@@ -184,6 +184,7 @@ pub(crate) enum RefreshTaskStatus {
     // The task is actively reconnecting. Includes a notifier for tasks to wait on.
     Reconnecting(RefreshTaskNotifier),
     // The task has exceeded the allowed reconnection time.
+    #[allow(dead_code)]
     ReconnectingTooLong,
 }
 
@@ -209,7 +210,8 @@ impl RefreshTaskStatus {
     // using the embedded `RefreshTaskNotifier` and updates the status to `ReconnectingTooLong`.
     //
     // If the status is already `ReconnectingTooLong`, this method does nothing.
-    pub fn flip_state(&mut self) {
+    #[allow(dead_code)]
+    pub fn flip_status_to_too_long(&mut self) {
         if let RefreshTaskStatus::Reconnecting(notifier) = self {
             debug!(
                 "RefreshTaskStatus: Notifying tasks before transitioning to ReconnectingTooLong."
@@ -220,6 +222,15 @@ impl RefreshTaskStatus {
             debug!("RefreshTaskStatus: Already in ReconnectingTooLong status.");
         }
     }
+
+    pub fn notify_waiting_requests(&mut self) {
+        if let RefreshTaskStatus::Reconnecting(notifier) = self {
+            debug!("RefreshTaskStatus::notify_waiting_requests notify");
+            notifier.notify();
+        } else {
+            debug!("RefreshTaskStatus::notify_waiting_requests - ReconnectingTooLong status.");
+        }
+    }
 }
 
 // Struct combining the task handle and its status
@@ -501,6 +512,51 @@ where
         })
     }
 
+    // Fetches the master address for a given route.
+    // Returns `None` if no master address can be resolved.
+    pub(crate) fn address_for_route(&self, route: &Route) -> Option<String> {
+        let slot_map_value = self.slot_map.slot_value_for_route(route)?;
+        Some(slot_map_value.addrs.primary().clone().to_string())
+    }
+
+    // Retrieves the notifier for a reconnect task associated with a given route.
+    // Returns `Some(Arc<Notify>)` if a reconnect task is in the `Reconnecting` state.
+    // Returns `None` if:
+    // - There is no refresh task for the route's address.
+    // - The reconnect task is in `ReconnectingTooLong` state, with a debug log for clarity.
+    pub(crate) fn notifier_for_route(&self, route: &Route) -> Option<Arc<Notify>> {
+        let address = self.address_for_route(route)?;
+
+        if let Some(task_state) = self
+            .refresh_conn_state
+            .refresh_address_in_progress
+            .get(&address)
+        {
+            match &task_state.status {
+                RefreshTaskStatus::Reconnecting(notifier) => {
+                    debug!(
+                        "notifier_for_route: Found reconnect notifier for address: {}",
+                        address
+                    );
+                    Some(notifier.get_notifier())
+                }
+                RefreshTaskStatus::ReconnectingTooLong => {
+                    debug!(
+                        "notifier_for_route: Address {} is in ReconnectingTooLong state. No notifier will be returned.",
+                        address
+                    );
+                    None
+                }
+            }
+        } else {
+            debug!(
+                "notifier_for_route: No refresh task exists for address: {}. No notifier will be returned.",
+                address
+            );
+            None
+        }
+    }
+
     pub(crate) fn all_node_connections(
         &self,
     ) -> impl Iterator> + '_ {
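Note on the helpers added above: a request that finds no connection for its route can take the `Arc<Notify>` returned by `notifier_for_route` and park on it until the refresh task calls `notify_waiting_requests`. The standalone sketch below illustrates that wait/notify handshake with plain tokio primitives; it is not the crate's actual code and assumes the notifier wraps `tokio::sync::Notify`, using `notify_one` for a single waiter.

    use std::sync::Arc;
    use tokio::sync::Notify;
    use tokio::time::{sleep, Duration};

    #[tokio::main]
    async fn main() {
        let notifier = Arc::new(Notify::new());

        // Refresh-task side: finish the (simulated) reconnect work, then wake the
        // parked request. `notify_one` stores a permit, so the waiter cannot miss it.
        let refresh_side = notifier.clone();
        tokio::spawn(async move {
            sleep(Duration::from_millis(50)).await;
            refresh_side.notify_one();
        });

        // Request side: no connection was found for the route, so park on the
        // notifier instead of redirecting to a random node right away.
        notifier.notified().await;
        println!("refresh signalled; retry connection_for_route before falling back");
    }

The real refresh task wakes every parked request at once; the permit-storing variant is used here only to keep the sketch race-free.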
diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
index 69683fda97..36334d497d 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -1414,83 +1414,99 @@ where
                 "Refreshing connection task to {:?} started",
                 address_clone_for_task
             );
-            let _ = async {
-                let node_option = if check_existing_conn {
-                    let connections_container =
-                        inner_clone.conn_lock.read().expect(MUTEX_READ_ERR);
-                    connections_container
-                        .connection_map()
-                        .get(&address_clone_for_task)
-                        .map(|node| node.value().clone())
-                } else {
-                    None
-                };
+            let node_option = if check_existing_conn {
+                let connections_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR);
+                connections_container
+                    .connection_map()
+                    .get(&address_clone_for_task)
+                    .map(|node| node.value().clone())
+            } else {
+                None
+            };
 
-                let mut cluster_params = inner_clone
-                    .cluster_params
-                    .read()
-                    .expect(MUTEX_READ_ERR)
-                    .clone();
-                let subs_guard = inner_clone.subscriptions_by_address.read().await;
-                cluster_params.pubsub_subscriptions =
-                    subs_guard.get(&address_clone_for_task).cloned();
-                drop(subs_guard);
+            let mut cluster_params = inner_clone
+                .cluster_params
+                .read()
+                .expect(MUTEX_READ_ERR)
+                .clone();
+            let subs_guard = inner_clone.subscriptions_by_address.read().await;
+            cluster_params.pubsub_subscriptions =
+                subs_guard.get(&address_clone_for_task).cloned();
+            drop(subs_guard);
 
-                let node_result = get_or_create_conn(
-                    &address_clone_for_task,
-                    node_option,
-                    &cluster_params,
-                    conn_type,
-                    inner_clone.glide_connection_options.clone(),
-                )
-                .await;
+            let node_result = get_or_create_conn(
+                &address_clone_for_task,
+                node_option,
+                &cluster_params,
+                conn_type,
+                inner_clone.glide_connection_options.clone(),
+            )
+            .await;
 
+            // Maintain the newly refreshed connection separately from the main connection map.
+            // This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation.
+            // This approach ensures that all requests within the current batch interact with a consistent connection map,
+            // preventing potential reordering issues.
+            //
+            // By delaying the integration of the refreshed connection:
+            //
+            // 1. We maintain consistency throughout the processing of a batch of requests.
+            // 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations.
+            // 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior.
+            //
+            // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is
+            // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information
+            // with the requirement for consistent request handling within each processing cycle.
+            match node_result {
+                Ok(node) => {
+                    debug!(
+                        "Succeeded to refresh connection for node {}.",
+                        address_clone_for_task
+                    );
+                    inner_clone
+                        .conn_lock
+                        .write()
+                        .expect(MUTEX_READ_ERR)
+                        .refresh_conn_state
+                        .refresh_addresses_done
+                        .insert(address_clone_for_task.clone(), Some(node));
+                }
+                Err(err) => {
+                    warn!(
+                        "Failed to refresh connection for node {}. Error: `{:?}`",
+                        address_clone_for_task, err
+                    );
+                    // TODO - When we move to retry more than once, add this address to a new set of addresses running too long, and then only move
+                    // the RefreshTaskState.status to ReconnectingTooLong in the poll_flush context inside update_refreshed_connection.
+                    inner_clone
+                        .conn_lock
+                        .write()
+                        .expect(MUTEX_READ_ERR)
+                        .refresh_conn_state
+                        .refresh_addresses_done
+                        .insert(address_clone_for_task.clone(), None);
+                }
             }
-            .await;
+
+            // Notify the awaiting requests here in order to wake up the poll_flush context, which awaits
+            // on this notifier inside get_connection (called from poll_next within poll_complete).
+            // Otherwise poll_flush won't be polled again until the next start_send or other request I/O.
+            if let Some(task_state) = inner_clone
+                .conn_lock
+                .write()
+                .expect(MUTEX_READ_ERR)
+                .refresh_conn_state
+                .refresh_address_in_progress
+                .get_mut(&address_clone_for_task)
+            {
+                task_state.status.notify_waiting_requests();
+            } else {
+                warn!(
+                    "No refresh task state found for address: {}",
+                    address_clone_for_task
+                );
+            }
 
             info!("Refreshing connection task to {:?} is done", address_clone);
         });
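Note on the hunk above: refresh results are parked in `refresh_addresses_done` and only folded into the live connection map at the start of `poll_flush`, so every request in a batch sees one consistent topology. The standalone sketch below shows that stage-then-apply idea in isolation; `StagedRefreshes`, `ConnectionMap`, and `apply_staged` are illustrative names rather than the crate's API, and connections are reduced to plain ids.

    use std::collections::HashMap;

    #[derive(Default)]
    struct StagedRefreshes {
        // address -> Some(new connection id) on success, None on failure
        done: HashMap<String, Option<u64>>,
    }

    #[derive(Default)]
    struct ConnectionMap {
        live: HashMap<String, u64>,
    }

    impl ConnectionMap {
        // Called once at the start of a flush cycle, so every request in the
        // batch observes the same connection map.
        fn apply_staged(&mut self, staged: &mut StagedRefreshes) {
            for (address, refreshed) in staged.done.drain() {
                match refreshed {
                    Some(conn) => {
                        self.live.insert(address, conn);
                    }
                    None => {
                        self.live.remove(&address);
                    }
                }
            }
        }
    }

    fn main() {
        let mut staged = StagedRefreshes::default();
        staged.done.insert("node-a:6379".into(), Some(42)); // refresh succeeded
        staged.done.insert("node-b:6379".into(), None); // refresh failed

        let mut map = ConnectionMap::default();
        map.live.insert("node-b:6379".into(), 7);
        map.apply_staged(&mut staged);

        assert_eq!(map.live.get("node-a:6379"), Some(&42));
        assert!(!map.live.contains_key("node-b:6379"));
        println!("staged refreshes applied at the flush synchronization point");
    }

Applying the staged entries at a single point is what keeps a batch of in-flight requests from observing a half-updated connection map.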
@@ -2296,13 +2312,16 @@ where
             )
         }
         InternalSingleNodeRouting::SpecificNode(route) => {
-            if let Some((conn, address)) = core
-                .conn_lock
-                .read()
-                .expect(MUTEX_READ_ERR)
-                .connection_for_route(&route)
-            {
-                ConnectionCheck::Found((conn, address))
+            // Step 1: Attempt to get the connection directly using the route.
+            let conn_check = {
+                let conn_lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
+                conn_lock
+                    .connection_for_route(&route)
+                    .map(ConnectionCheck::Found)
+            };
+
+            if let Some(conn_check) = conn_check {
+                conn_check
             } else {
                 // No connection is found for the given route:
                 // - For key-based commands, attempt redirection to a random node,
                 //   hopefully to be redirected afterwards by a MOVED error.
                 // - For non-key-based commands, avoid attempting redirection to a random node
                 //   as it wouldn't result in MOVED hints and can lead to unwanted results
                 //   (e.g., sending management command to a different node than the user asked for); instead, raise the error.
+                let mut conn_check = ConnectionCheck::RandomConnection;
+
+                // Step 2: Handle cases where no connection is found for the route.
                 let routable_cmd = cmd.and_then(|cmd| Routable::command(&*cmd));
                 if routable_cmd.is_some()
                     && !RoutingInfo::is_key_routing_command(&routable_cmd.unwrap())
                 {
                     return Err((
                         ErrorKind::ConnectionNotFoundForRoute,
                         "Requested connection not found for route",
                         format!("{route:?}"),
                     )
                     .into());
+                }
+
+                debug!(
+                    "SpecificNode: No connection found for route `{route:?}`. Checking for reconnect tasks before redirecting to a random node."
+                );
+
+                // Step 3: Obtain the reconnect notifier, ensuring the lock is released immediately after.
+                let reconnect_notifier = {
+                    let conn_lock = core.conn_lock.write().expect(MUTEX_READ_ERR);
+                    conn_lock.notifier_for_route(&route).clone()
+                };
+
+                // Step 4: If a notifier exists, wait for it to signal completion.
+                if let Some(notifier) = reconnect_notifier {
+                    debug!(
+                        "SpecificNode: Waiting on reconnect notifier for route `{route:?}`."
+                    );
+
+                    // The lock was already released at the end of Step 3, so it is safe to await here.
+                    notifier.notified().await;
+
+                    debug!(
+                        "SpecificNode: Finished waiting on notifier for route `{route:?}`. Retrying connection lookup."
+                    );
+
+                    // Step 5: Retry the connection lookup after waiting for the reconnect task.
+                    if let Some((conn, address)) = core
+                        .conn_lock
+                        .read()
+                        .expect(MUTEX_READ_ERR)
+                        .connection_for_route(&route)
+                    {
+                        conn_check = ConnectionCheck::Found((conn, address));
+                    } else {
+                        debug!(
+                            "SpecificNode: No connection found for route `{route:?}` after waiting on reconnect notifier. Proceeding to random node."
+                        );
+                    }
                 } else {
-                    warn!("No connection found for route `{route:?}`. Attempting redirection to a random node.");
-                    ConnectionCheck::RandomConnection
+                    debug!(
+                        "SpecificNode: No active reconnect task for route `{route:?}`. Proceeding to random node."
+                    );
                 }
+
+                conn_check
             }
         }
         InternalSingleNodeRouting::Random => ConnectionCheck::RandomConnection,
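Note on the SpecificNode change above: the lookup now tries the route, then waits for an in-flight reconnect (when one exists) and retries once before falling back to a random node. The standalone sketch below condenses that control flow; the enum and the `lookup` closure are stand-ins for the crate's real connection container, not its actual API.

    use std::sync::Arc;
    use tokio::sync::Notify;

    #[derive(Debug)]
    enum ConnectionCheck {
        Found(String),    // stand-in for (address, connection)
        RandomConnection, // fall back to a random node
    }

    async fn get_connection_for_route(
        lookup: impl Fn() -> Option<String>,
        reconnect_notifier: Option<Arc<Notify>>,
    ) -> ConnectionCheck {
        // Step 1: direct lookup for the requested route.
        if let Some(addr) = lookup() {
            return ConnectionCheck::Found(addr);
        }
        // Steps 3-5: if a refresh task is reconnecting this route's node, wait for
        // it to signal completion, then retry the lookup once.
        if let Some(notifier) = reconnect_notifier {
            notifier.notified().await;
            if let Some(addr) = lookup() {
                return ConnectionCheck::Found(addr);
            }
        }
        // Fallback: still no connection, so redirect to a random node.
        ConnectionCheck::RandomConnection
    }

    #[tokio::main]
    async fn main() {
        let notifier = Arc::new(Notify::new());
        notifier.notify_one(); // pretend the refresh already finished

        let result = get_connection_for_route(|| None, Some(notifier)).await;
        println!("{result:?}"); // RandomConnection: the lookup still failed after the wait
    }

The key behavioural difference from the previous code is the single bounded wait on the reconnect notifier before the random-node fallback, which avoids redirecting requests away from a node that is about to come back.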