Skip to content

Commit

Permalink
CARL ClusterManager -> Do not attempt to re-deploy cluster when peers are blocked by that cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbfm committed Nov 14, 2024
1 parent 4562a59 commit 3d8adb2
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 67 deletions.
224 changes: 170 additions & 54 deletions opendut-carl/src/actions/clusters/determine_cluster_peer_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use crate::resources::storage::ResourcesStorageApi;
use opendut_types::cluster::ClusterId;
use opendut_types::peer::state::{PeerState, PeerUpState};
use opendut_types::peer::PeerId;
use std::collections::HashMap;
use std::ops::Not;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

#[derive(Clone)]
Expand All @@ -34,29 +33,55 @@ pub async fn determine_cluster_peer_states(params: DetermineClusterPeerStatesPar
.flatten_persistence_result()
.map_err(|source| DetermineClusterPeerStatesError::Persistence { cluster_id, source })?;

Ok(ClusterPeerStates { peer_states: cluster_peer_states })
Ok(ClusterPeerStates { cluster_id, peer_states: cluster_peer_states })
}

/// Peer states of a single cluster, as returned by `determine_cluster_peer_states`.
pub struct ClusterPeerStates {
pub(crate) struct ClusterPeerStates {
/// The cluster these peer states were determined for.
pub cluster_id: ClusterId,
/// Current state of each peer, keyed by peer ID.
pub peer_states: HashMap<PeerId, PeerState>,
}
impl ClusterPeerStates {
pub fn filter_unavailable_peers(&self) -> Vec<PeerId> {
self.peer_states

/// Classifies whether this cluster can be deployed, based on `peer_states`.
///
/// A peer counts as unavailable when it is `PeerState::Down` or `Up` with
/// `PeerUpState::Blocked`; every other state is treated as available.
///
/// Returns:
/// - [`ClusterDeployable::AllPeersAvailable`] when no peer is unavailable.
/// - [`ClusterDeployable::AlreadyDeployed`] when at least one peer is unavailable and
///   every unavailable peer is blocked by this very cluster (`self.cluster_id`).
/// - [`ClusterDeployable::NotAllPeersAvailable`] otherwise, i.e. when at least one peer is
///   down or blocked by a different cluster. The returned set contains all unavailable
///   peers, including any that are blocked by this cluster.
pub fn check_cluster_deployable(&self) -> ClusterDeployable {
// Stays `true` only while every unavailable peer seen so far is blocked by this cluster.
let mut only_blocked_by_self = true;

let unavailable_peers: HashSet<PeerId> = self.peer_states
.iter()
.filter_map(|(peer_id, peer_state)| {
let is_available = matches!(
peer_state,
PeerState::Up { inner: PeerUpState::Available, .. }
);

is_available.not().then_some(peer_id)
// A peer that is down rules out the "already deployed" verdict.
if let PeerState::Down = peer_state {
only_blocked_by_self = false;
Some(peer_id)
// A blocked peer is always unavailable; only a block by a different cluster clears the flag.
} else if let PeerState::Up { inner: PeerUpState::Blocked { by_cluster, .. }, .. } = peer_state {
if by_cluster != &self.cluster_id {
only_blocked_by_self = false;
}
Some(peer_id)
} else {
None
}
})
.cloned()
.collect()
.collect();

// The emptiness check must come first: with no unavailable peers the flag is still `true`.
if unavailable_peers.is_empty() {
ClusterDeployable::AllPeersAvailable
}
else if only_blocked_by_self {
ClusterDeployable::AlreadyDeployed
}
else {
ClusterDeployable::NotAllPeersAvailable { unavailable_peers }
}
}
}

/// Verdict returned by `ClusterPeerStates::check_cluster_deployable`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum ClusterDeployable {
/// No peer of the cluster is down or blocked.
AllPeersAvailable,
/// At least one peer is blocked, and every unavailable peer is blocked by this same cluster.
AlreadyDeployed,
/// At least one peer is down or blocked by a different cluster.
/// `unavailable_peers` holds every peer that is down or blocked (by any cluster).
NotAllPeersAvailable { unavailable_peers: HashSet<PeerId> },
}

#[derive(thiserror::Error, Debug)]
pub enum DetermineClusterPeerStatesError {
#[error("Determining the peer states for cluster <{cluster_id}> was not possible, because determining the cluster peers failed.")]
Expand All @@ -72,75 +97,166 @@ mod tests {
use crate::resources::manager::ResourcesManager;
use opendut_types::cluster::{ClusterConfiguration, ClusterName};
use opendut_types::peer::executor::ExecutorDescriptors;
use opendut_types::peer::state::PeerUpState;
use opendut_types::peer::state::{PeerBlockedState, PeerUpState};
use opendut_types::peer::{PeerDescriptor, PeerId, PeerName, PeerNetworkDescriptor};
use opendut_types::topology::DeviceName;
use opendut_types::topology::{DeviceDescriptor, DeviceId, Topology};
use opendut_types::util::net::{NetworkInterfaceConfiguration, NetworkInterfaceDescriptor, NetworkInterfaceId, NetworkInterfaceName};
use std::collections::HashSet;
use std::net::IpAddr;
use std::ops::Not;
use std::str::FromStr;

/// Moves the two cluster peers from `Down` to `Available` one at a time and checks the
/// deployability verdict after each step.
#[tokio::test]
async fn should_determine_the_cluster_state() -> anyhow::Result<()> {
let resources_manager = ResourcesManager::new_in_memory();

let peer_a = generate_peer_descriptor()?;
resources_manager.insert(peer_a.id, peer_a.clone()).await?;

let peer_b = generate_peer_descriptor()?;
resources_manager.insert(peer_b.id, peer_b.clone()).await?;

let peer_not_in_cluster = generate_peer_descriptor()?;
resources_manager.insert(peer_not_in_cluster.id, peer_not_in_cluster.clone()).await?;

let cluster = ClusterConfiguration {
id: ClusterId::random(),
name: ClusterName::try_from("cluster")?,
leader: peer_a.id,
devices: HashSet::from_iter(
peer_a.topology.devices.iter()
.chain(peer_b.topology.devices.iter())
.map(|device| device.id)
),
};
resources_manager.insert(cluster.id, cluster.clone()).await?;
async fn should_filter_down_peers() -> anyhow::Result<()> {
let Fixture { resources_manager, peer_a, peer_b, cluster, remote_host } = Fixture::create().await?;

let params = DetermineClusterPeerStatesParams { resources_manager: resources_manager.clone(), cluster_id: cluster.id };

// No peer state has been inserted yet; the test expects both peers to be reported as `Down`.
let result = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(result.peer_states, HashMap::from_iter([
let cluster_peer_states = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(cluster_peer_states.peer_states, HashMap::from_iter([
(peer_a.id, PeerState::Down),
(peer_b.id, PeerState::Down),
]));
assert!(result.filter_unavailable_peers().is_empty().not());
assert_eq!(
cluster_peer_states.check_cluster_deployable(),
ClusterDeployable::NotAllPeersAvailable { unavailable_peers: HashSet::from_iter(vec![peer_a.id, peer_b.id]) }
);


let peer_a_state = PeerState::Up { inner: PeerUpState::Available, remote_host: IpAddr::from_str("127.0.0.1")? };
resources_manager.insert(peer_a.id, peer_a_state.clone()).await?;
let available_state = PeerState::Up { inner: PeerUpState::Available, remote_host };

let result = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(result.peer_states, HashMap::from_iter([
(peer_a.id, peer_a_state.clone()),
// With peer_a available, only peer_b is expected to remain unavailable.
resources_manager.insert(peer_a.id, available_state.clone()).await?;

let cluster_peer_states = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(cluster_peer_states.peer_states, HashMap::from_iter([
(peer_a.id, available_state.clone()),
(peer_b.id, PeerState::Down),
]));
assert!(result.filter_unavailable_peers().is_empty().not());
assert_eq!(
cluster_peer_states.check_cluster_deployable(),
ClusterDeployable::NotAllPeersAvailable { unavailable_peers: HashSet::from_iter(vec![peer_b.id]) }
);

// With both peers available, the cluster is expected to be deployable.
resources_manager.insert(peer_b.id, available_state.clone()).await?;

let cluster_peer_states = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(cluster_peer_states.peer_states, HashMap::from_iter([
(peer_a.id, available_state.clone()),
(peer_b.id, available_state.clone()),
]));
assert_eq!(
cluster_peer_states.check_cluster_deployable(),
ClusterDeployable::AllPeersAvailable
);

Ok(())
}

/// Checks how peers blocked by another cluster, and peers blocked by this cluster itself,
/// affect the deployability verdict.
#[tokio::test]
async fn should_filter_blocked_peers() -> anyhow::Result<()> {
let Fixture { resources_manager, peer_a, peer_b, cluster, remote_host } = Fixture::create().await?;

let params = DetermineClusterPeerStatesParams { resources_manager: resources_manager.clone(), cluster_id: cluster.id };

let blocked_by_other_cluster_state = PeerState::Up { inner: PeerUpState::Blocked { by_cluster: ClusterId::random(), inner: PeerBlockedState::Member }, remote_host };
let blocked_by_own_cluster_state = PeerState::Up { inner: PeerUpState::Blocked { by_cluster: cluster.id, inner: PeerBlockedState::Member }, remote_host };
let available_state = PeerState::Up { inner: PeerUpState::Available, remote_host };

// Both peers are blocked by a different cluster, so both are expected to count as unavailable.
resources_manager.insert(peer_a.id, blocked_by_other_cluster_state.clone()).await?;
resources_manager.insert(peer_b.id, blocked_by_other_cluster_state.clone()).await?;

let cluster_peer_states = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(cluster_peer_states.peer_states, HashMap::from_iter([
(peer_a.id, blocked_by_other_cluster_state.clone()),
(peer_b.id, blocked_by_other_cluster_state.clone()),
]));
assert_eq!(
cluster_peer_states.check_cluster_deployable(),
ClusterDeployable::NotAllPeersAvailable { unavailable_peers: HashSet::from_iter(vec![peer_a.id, peer_b.id]) }
);

// Making peer_a available leaves only peer_b unavailable.
resources_manager.insert(peer_a.id, available_state.clone()).await?;

let cluster_peer_states = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(cluster_peer_states.peer_states, HashMap::from_iter([
(peer_a.id, available_state.clone()),
(peer_b.id, blocked_by_other_cluster_state.clone()),
]));
assert_eq!(
cluster_peer_states.check_cluster_deployable(),
ClusterDeployable::NotAllPeersAvailable { unavailable_peers: HashSet::from_iter(vec![peer_b.id]) }
);

// Both peers available: the cluster is expected to be deployable.
resources_manager.insert(peer_b.id, available_state.clone()).await?;

let cluster_peer_states = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(cluster_peer_states.peer_states, HashMap::from_iter([
(peer_a.id, available_state.clone()),
(peer_b.id, available_state.clone()),
]));
assert_eq!(
cluster_peer_states.check_cluster_deployable(),
ClusterDeployable::AllPeersAvailable
);


let peer_b_state = PeerState::Up { inner: PeerUpState::Available, remote_host: IpAddr::from_str("127.0.0.2")? };
resources_manager.insert(peer_b.id, peer_b_state.clone()).await?;
// peer_a is blocked by this very cluster while peer_b stays available:
// the expected verdict is `AlreadyDeployed`.
resources_manager.insert(peer_a.id, blocked_by_own_cluster_state.clone()).await?;

let result = actions::determine_cluster_peer_states(params).await?;
assert_eq!(result.peer_states, HashMap::from_iter([
(peer_a.id, peer_a_state),
(peer_b.id, peer_b_state),
let cluster_peer_states = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(cluster_peer_states.peer_states, HashMap::from_iter([
(peer_a.id, blocked_by_own_cluster_state.clone()),
(peer_b.id, available_state.clone()),
]));
assert!(result.filter_unavailable_peers().is_empty());
assert_eq!(
cluster_peer_states.check_cluster_deployable(),
ClusterDeployable::AlreadyDeployed
);

Ok(())
}

/// Shared test setup, built by `Fixture::create`.
struct Fixture {
/// In-memory resources manager into which the peers and the cluster were inserted.
resources_manager: ResourcesManagerRef,
/// First cluster member; also configured as the cluster leader.
peer_a: PeerDescriptor,
/// Second cluster member.
peer_b: PeerDescriptor,
/// Cluster whose devices are those of `peer_a` and `peer_b`.
cluster: ClusterConfiguration,
/// Address the tests use when constructing `PeerState::Up` values.
remote_host: IpAddr,
}
impl Fixture {
async fn create() -> anyhow::Result<Self> {
let resources_manager = ResourcesManager::new_in_memory();

let peer_a = generate_peer_descriptor()?;
resources_manager.insert(peer_a.id, peer_a.clone()).await?;

let peer_b = generate_peer_descriptor()?;
resources_manager.insert(peer_b.id, peer_b.clone()).await?;

let peer_not_in_cluster = generate_peer_descriptor()?;
resources_manager.insert(peer_not_in_cluster.id, peer_not_in_cluster.clone()).await?;

let cluster = ClusterConfiguration {
id: ClusterId::random(),
name: ClusterName::try_from("cluster")?,
leader: peer_a.id,
devices: HashSet::from_iter(
peer_a.topology.devices.iter()
.chain(peer_b.topology.devices.iter())
.map(|device| device.id)
),
};
resources_manager.insert(cluster.id, cluster.clone()).await?;

Ok(Self {
resources_manager,
peer_a,
peer_b,
cluster,
remote_host: IpAddr::from_str("127.0.0.1")?, //doesn't matter
})
}
}

fn generate_peer_descriptor() -> anyhow::Result<PeerDescriptor> {
let network_interface_id = NetworkInterfaceId::random();

Expand Down
33 changes: 20 additions & 13 deletions opendut-carl/src/cluster/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use opendut_types::util::net::{NetworkInterfaceDescriptor, NetworkInterfaceName}
use opendut_types::util::Port;

use crate::actions;
use crate::actions::{AssignClusterOptions, AssignClusterParams, DeleteClusterDeploymentParams, DetermineClusterPeerStatesParams, GetPeerStateParams, ListPeerDescriptorsParams, StoreClusterConfigurationParams};
use crate::actions::{AssignClusterOptions, AssignClusterParams, ClusterDeployable, DeleteClusterDeploymentParams, DetermineClusterPeerStatesParams, GetPeerStateParams, ListPeerDescriptorsParams, StoreClusterConfigurationParams};
use crate::peer::broker::PeerMessagingBrokerRef;
use crate::persistence::error::PersistenceResult;
use crate::resources::manager::{ResourcesManagerRef, SubscriptionEvent};
Expand Down Expand Up @@ -236,18 +236,25 @@ impl ClusterManager {
}).await
.map_err(|error| DeployClusterError::Internal { cluster_id, cause: format!("Failed to determine states of peers: {error}") })?;

let unavailable_peers = cluster_peer_states.filter_unavailable_peers();
if unavailable_peers.is_empty() {
debug!("All peers of cluster <{cluster_id}> are now available. Deploying...");
self.deploy_cluster(cluster_id).await?;
} else {
trace!(
"Not all peers of cluster <{cluster_id}> are available, so not deploying. Unavailable peers: {}",
unavailable_peers.iter()
.map(|peer_id| peer_id.to_string())
.collect::<Vec<_>>()
.join(", ")
);
let cluster_deployable = cluster_peer_states.check_cluster_deployable();

match cluster_deployable {
ClusterDeployable::AllPeersAvailable => {
debug!("All peers of cluster <{cluster_id}> are now available. Deploying...");
self.deploy_cluster(cluster_id).await?;
}
ClusterDeployable::AlreadyDeployed => {
debug!("Cluster <{cluster_id}> is already deployed. Not triggering new deployment.");
}
ClusterDeployable::NotAllPeersAvailable { unavailable_peers } => {
debug!(
"Not all peers of cluster <{cluster_id}> are available, so not deploying. Unavailable peers: {}",
unavailable_peers.iter()
.map(|peer_id| peer_id.to_string())
.collect::<Vec<_>>()
.join(", ")
);
}
}
Ok(())
}
Expand Down

0 comments on commit 3d8adb2

Please sign in to comment.