Skip to content

Commit

Permalink
CARL actions::store_peer_descriptor -> Make use of transactions.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbfm committed Sep 4, 2024
1 parent d5b6b9b commit d88d05c
Showing 1 changed file with 31 additions and 36 deletions.
67 changes: 31 additions & 36 deletions opendut-carl/src/actions/peers/store_peer_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::persistence::error::{FlattenPersistenceResult, PersistenceError};
use crate::persistence::error::PersistenceError;
use crate::resources::manager::ResourcesManagerRef;
use crate::resources::storage::ResourcesStorageApi;
use crate::vpn::Vpn;
use opendut_carl_api::carl::peer::StorePeerDescriptorError;
use opendut_types::peer;
Expand All @@ -8,7 +9,6 @@ use opendut_types::peer::ethernet::EthernetBridge;
use opendut_types::peer::{PeerDescriptor, PeerId};
use opendut_types::util::net::NetworkInterfaceName;
use tracing::{debug, error, info, warn};
use crate::resources::storage::ResourcesStorageApi;

pub struct StorePeerDescriptorParams {
pub resources_manager: ResourcesManagerRef,
Expand All @@ -32,14 +32,17 @@ pub async fn store_peer_descriptor(params: StorePeerDescriptorParams) -> Result<
let peer_descriptor = params.peer_descriptor;
let resources_manager = params.resources_manager;

let is_new_peer = resources_manager.resources_mut(|resources| {
let old_peer_descriptor = resources.get::<PeerDescriptor>(peer_id)?;
resources_manager.resources_mut(|resources| {
let old_peer_descriptor = resources.get::<PeerDescriptor>(peer_id)
.map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal { peer_id, peer_name: Clone::clone(&peer_name), cause: cause.to_string() })?;

let is_new_peer = old_peer_descriptor.is_none();

let old_peer_configuration = OldPeerConfiguration {
cluster_assignment: None,
};
resources.insert(peer_id, old_peer_configuration)?;
resources.insert(peer_id, old_peer_configuration)
.map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal { peer_id, peer_name: Clone::clone(&peer_name), cause: cause.to_string() })?;


let peer_configuration = {
Expand All @@ -59,40 +62,32 @@ pub async fn store_peer_descriptor(params: StorePeerDescriptorParams) -> Result<

peer_configuration
};
resources.insert(peer_id, peer_configuration)?; //FIXME don't just insert, but rather update existing values via ID with intelligent logic (in a separate action)
resources.insert(peer_id, peer_descriptor)?;
resources.insert(peer_id, peer_configuration) //FIXME don't just insert, but rather update existing values via ID with intelligent logic (in a separate action)
.map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal { peer_id, peer_name: Clone::clone(&peer_name), cause: cause.to_string() })?;
resources.insert(peer_id, peer_descriptor)
.map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal { peer_id, peer_name: Clone::clone(&peer_name), cause: cause.to_string() })?;

if is_new_peer {
if let Vpn::Enabled { vpn_client } = params.vpn {
debug!("Creating VPN peer <{peer_id}>.");
futures::executor::block_on(
vpn_client.create_peer(peer_id)
).map_err(|cause| StorePeerDescriptorError::Internal { peer_id, peer_name: Clone::clone(&peer_name), cause: cause.to_string() })?;
info!("Successfully created VPN peer <{peer_id}>.");
} else {
warn!("VPN disabled. Skipping VPN peer creation!");
}
}

Ok(is_new_peer)
}).await
.flatten_persistence_result()
.map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal {
peer_id,
peer_name: Clone::clone(&peer_name),
cause: cause.to_string(),
})?;

if is_new_peer {
if let Vpn::Enabled { vpn_client } = params.vpn {
debug!("Creating VPN peer <{peer_id}>.");
vpn_client.create_peer(peer_id)
.await
.map_err(|cause| StorePeerDescriptorError::Internal {
peer_id,
peer_name: Clone::clone(&peer_name),
cause: cause.to_string()
})?; // TODO: When a failure happens, we should rollback changes previously made to resources.
info!("Successfully created VPN peer <{peer_id}>.");
if is_new_peer {
info!("Successfully stored peer descriptor of '{peer_name}' <{peer_id}>.");
} else {
warn!("VPN disabled. Skipping VPN peer creation!");
info!("Successfully updated peer descriptor of '{peer_name}' <{peer_id}>.");
}
}

if is_new_peer {
info!("Successfully stored peer descriptor of '{peer_name}' <{peer_id}>.");
}
else {
info!("Successfully updated peer descriptor of '{peer_name}' <{peer_id}>.");
}

Ok(())
}).await
.map_err(|cause: PersistenceError| StorePeerDescriptorError::Internal { peer_id, peer_name: Clone::clone(&peer_name), cause: cause.to_string() })??;

Ok(peer_id)
}
Expand Down

0 comments on commit d88d05c

Please sign in to comment.