Skip to content

Commit

Permalink
CARL ClusterManager -> (Re-)Deploy cluster, if all peers are available.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbfm committed Oct 21, 2024
1 parent d2ac751 commit 9ab5a88
Show file tree
Hide file tree
Showing 16 changed files with 565 additions and 247 deletions.
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ task-local-extensions = "0.1.4"
tempfile = "3.10.1"
testcontainers-modules = "0.9.0"
test-with = { version = "0.13.0", default-features = false }
test-log = { version = "0.2.16", default-features = false, features = ["trace", "color"] }
time = "0.3.31"
thiserror = "1.0.56"
tokio = { version = "1.35.1", default-features = false }
Expand Down
29 changes: 23 additions & 6 deletions opendut-carl/src/actions/clusters/determine_cluster_peer_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::persistence::error::{FlattenPersistenceResult, PersistenceError};
use crate::resources::manager::ResourcesManagerRef;
use crate::resources::storage::ResourcesStorageApi;
use opendut_types::cluster::ClusterId;
use opendut_types::peer::state::PeerState;
use opendut_types::peer::state::{PeerState, PeerUpState};
use opendut_types::peer::PeerId;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -15,7 +15,7 @@ pub struct DetermineClusterPeerStatesParams {
pub cluster_id: ClusterId,
}

pub async fn determine_cluster_peer_states(params: DetermineClusterPeerStatesParams) -> Result<HashMap<PeerId, PeerState>, DetermineClusterPeerStatesError> {
pub async fn determine_cluster_peer_states(params: DetermineClusterPeerStatesParams) -> Result<ClusterPeerStates, DetermineClusterPeerStatesError> {
let DetermineClusterPeerStatesParams { resources_manager, cluster_id } = params;

let cluster_peers = actions::determine_cluster_peers(DetermineClusterPeersParams { cluster_id, resources_manager: Arc::clone(&resources_manager) }).await
Expand All @@ -33,7 +33,20 @@ pub async fn determine_cluster_peer_states(params: DetermineClusterPeerStatesPar
.flatten_persistence_result()
.map_err(|source| DetermineClusterPeerStatesError::Persistence { cluster_id, source })?;

Ok(cluster_peer_states)
Ok(ClusterPeerStates { peer_states: cluster_peer_states })
}

pub struct ClusterPeerStates {
pub peer_states: HashMap<PeerId, PeerState>,
}
impl ClusterPeerStates {
pub fn all_peers_available(&self) -> bool {
self.peer_states.values()
.all(|peer_state| matches!(
peer_state,
PeerState::Up { inner: PeerUpState::Available, .. }
))
}
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -58,6 +71,7 @@ mod tests {
use opendut_types::util::net::{NetworkInterfaceConfiguration, NetworkInterfaceDescriptor, NetworkInterfaceId, NetworkInterfaceName};
use std::collections::HashSet;
use std::net::IpAddr;
use std::ops::Not;
use std::str::FromStr;

#[tokio::test]
Expand Down Expand Up @@ -88,30 +102,33 @@ mod tests {
let params = DetermineClusterPeerStatesParams { resources_manager: resources_manager.clone(), cluster_id: cluster.id };

let result = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(result, HashMap::from_iter([
assert_eq!(result.peer_states, HashMap::from_iter([
(peer_a.id, PeerState::Down),
(peer_b.id, PeerState::Down),
]));
assert!(result.all_peers_available().not());


let peer_a_state = PeerState::Up { inner: PeerUpState::Available, remote_host: IpAddr::from_str("127.0.0.1")? };
resources_manager.insert(peer_a.id, peer_a_state.clone()).await?;

let result = actions::determine_cluster_peer_states(params.clone()).await?;
assert_eq!(result, HashMap::from_iter([
assert_eq!(result.peer_states, HashMap::from_iter([
(peer_a.id, peer_a_state.clone()),
(peer_b.id, PeerState::Down),
]));
assert!(result.all_peers_available().not());


let peer_b_state = PeerState::Up { inner: PeerUpState::Available, remote_host: IpAddr::from_str("127.0.0.2")? };
resources_manager.insert(peer_b.id, peer_b_state.clone()).await?;

let result = actions::determine_cluster_peer_states(params).await?;
assert_eq!(result, HashMap::from_iter([
assert_eq!(result.peer_states, HashMap::from_iter([
(peer_a.id, peer_a_state),
(peer_b.id, peer_b_state),
]));
assert!(result.all_peers_available());

Ok(())
}
Expand Down
Loading

0 comments on commit 9ab5a88

Please sign in to comment.