
Commit

Updated get_connection for SpecificNode to await on refreshing connection before redirecting to a random node

Signed-off-by: GilboaAWS <[email protected]>
GilboaAWS committed Jan 15, 2025
1 parent e87e0cb commit 029aafc
Showing 2 changed files with 201 additions and 82 deletions.
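
The fix leans on tokio's `Notify` primitive: a request whose route has no usable connection can park on a per-address notifier and is woken once the refresh task finishes, instead of being redirected immediately. A minimal, standalone sketch of that wait/wake handshake (toy example only; none of these names come from the crate):

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let waiter = Arc::clone(&notify);

    let request = tokio::spawn(async move {
        // A request that found no connection for its route parks here.
        waiter.notified().await;
        println!("woken: retrying the connection lookup for the route");
    });

    // Stand-in for the refresh task finishing its reconnect attempt.
    // The sleep only gives the waiter time to park in this toy example.
    tokio::time::sleep(Duration::from_millis(50)).await;
    notify.notify_waiters();

    request.await.unwrap();
}
```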
@@ -184,6 +184,7 @@ pub(crate) enum RefreshTaskStatus {
// The task is actively reconnecting. Includes a notifier for tasks to wait on.
Reconnecting(RefreshTaskNotifier),
// The task has exceeded the allowed reconnection time.
#[allow(dead_code)]
ReconnectingTooLong,
}

@@ -209,7 +210,8 @@ impl RefreshTaskStatus {
// using the embedded `RefreshTaskNotifier` and updates the status to `ReconnectingTooLong`.
//
// If the status is already `ReconnectingTooLong`, this method does nothing.
pub fn flip_state(&mut self) {
#[allow(dead_code)]
pub fn flip_status_to_too_long(&mut self) {
if let RefreshTaskStatus::Reconnecting(notifier) = self {
debug!(
"RefreshTaskStatus: Notifying tasks before transitioning to ReconnectingTooLong."
Expand All @@ -220,6 +222,15 @@ impl RefreshTaskStatus {
debug!("RefreshTaskStatus: Already in ReconnectingTooLong status.");
}
}

pub fn notify_waiting_requests(&mut self) {
if let RefreshTaskStatus::Reconnecting(notifier) = self {
debug!("RefreshTaskStatus::notify_waiting_requests notify");
notifier.notify();
} else {
debug!("RefreshTaskStatus::notify_waiting_requests - ReconnectingTooLong status.");
}
}
}

// Struct combining the task handle and its status
@@ -501,6 +512,51 @@ where
})
}

// Fetches the master address for a given route.
// Returns `None` if no master address can be resolved.
pub(crate) fn address_for_route(&self, route: &Route) -> Option<String> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
Some(slot_map_value.addrs.primary().clone().to_string())
}

// Retrieves the notifier for a reconnect task associated with a given route.
// Returns `Some(Arc<Notify>)` if a reconnect task is in the `Reconnecting` state.
// Returns `None` if:
// - There is no refresh task for the route's address.
// - The reconnect task is in `ReconnectingTooLong` state, with a debug log for clarity.
pub(crate) fn notifier_for_route(&self, route: &Route) -> Option<Arc<Notify>> {
let address = self.address_for_route(route)?;

if let Some(task_state) = self
.refresh_conn_state
.refresh_address_in_progress
.get(&address)
{
match &task_state.status {
RefreshTaskStatus::Reconnecting(notifier) => {
debug!(
"notifier_for_route: Found reconnect notifier for address: {}",
address
);
Some(notifier.get_notifier())
}
RefreshTaskStatus::ReconnectingTooLong => {
debug!(
"notifier_for_route: Address {} is in ReconnectingTooLong state. No notifier will be returned.",
address
);
None
}
}
} else {
debug!(
"notifier_for_route: No refresh task exists for address: {}. No notifier will be returned.",
address
);
None
}
}

pub(crate) fn all_node_connections(
&self,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
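
The `notifier_for_route` helper added above pairs a route's primary address with the refresh-task bookkeeping keyed by that address. A hypothetical, simplified sketch of that lookup shape, using a plain `HashMap` and made-up names rather than the crate's real containers:

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Notify;

// Illustrative stand-ins for the refresh-task state tracked per address.
enum RefreshStatus {
    Reconnecting(Arc<Notify>),
    ReconnectingTooLong,
}

struct RefreshRegistry {
    in_progress: HashMap<String, RefreshStatus>,
}

impl RefreshRegistry {
    // Mirrors the shape of `notifier_for_route`: a notifier is handed out
    // only while the refresh task for that address is still reconnecting.
    fn notifier_for_address(&self, address: &str) -> Option<Arc<Notify>> {
        match self.in_progress.get(address)? {
            RefreshStatus::Reconnecting(notify) => Some(Arc::clone(notify)),
            RefreshStatus::ReconnectingTooLong => None,
        }
    }
}

fn main() {
    let mut registry = RefreshRegistry {
        in_progress: HashMap::new(),
    };
    registry.in_progress.insert(
        "10.0.0.1:6379".to_string(),
        RefreshStatus::Reconnecting(Arc::new(Notify::new())),
    );
    registry.in_progress.insert(
        "10.0.0.2:6379".to_string(),
        RefreshStatus::ReconnectingTooLong,
    );

    assert!(registry.notifier_for_address("10.0.0.1:6379").is_some());
    assert!(registry.notifier_for_address("10.0.0.2:6379").is_none());
    assert!(registry.notifier_for_address("10.0.0.3:6379").is_none());
}
```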
225 changes: 144 additions & 81 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -1414,83 +1414,99 @@ where
"Refreshing connection task to {:?} started",
address_clone_for_task
);
let _ = async {
let node_option = if check_existing_conn {
let connections_container =
inner_clone.conn_lock.read().expect(MUTEX_READ_ERR);
connections_container
.connection_map()
.get(&address_clone_for_task)
.map(|node| node.value().clone())
} else {
None
};

let mut cluster_params = inner_clone
.cluster_params
.read()
.expect(MUTEX_READ_ERR)
.clone();
let subs_guard = inner_clone.subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions =
subs_guard.get(&address_clone_for_task).cloned();
drop(subs_guard);
let node_option = if check_existing_conn {
let connections_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR);
connections_container
.connection_map()
.get(&address_clone_for_task)
.map(|node| node.value().clone())
} else {
None
};

let node_result = get_or_create_conn(
&address_clone_for_task,
node_option,
&cluster_params,
conn_type,
inner_clone.glide_connection_options.clone(),
)
.await;
let mut cluster_params = inner_clone
.cluster_params
.read()
.expect(MUTEX_READ_ERR)
.clone();
let subs_guard = inner_clone.subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions =
subs_guard.get(&address_clone_for_task).cloned();
drop(subs_guard);

let node_result = get_or_create_conn(
&address_clone_for_task,
node_option,
&cluster_params,
conn_type,
inner_clone.glide_connection_options.clone(),
)
.await;

// Maintain the newly refreshed connection separately from the main connection map.
// This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation.
// This approach ensures that all requests within the current batch interact with a consistent connection map,
// preventing potential reordering issues.
//
// By delaying the integration of the refreshed connection:
//
// 1. We maintain consistency throughout the processing of a batch of requests.
// 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations.
// 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior.
//
// This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is
// updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information
// with the requirement for consistent request handling within each processing cycle.
match node_result {
Ok(node) => {
debug!(
"Succeeded to refresh connection for node {}.",
address_clone_for_task
);
inner_clone
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_done
.insert(address_clone_for_task, Some(node));
}
Err(err) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address_clone_for_task, err
);
// TODO - When we move to retry more than once, add this address to a new set of "running too long" addresses, and only then move
// the RefreshTaskState.status to RunningTooLong in the poll_flush context inside update_refreshed_connection.
inner_clone
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_done
.insert(address_clone_for_task, None);
}
// Maintain the newly refreshed connection separately from the main connection map.
// This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation.
// This approach ensures that all requests within the current batch interact with a consistent connection map,
// preventing potential reordering issues.
//
// By delaying the integration of the refreshed connection:
//
// 1. We maintain consistency throughout the processing of a batch of requests.
// 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations.
// 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior.
//
// This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is
// updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information
// with the requirement for consistent request handling within each processing cycle.
match node_result {
Ok(node) => {
debug!(
"Succeeded to refresh connection for node {}.",
address_clone_for_task
);
inner_clone
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_done
.insert(address_clone_for_task.clone(), Some(node));
}
Err(err) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address_clone_for_task, err
);
// TODO - When we move to retry more than once, add this address to a new set of "running too long" addresses, and only then move
// the RefreshTaskState.status to RunningTooLong in the poll_flush context inside update_refreshed_connection.
inner_clone
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_done
.insert(address_clone_for_task.clone(), None);
}
}
.await;

// Need to notify the awaiting requests here in order to wake the poll_flush context, as
// it awaits on this notifier inside get_connection, within poll_next inside poll_complete.
// Otherwise poll_flush won't be polled until the next start_send or other request I/O.
if let Some(task_state) = inner_clone
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_address_in_progress
.get_mut(&address_clone_for_task)
{
task_state.status.notify_waiting_requests();
} else {
warn!(
"No refresh task state found for address: {}",
address_clone_for_task
);
}

info!("Refreshing connection task to {:?} is done", address_clone);
});
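
The comment block above explains why the refresh task stages its result in `refresh_addresses_done` instead of touching the live connection map directly. A toy illustration of that two-phase idea (plain `HashMap`s and invented names; the real merge happens during `poll_flush`, and its failure handling may differ):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for whatever the live map stores per node.
type Conn = &'static str;

struct ConnState {
    live: HashMap<String, Conn>,
    // Staged refresh results: Some(conn) on success, None on failure.
    refreshed_done: HashMap<String, Option<Conn>>,
}

impl ConnState {
    // Applied once at the start of a flush cycle so every request in the
    // batch sees one consistent connection map.
    fn apply_refreshed(&mut self) {
        for (address, outcome) in self.refreshed_done.drain() {
            match outcome {
                Some(conn) => {
                    self.live.insert(address, conn);
                }
                // Assumption for the sketch: a failed refresh just drops the
                // stale entry; the real code also updates its task state.
                None => {
                    self.live.remove(&address);
                }
            }
        }
    }
}

fn main() {
    let mut state = ConnState {
        live: HashMap::from([("node-a:6379".to_string(), "old-conn")]),
        refreshed_done: HashMap::from([
            ("node-a:6379".to_string(), Some("fresh-conn")),
            ("node-b:6379".to_string(), None),
        ]),
    };
    state.apply_refreshed();
    assert_eq!(state.live.get("node-a:6379"), Some(&"fresh-conn"));
    assert!(!state.live.contains_key("node-b:6379"));
}
```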
@@ -2296,20 +2312,26 @@
)
}
InternalSingleNodeRouting::SpecificNode(route) => {
if let Some((conn, address)) = core
.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.connection_for_route(&route)
{
ConnectionCheck::Found((conn, address))
// Step 1: Attempt to get the connection directly using the route.
let conn_check = {
let conn_lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
conn_lock
.connection_for_route(&route)
.map(ConnectionCheck::Found)
};

if let Some(conn_check) = conn_check {
conn_check
} else {
// No connection is found for the given route:
// - For key-based commands, attempt redirection to a random node,
// hopefully to be redirected afterwards by a MOVED error.
// - For non-key-based commands, avoid attempting redirection to a random node
// as it wouldn't result in MOVED hints and can lead to unwanted results
// (e.g., sending management command to a different node than the user asked for); instead, raise the error.
let mut conn_check = ConnectionCheck::RandomConnection;

// Step 2: Handle cases where no connection is found for the route.
let routable_cmd = cmd.and_then(|cmd| Routable::command(&*cmd));
if routable_cmd.is_some()
&& !RoutingInfo::is_key_routing_command(&routable_cmd.unwrap())
@@ -2320,10 +2342,51 @@
format!("{route:?}"),
)
.into());
}

debug!(
"SpecificNode: No connection found for route `{route:?}`. Checking for reconnect tasks before redirecting to a random node."
);

// Step 3: Obtain the reconnect notifier, ensuring the lock is released immediately after.
let reconnect_notifier = {
let conn_lock = core.conn_lock.write().expect(MUTEX_READ_ERR);
conn_lock.notifier_for_route(&route).clone()
};

// Step 4: If a notifier exists, wait for it to signal completion.
if let Some(notifier) = reconnect_notifier {
debug!(
"SpecificNode: Waiting on reconnect notifier for route `{route:?}`."
);

// Drop the lock before awaiting
notifier.notified().await;

debug!(
"SpecificNode: Finished waiting on notifier for route `{route:?}`. Retrying connection lookup."
);

// Step 5: Retry the connection lookup after waiting for the reconnect task.
if let Some((conn, address)) = core
.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.connection_for_route(&route)
{
conn_check = ConnectionCheck::Found((conn, address));
} else {
debug!(
"SpecificNode: No connection found for route `{route:?}` after waiting on reconnect notifier. Proceeding to random node."
);
}
} else {
warn!("No connection found for route `{route:?}`. Attempting redirection to a random node.");
ConnectionCheck::RandomConnection
debug!(
"SpecificNode: No active reconnect task for route `{route:?}`. Proceeding to random node."
);
}

conn_check
}
}
InternalSingleNodeRouting::Random => ConnectionCheck::RandomConnection,
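
Condensed, the new `SpecificNode` flow is: try the route's connection, and if it is missing, wait on any in-flight refresh for that node and retry once before falling back to a random node. A hypothetical sketch of just that control flow (simplified types, not the crate's signatures):

```rust
use std::sync::Arc;
use tokio::sync::Notify;

// Simplified stand-in for the crate's connection-check outcome.
#[derive(Debug)]
enum ConnectionCheck {
    Found(String),
    RandomConnection,
}

async fn connection_for_specific_node(
    lookup: impl Fn() -> Option<String>,
    reconnect_notifier: Option<Arc<Notify>>,
) -> ConnectionCheck {
    // Fast path: the route already maps to a live connection.
    if let Some(conn) = lookup() {
        return ConnectionCheck::Found(conn);
    }
    // A refresh task is reconnecting this node: wait for it, then retry once.
    if let Some(notifier) = reconnect_notifier {
        notifier.notified().await;
        if let Some(conn) = lookup() {
            return ConnectionCheck::Found(conn);
        }
    }
    // No connection and no successful refresh: redirect to a random node.
    // (In the real code, non-key commands raise an error instead.)
    ConnectionCheck::RandomConnection
}

#[tokio::main]
async fn main() {
    // No connection and no refresh task in flight: fall back to a random node.
    let check = connection_for_specific_node(|| None, None).await;
    println!("{check:?}");
}
```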
