diff --git a/.gitignore b/.gitignore index 4fffb2f8..5b70a898 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target /Cargo.lock +.vscode diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ae9507f..fd334f2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,19 @@ Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] -### Added +### Changed -- Add `apply_target_id` field to `Node` struct for reverting node status. -- Add `apply_in_progress` field to `Node` struct for reverting node status. +- `Node` struct now has `as_is` and `to_be` `NodeSetting`s to support 2-step + node setting save & apply. `NodeInput` accordingly has `as_is` and `to_be`. + `to_be` field means the setting is only saved to the database. Once the + setting is applied, the values in `to_be` field is moved to `as_is` field. + - `graphql::event::convert_sensors` uses `Node`'s `as_is` value, to retrieve + the hostnames of the sensors. This function is called by GraphQL APIs of + `EventQuery` and `EventGroupQuery`. + - `graphql::node::crud::get_node_settings` uses `Node`'s `as_is` value. + - `node_status_list` API uses `hostname` and `name` values from `Node`'s + `as_is` field, prioritizing their presence. If `as_is` is not present, it + resorts to using `to_be` values instead. ### Fixed diff --git a/Cargo.toml b/Cargo.toml index e84eb65c..15a4d679 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,8 @@ ipnet = { version = "2", features = ["serde"] } jsonwebtoken = "9" lazy_static = "1" num-traits = "0.2" -oinq = { git = "https://github.com/petabi/oinq.git", tag = "0.9.1" } +# oinq = { git = "https://github.com/petabi/oinq.git", tag = "0.9.1" } +oinq = { git = "https://github.com/sophie-cluml/oinq.git", branch = "sophie/enum-configs" } # pita in discussion https://github.com/petabi/oinq/issues/58 reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls-native-roots", ] } @@ -39,6 +40,7 @@ tracing = "0.1" vinum = { git = "https://github.com/vinesystems/vinum.git", tag = "1.0.3" } [dev-dependencies] +assert-json-diff = "2.0.2" config = { version = "0.13", features = ["toml"], default-features = false } futures = "0.3" tempfile = "3" diff --git a/src/graphql.rs b/src/graphql.rs index f541e9b6..707c31ed 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -83,7 +83,7 @@ pub trait AgentManager: Send + Sync { async fn broadcast_trusted_user_agent_list(&self, _list: &[u8]) -> Result<(), anyhow::Error>; async fn online_apps_by_host_id( &self, - ) -> Result>, anyhow::Error>; + ) -> Result>, anyhow::Error>; // (hostname, (agent_key, app_name)) async fn send_and_recv(&self, key: &str, msg: &[u8]) -> Result, anyhow::Error>; async fn update_traffic_filter_rules( &self, @@ -554,7 +554,7 @@ impl AgentManager for MockAgentManager { Ok(HashMap::new()) } async fn send_and_recv(&self, _key: &str, _msg: &[u8]) -> Result, anyhow::Error> { - unimplemented!() + Ok(vec![]) } async fn update_traffic_filter_rules( &self, @@ -575,6 +575,11 @@ struct TestSchema { #[cfg(test)] impl TestSchema { async fn new() -> Self { + let agent_manager: BoxedAgentManager = Box::new(MockAgentManager {}); + Self::new_with(agent_manager).await + } + + async fn new_with(agent_manager: BoxedAgentManager) -> Self { use self::account::set_initial_admin_password; let db_dir = tempfile::tempdir().unwrap(); @@ -582,7 +587,6 @@ impl TestSchema { let store = Store::new(db_dir.path(), backup_dir.path()).unwrap(); let _ = set_initial_admin_password(&store); let store = Arc::new(RwLock::new(store)); - let agent_manager: BoxedAgentManager = Box::new(MockAgentManager {}); let schema = Schema::build( Query::default(), Mutation::default(), diff --git a/src/graphql/event.rs b/src/graphql/event.rs index 1d11f1fa..790719d2 100644 --- a/src/graphql/event.rs +++ b/src/graphql/event.rs @@ -932,11 +932,15 @@ fn convert_sensors(map: &IndexedMap, sensors: &[ID]) -> anyhow::Result for Nic { } } -#[derive(Clone, Deserialize, Serialize, SimpleObject)] +#[derive(Clone, Deserialize, Serialize, SimpleObject, PartialEq)] #[graphql(complex)] -#[allow(clippy::struct_excessive_bools)] -pub(super) struct Node { - #[graphql(skip)] - id: u32, +#[allow(clippy::struct_excessive_bools, clippy::module_name_repetitions)] +pub struct NodeSetting { name: String, #[graphql(skip)] customer_id: u32, @@ -133,6 +131,7 @@ pub(super) struct Node { txt: bool, smtp_eml: bool, ftp: bool, + giganto: bool, #[graphql(skip)] giganto_ingestion_ip: Option, @@ -165,11 +164,31 @@ pub(super) struct Node { sensors: bool, sensor_list: HashMap, - +} +#[derive(Clone, Deserialize, Serialize, SimpleObject, PartialEq)] +#[graphql(complex)] +pub(super) struct Node { + #[graphql(skip)] + id: u32, creation_time: DateTime, + pub as_is: Option, + pub to_be: Option, +} - apply_target_id: Option, - apply_in_progress: bool, +impl IndexedMapUpdate for Node { + type Entry = Self; + + fn key(&self) -> Option<&[u8]> { + Some(Indexable::key(self)) + } + + fn apply(&self, _value: Self::Entry) -> anyhow::Result { + Ok(self.clone()) + } + + fn verify(&self, value: &Self::Entry) -> bool { + self == value + } } #[ComplexObject] @@ -177,7 +196,32 @@ impl Node { async fn id(&self) -> ID { ID(self.id.to_string()) } +} +impl Node { + fn as_is_hostname_or_fallback(&self) -> &str { + if let Some(as_is) = &self.as_is { + &as_is.hostname + } else if let Some(to_be) = &self.to_be { + &to_be.hostname + } else { + panic!("Both `as_is` and `to_be` are `None`"); + } + } + + fn as_is_name_or_fallback(&self) -> &str { + if let Some(as_is) = &self.as_is { + &as_is.name + } else if let Some(to_be) = &self.to_be { + &to_be.name + } else { + panic!("Both `as_is` and `to_be` are `None`"); + } + } +} + +#[ComplexObject] +impl NodeSetting { async fn customer_id(&self) -> ID { ID(self.customer_id.to_string()) } @@ -224,7 +268,13 @@ impl NodeTotalCount { impl Indexable for Node { fn key(&self) -> &[u8] { - self.name.as_bytes() + if let Some(as_is) = &self.as_is { + as_is.name.as_bytes() + } else if let Some(to_be) = &self.to_be { + to_be.name.as_bytes() + } else { + panic!("Both `as_is` and `to_be` are `None`"); + } } fn value(&self) -> Vec { @@ -399,214 +449,3 @@ impl From for Process { } } } - -#[cfg(test)] - -mod tests { - use crate::graphql::TestSchema; - - #[tokio::test] - async fn test_node() { - let schema = TestSchema::new().await; - - let res = schema.execute(r#"{nodeList{totalCount}}"#).await; - assert_eq!(res.data.to_string(), r#"{nodeList: {totalCount: 0}}"#); - - let res = schema - .execute( - r#"mutation { - insertNode( - name: "admin node", - customerId: 0, - description: "This is the admin node running review.", - hostname: "admin.aice-security.com", - review: true, - reviewPort: 38390, - reviewWebPort: 8443, - piglet: false, - pigletGigantoIp: null, - pigletGigantoPort: null, - pigletReviewIp: null, - pigletReviewPort: null, - savePackets: false, - http: false, - office: false, - exe: false, - pdf: false, - html: false, - txt: false, - smtpEml: false, - ftp: false, - giganto: false, - gigantoIngestionIp: null, - gigantoIngestionPort: null, - gigantoPublishIp: null, - gigantoPublishPort: null, - gigantoGraphqlIp: null, - gigantoGraphqlPort: null, - retentionPeriod: null, - reconverge: false, - reconvergeReviewIp: null, - reconvergeReviewPort: null, - reconvergeGigantoIp: null, - reconvergeGigantoPort: null, - hog: false, - hogReviewIp: null, - hogReviewPort: null, - hogGigantoIp: null, - hogGigantoPort: null, - protocols: false, - protocolList: {}, - sensors: false, - sensorList: {}, - ) - }"#, - ) - .await; - assert_eq!(res.data.to_string(), r#"{insertNode: "0"}"#); - - let res = schema - .execute( - r#"mutation { - updateNode( - id: "0" - old: { - name: "admin node", - customerId: 0, - description: "This is the admin node running review.", - hostname: "admin.aice-security.com", - review: true, - reviewPort: 38390, - reviewWebPort: 8443, - piglet: false, - pigletGigantoIp: null, - pigletGigantoPort: null, - pigletReviewIp: null, - pigletReviewPort: null, - savePackets: false, - http: false, - office: false, - exe: false, - pdf: false, - html: false, - txt: false, - smtpEml: false, - ftp: false, - giganto: false, - gigantoIngestionIp: null, - gigantoIngestionPort: null, - gigantoPublishIp: null, - gigantoPublishPort: null, - gigantoGraphqlIp: null, - gigantoGraphqlPort: null, - retentionPeriod: null, - reconverge: false, - reconvergeReviewIp: null, - reconvergeReviewPort: null, - reconvergeGigantoIp: null, - reconvergeGigantoPort: null, - hog: false, - hogReviewIp: null, - hogReviewPort: null, - hogGigantoIp: null, - hogGigantoPort: null, - protocols: false, - protocolList: {}, - sensors: false, - sensorList: {}, - }, - new: { - name: "AdminNode", - customerId: 0, - description: "This is the admin node running review.", - hostname: "admin.aice-security.com", - review: true, - reviewPort: 38391, - reviewWebPort: 8443, - piglet: false, - pigletGigantoIp: null, - pigletGigantoPort: null, - pigletReviewIp: null, - pigletReviewPort: null, - savePackets: false, - http: false, - office: false, - exe: false, - pdf: false, - html: false, - txt: false, - smtpEml: false, - ftp: false, - giganto: false, - gigantoIngestionIp: null, - gigantoIngestionPort: null, - gigantoPublishIp: null, - gigantoPublishPort: null, - gigantoGraphqlIp: null, - gigantoGraphqlPort: null, - retentionPeriod: null, - reconverge: false, - reconvergeReviewIp: null, - reconvergeReviewPort: null, - reconvergeGigantoIp: null, - reconvergeGigantoPort: null, - hog: false, - hogReviewIp: null, - hogReviewPort: null, - hogGigantoIp: null, - hogGigantoPort: null, - protocols: false, - protocolList: {}, - sensors: false, - sensorList: {}, - } - ) - }"#, - ) - .await; - assert_eq!(res.data.to_string(), r#"{updateNode: "0"}"#); - - let res = schema - .execute( - r#"query { - nodeList(first: 1) { - nodes { - name - } - } - }"#, - ) - .await; - assert_eq!( - res.data.to_string(), - r#"{nodeList: {nodes: [{name: "AdminNode"}]}}"# - ); - - let res = schema - .execute( - r#"query { - nodeStatusList(first: 1) { - nodes { - name - cpuUsage - review - } - } - }"#, - ) - .await; - assert_eq!( - res.data.to_string(), - r#"{nodeStatusList: {nodes: [{name: "AdminNode",cpuUsage: null,review: null}]}}"# - ); - - let res = schema - .execute( - r#"mutation { - removeNodes(ids: ["0"]) - }"#, - ) - .await; - assert_eq!(res.data.to_string(), r#"{removeNodes: ["AdminNode"]}"#); - } -} diff --git a/src/graphql/node/control.rs b/src/graphql/node/control.rs index aee36eb7..b0078a5b 100644 --- a/src/graphql/node/control.rs +++ b/src/graphql/node/control.rs @@ -1,10 +1,28 @@ +use std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, +}; + +use crate::graphql::{customer::broadcast_customer_networks, get_customer_networks}; + use super::{ super::{BoxedAgentManager, Role, RoleGuard}, - NodeControlMutation, + Node, NodeControlMutation, NodeSetting, }; -use async_graphql::{Context, Object, Result}; +use anyhow::bail; +use async_graphql::{Context, Object, Result, ID}; use bincode::Options; -use oinq::RequestCode; +use oinq::{ + request::{HogConfig, PigletConfig, ReconvergeConfig}, + RequestCode, +}; +use review_database::Indexed; +use tracing::{error, info}; + +const MAX_SET_CONFIG_TRY_COUNT: i32 = 3; +const PIGLET_APP_NAME: &str = "piglet"; +const HOG_APP_NAME: &str = "hog"; +const RECONVERGE_APP_NAME: &str = "reconverge"; #[Object] impl NodeControlMutation { @@ -40,4 +58,805 @@ impl NodeControlMutation { ) } } + + #[graphql(guard = "RoleGuard::new(Role::SystemAdministrator) + .or(RoleGuard::new(Role::SecurityAdministrator))")] + async fn apply_node(&self, ctx: &Context<'_>, id: ID) -> async_graphql::Result { + let i = id.as_str().parse::().map_err(|_| "invalid ID")?; + + let node: Node = { + let store = crate::graphql::get_store(ctx).await?; + let node_map = store.node_map(); + let node = node_map + .get_by_id(i)? + .ok_or_else(|| async_graphql::Error::new(format!("Node with ID {i} not found",)))?; + bincode::DefaultOptions::new().deserialize(node.as_ref())? + }; + if node.to_be.is_none() { + return Err("Cannot apply when `to_be` is None".into()); + } + + let agents = ctx.data::()?; + if send_set_config_requests(agents, &node).await? { + update_node_data(ctx, i, &node).await?; + if let (true, customer_id) = should_broadcast_customer_change(&node) { + broadcast_customer_change(customer_id, ctx).await?; + } + Ok(id) + } else { + Err("Failed to apply node setting".into()) + } + } +} + +async fn send_set_config_requests(agents: &BoxedAgentManager, node: &Node) -> anyhow::Result { + let online_apps = agents.online_apps_by_host_id().await?; + + let mut result_combined: bool = true; + + if let Some(to_be) = &node.to_be { + let to_be_hostname = &to_be.hostname; + + for (app_name, config) in target_app_configs(to_be)? { + let agent_key = find_agent_key(&online_apps, to_be_hostname, app_name)?; + let result = send_set_config_request(agents, agent_key.as_str(), &config).await?; + result_combined = result_combined && result; + } + } else { + bail!("to_be is None"); + } + + Ok(result_combined) +} + +async fn send_set_config_request( + agents: &BoxedAgentManager, + agent_key: &str, + config: &oinq::Configs, +) -> anyhow::Result { + let set_config_request: u32 = RequestCode::SetConfig.into(); + let mut set_config_msg = bincode::serialize(&set_config_request)?; + set_config_msg.extend(bincode::DefaultOptions::new().serialize(config)?); + + for _ in 0..MAX_SET_CONFIG_TRY_COUNT { + let set_config_response = agents.send_and_recv(agent_key, &set_config_msg).await; + + if let Ok(response) = set_config_response { + if response.is_empty() { + return Ok(true); + } + } + + info!("set_config_response is not Ok(true). retrying"); + } + + Ok(false) +} + +fn target_app_configs(to_be: &NodeSetting) -> anyhow::Result> { + let mut configurations = Vec::new(); + + if to_be.piglet { + configurations.push((PIGLET_APP_NAME, build_piglet_config(to_be)?)); + } + + if to_be.hog { + configurations.push((HOG_APP_NAME, build_hog_config(to_be)?)); + } + + if to_be.reconverge { + configurations.push((RECONVERGE_APP_NAME, build_reconverge_config(to_be)?)); + } + + Ok(configurations) +} + +fn find_agent_key( + online_apps: &HashMap>, + hostname: &str, + app_name: &str, +) -> anyhow::Result { + online_apps + .get(hostname) + .and_then(|v| v.iter().find(|(_, name)| *name == app_name)) + .map(|(k, _)| k.clone()) + .ok_or_else(|| anyhow::anyhow!("{} agent not found", app_name)) +} + +fn build_piglet_config(to_be: &NodeSetting) -> anyhow::Result { + let review_address = build_socket_address(to_be.piglet_review_ip, to_be.piglet_review_port) + .ok_or_else(|| anyhow::anyhow!("piglet review address is not set"))?; + + let giganto_address = build_socket_address(to_be.piglet_giganto_ip, to_be.piglet_giganto_port); + let log_options = build_log_options(to_be); + let http_file_types = build_http_file_types(to_be); + + Ok(oinq::Configs::Piglet(PigletConfig { + review_address, + giganto_address, + log_options, + http_file_types, + })) +} + +fn build_hog_config(to_be: &NodeSetting) -> anyhow::Result { + let review_address = build_socket_address(to_be.hog_review_ip, to_be.hog_review_port) + .ok_or_else(|| anyhow::anyhow!("hog review address is not set"))?; + let giganto_address = build_socket_address(to_be.hog_giganto_ip, to_be.hog_giganto_port); + let active_protocols = build_active_protocols(to_be); + let active_sources = build_active_sources(to_be); + + Ok(oinq::Configs::Hog(HogConfig { + review_address, + giganto_address, + active_protocols, + active_sources, + })) +} + +fn build_log_options(to_be: &NodeSetting) -> Option> { + let condition_to_log_option = [ + (to_be.save_packets, "dump"), + (to_be.http, "http"), + (to_be.smtp_eml, "eml"), + (to_be.ftp, "ftp"), + ]; + + let log_options = condition_to_log_option + .iter() + .filter_map(|(cond, value)| { + if *cond { + Some((*value).to_string()) + } else { + None + } + }) + .collect::>(); + + if log_options.is_empty() { + None + } else { + Some(log_options) + } +} + +fn build_http_file_types(to_be: &NodeSetting) -> Option> { + let condition_to_http_file_types = [ + (to_be.office, "office"), + (to_be.exe, "exe"), + (to_be.pdf, "pdf"), + (to_be.html, "html"), + (to_be.txt, "txt"), + ]; + + let http_file_types = condition_to_http_file_types + .iter() + .filter_map(|(cond, value)| { + if *cond { + Some((*value).to_string()) + } else { + None + } + }) + .collect::>(); + + if http_file_types.is_empty() { + None + } else { + Some(http_file_types) + } +} + +fn build_active_protocols(to_be: &NodeSetting) -> Option> { + if to_be.protocols { + Some( + to_be + .protocol_list + .iter() + .filter(|(_, v)| **v) + .map(|(k, _)| k.clone()) + .collect::>(), + ) + } else { + None + } +} + +fn build_active_sources(to_be: &NodeSetting) -> Option> { + if to_be.sensors { + Some( + to_be + .sensor_list + .iter() + .filter(|(_, v)| **v) + .map(|(k, _)| k.clone()) + .collect::>(), + ) + } else { + None + } +} + +fn build_reconverge_config(to_be: &NodeSetting) -> anyhow::Result { + let review_address = + build_socket_address(to_be.reconverge_review_ip, to_be.reconverge_review_port) + .ok_or_else(|| anyhow::anyhow!("reconverge review address is not set"))?; + + let giganto_address = + build_socket_address(to_be.reconverge_giganto_ip, to_be.reconverge_giganto_port); + + Ok(oinq::Configs::Reconverge(ReconvergeConfig { + review_address, + giganto_address, + })) +} + +fn build_socket_address(ip: Option, port: Option) -> Option { + ip.and_then(|ip| port.map(|port| SocketAddr::new(ip, port))) +} + +async fn update_node_data(ctx: &Context<'_>, i: u32, node: &Node) -> Result<()> { + let mut new_node = node.clone(); + new_node.as_is = new_node.to_be.take(); + + let store = crate::graphql::get_store(ctx).await?; + let map = store.node_map(); + Ok(map.update(i, node, &new_node)?) +} + +fn should_broadcast_customer_change(node: &Node) -> (bool, u32) { + match (node.as_is.as_ref(), node.to_be.as_ref()) { + (None, Some(to_be)) => (to_be.review, to_be.customer_id), + (Some(as_is), Some(to_be)) => ( + to_be.review && to_be.customer_id != as_is.customer_id, + to_be.customer_id, + ), + (_, None) => panic!("When `to_be` is None, this function should not be called"), + } +} + +async fn broadcast_customer_change(customer_id: u32, ctx: &Context<'_>) -> Result<()> { + let store = crate::graphql::get_store(ctx).await?; + let networks = get_customer_networks(&store, customer_id)?; + if let Err(e) = broadcast_customer_networks(ctx, &networks).await { + error!("failed to broadcast internal networks. {e:?}"); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use assert_json_diff::assert_json_eq; + use async_trait::async_trait; + use ipnet::IpNet; + use serde_json::json; + use tokio::sync::mpsc::{self, Sender}; + + use crate::graphql::{AgentManager, BoxedAgentManager, TestSchema}; + + #[tokio::test] + async fn test_node_apply() { + let schema = TestSchema::new().await; + + // check empty + let res = schema.execute(r#"{nodeList{totalCount}}"#).await; + assert_eq!(res.data.to_string(), r#"{nodeList: {totalCount: 0}}"#); + + // insert node + let res = schema + .execute( + r#"mutation { + insertNode( + name: "admin node", + customerId: 0, + description: "This is the admin node running review.", + hostname: "admin.aice-security.com", + review: true, + reviewPort: 1111, + reviewWebPort: 1112, + piglet: false, + pigletGigantoIp: null, + pigletGigantoPort: null, + pigletReviewIp: null, + pigletReviewPort: null, + savePackets: false, + http: false, + office: false, + exe: false, + pdf: false, + html: false, + txt: false, + smtpEml: false, + ftp: false, + giganto: false, + gigantoIngestionIp: null, + gigantoIngestionPort: null, + gigantoPublishIp: null, + gigantoPublishPort: null, + gigantoGraphqlIp: null, + gigantoGraphqlPort: null, + retentionPeriod: null, + reconverge: false, + reconvergeReviewIp: null, + reconvergeReviewPort: null, + reconvergeGigantoIp: null, + reconvergeGigantoPort: null, + hog: false, + hogReviewIp: null, + hogReviewPort: null, + hogGigantoIp: null, + hogGigantoPort: null, + protocols: false, + protocolList: {}, + sensors: false, + sensorList: {}, + ) + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"{insertNode: "0"}"#); + + // check node list after insert + let res = schema + .execute( + r#"query { + nodeList(first: 10) { + totalCount + edges { + node { + id + asIs { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + piglet + giganto + reconverge + hog + protocolList + sensorList + } + toBe { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + piglet + giganto + reconverge + hog + protocolList + sensorList + } + } + } + } + }"#, + ) + .await; + assert_json_eq!( + res.data.into_json().unwrap(), + json!({ + "nodeList": { + "totalCount": 1, + "edges": [ + { + "node": { + "id": "0", + "asIs": null, + "toBe": { + "name": "admin node", + "customerId": "0", + "description": "This is the admin node running review.", + "hostname": "admin.aice-security.com", + "review": true, + "reviewPort": 1111, + "reviewWebPort": 1112, + "piglet": false, + "giganto": false, + "reconverge": false, + "hog": false, + "protocolList": {}, + "sensorList": {}, + }, + } + } + ] + } + }) + ); + + // apply node + let res = schema + .execute( + r#"mutation { + applyNode(id: "0") + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"{applyNode: "0"}"#); + + // check node list after apply + let res = schema + .execute( + r#"query { + nodeList(first: 10) { + totalCount + edges { + node { + id + asIs { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + piglet + giganto + reconverge + hog + protocolList + sensorList + } + toBe { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + piglet + giganto + reconverge + hog + protocolList + sensorList + } + } + } + } + }"#, + ) + .await; + assert_json_eq!( + res.data.into_json().unwrap(), + json!({ + "nodeList": { + "totalCount": 1, + "edges": [ + { + "node": { + "id": "0", + "asIs": { + "name": "admin node", + "customerId": "0", + "description": "This is the admin node running review.", + "hostname": "admin.aice-security.com", + "review": true, + "reviewPort": 1111, + "reviewWebPort": 1112, + "piglet": false, + "giganto": false, + "reconverge": false, + "hog": false, + "protocolList": {}, + "sensorList": {}, + }, + "toBe": null, + } + } + ] + } + }) + ); + } + + struct MockAgentManager { + pub online_apps_by_host_id: HashMap>, + pub send_result_checker: Sender, + } + + impl MockAgentManager { + pub async fn insert_result(&self, result_key: &str) { + self.send_result_checker + .send(result_key.to_string()) + .await + .expect("send result failed"); + } + } + + #[async_trait] + impl AgentManager for MockAgentManager { + async fn broadcast_to_crusher(&self, _msg: &[u8]) -> Result<(), anyhow::Error> { + unimplemented!() + } + async fn broadcast_trusted_domains(&self) -> Result<(), anyhow::Error> { + unimplemented!() + } + async fn broadcast_trusted_user_agent_list( + &self, + _list: &[u8], + ) -> Result<(), anyhow::Error> { + unimplemented!() + } + async fn broadcast_internal_networks( + &self, + _networks: &[u8], + ) -> Result, anyhow::Error> { + Ok(vec!["hog@anyhost".to_string()]) + } + async fn broadcast_allow_networks( + &self, + _networks: &[u8], + ) -> Result, anyhow::Error> { + unimplemented!() + } + async fn broadcast_block_networks( + &self, + _networks: &[u8], + ) -> Result, anyhow::Error> { + unimplemented!() + } + async fn online_apps_by_host_id( + &self, + ) -> Result>, anyhow::Error> { + Ok(self.online_apps_by_host_id.clone()) + } + async fn send_and_recv(&self, key: &str, _msg: &[u8]) -> Result, anyhow::Error> { + self.insert_result(key).await; + Ok(vec![]) + } + async fn update_traffic_filter_rules( + &self, + _key: &str, + _rules: &[(IpNet, Option>, Option>)], + ) -> Result<(), anyhow::Error> { + unimplemented!() + } + } + + fn insert_apps(host: &str, apps: &[&str], map: &mut HashMap>) { + let entries = apps + .iter() + .map(|&app| (format!("{}@{}", app, host), app.to_string())) + .collect(); + map.insert(host.to_string(), entries); + } + + #[tokio::test] + async fn test_node_apply_with_online_apps() { + let mut online_apps_by_host_id = HashMap::new(); + insert_apps("host1", &["review", "piglet"], &mut online_apps_by_host_id); + insert_apps( + "host2", + &["giganto", "hog", "reconverge"], + &mut online_apps_by_host_id, + ); + + let (send_result_checker, mut recv_result_checker) = mpsc::channel(10); + + let agent_manager: BoxedAgentManager = Box::new(MockAgentManager { + online_apps_by_host_id, + send_result_checker, + }); + + let schema = TestSchema::new_with(agent_manager).await; + + // check empty + let res = schema.execute(r#"{nodeList{totalCount}}"#).await; + assert_eq!(res.data.to_string(), r#"{nodeList: {totalCount: 0}}"#); + + // insert node with review, piglet + let res = schema + .execute( + r#"mutation { + insertNode( + name: "node1", + customerId: 0, + description: "This is the admin node running review.", + hostname: "host1", + review: true, + reviewPort: 1111, + reviewWebPort: 1112, + piglet: true, + pigletGigantoIp: "0.0.0.0", + pigletGigantoPort: 5555, + pigletReviewIp: "0.0.0.0", + pigletReviewPort: 1111, + savePackets: false, + http: false, + office: false, + exe: false, + pdf: false, + html: false, + txt: false, + smtpEml: false, + ftp: false, + giganto: false, + gigantoIngestionIp: null, + gigantoIngestionPort: null, + gigantoPublishIp: null, + gigantoPublishPort: null, + gigantoGraphqlIp: null, + gigantoGraphqlPort: null, + retentionPeriod: null, + reconverge: false, + reconvergeReviewIp: null, + reconvergeReviewPort: null, + reconvergeGigantoIp: null, + reconvergeGigantoPort: null, + hog: false, + hogReviewIp: null, + hogReviewPort: null, + hogGigantoIp: null, + hogGigantoPort: null, + protocols: false, + protocolList: {}, + sensors: false, + sensorList: {}, + ) + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"{insertNode: "0"}"#); + + // check node list after insert + let res = schema + .execute( + r#"query { + nodeList(first: 10) { + totalCount + edges { + node { + id + asIs { + name + customerId + description + hostname + review + piglet + giganto + reconverge + hog + } + toBe { + name + customerId + description + hostname + review + piglet + giganto + reconverge + hog + } + } + } + } + }"#, + ) + .await; + assert_json_eq!( + res.data.into_json().unwrap(), + json!({ + "nodeList": { + "totalCount": 1, + "edges": [ + { + "node": { + "id": "0", + "asIs": null, + "toBe": { + "name": "node1", + "customerId": "0", + "description": "This is the admin node running review.", + "hostname": "host1", + "review": true, + "piglet": true, + "giganto": false, + "reconverge": false, + "hog": false, + }, + } + } + ] + } + }) + ); + + // apply node + let res = schema + .execute( + r#"mutation { + applyNode(id: "0") + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"{applyNode: "0"}"#); + + // check node list after apply + let res = schema + .execute( + r#"query { + nodeList(first: 10) { + totalCount + edges { + node { + id + asIs { + name + customerId + description + hostname + review + piglet + giganto + reconverge + hog + } + toBe { + name + customerId + description + hostname + review + piglet + giganto + reconverge + hog + } + } + } + } + }"#, + ) + .await; + assert_json_eq!( + res.data.into_json().unwrap(), + json!({ + "nodeList": { + "totalCount": 1, + "edges": [ + { + "node": { + "id": "0", + "asIs": { + "name": "node1", + "customerId": "0", + "description": "This is the admin node running review.", + "hostname": "host1", + "review": true, + "piglet": true, + "giganto": false, + "reconverge": false, + "hog": false, + }, + "toBe": null, + } + } + ] + } + }) + ); + + let mut result_buffer: Vec = Vec::with_capacity(2); + let size = recv_result_checker.recv_many(&mut result_buffer, 2).await; + assert_eq!(size, 1); + assert!(result_buffer.contains(&"piglet@host1".to_string())); + assert!(!result_buffer.contains(&"review@host1".to_string())); + } } diff --git a/src/graphql/node/crud.rs b/src/graphql/node/crud.rs index bedbb39a..6724acdd 100644 --- a/src/graphql/node/crud.rs +++ b/src/graphql/node/crud.rs @@ -4,8 +4,8 @@ use crate::graphql::{customer::broadcast_customer_networks, get_customer_network use super::{ super::{Role, RoleGuard}, - Node, NodeInput, NodeMutation, NodeQuery, NodeTotalCount, PortNumber, ServerAddress, - ServerPort, Setting, + Node, NodeInput, NodeMutation, NodeQuery, NodeSetting, NodeTotalCount, PortNumber, + ServerAddress, ServerPort, Setting, }; use async_graphql::{ connection::{query, Connection, EmptyFields}, @@ -127,143 +127,88 @@ impl NodeMutation { .as_str() .parse::() .map_err(|_| "invalid customer ID")?; - let piglet_giganto_ip = if let Some(ip) = piglet_giganto_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: storage")?, - ) - } else { - None - }; - let piglet_review_ip = if let Some(ip) = piglet_review_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: administration")?, - ) - } else { - None - }; - let giganto_ingestion_ip = if let Some(ip) = giganto_ingestion_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: receiving")?, - ) - } else { - None - }; - let giganto_publish_ip = if let Some(ip) = giganto_publish_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: sending")?, - ) - } else { - None - }; - let giganto_graphql_ip = if let Some(ip) = giganto_graphql_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: web")?, - ) - } else { - None - }; - let reconverge_review_ip = if let Some(ip) = reconverge_review_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: administration")?, - ) - } else { - None - }; - let reconverge_giganto_ip = if let Some(ip) = reconverge_giganto_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: storage")?, - ) - } else { - None - }; - let hog_review_ip = if let Some(ip) = hog_review_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: administration")?, - ) - } else { - None - }; - let hog_giganto_ip = if let Some(ip) = hog_giganto_ip { - Some( - ip.as_str() - .parse::() - .map_err(|_| "invalid IP address: storage")?, - ) - } else { - None - }; let value = Node { id: u32::MAX, - customer_id, - name, - description, - hostname, - - review, - review_port, - review_web_port, - - piglet, - piglet_giganto_ip, - piglet_giganto_port, - piglet_review_ip, - piglet_review_port, - save_packets, - http, - office, - exe, - pdf, - html, - txt, - smtp_eml, - ftp, - - giganto, - giganto_ingestion_ip, - giganto_ingestion_port, - giganto_publish_ip, - giganto_publish_port, - giganto_graphql_ip, - giganto_graphql_port, - retention_period, - - reconverge, - reconverge_review_ip, - reconverge_review_port, - reconverge_giganto_ip, - reconverge_giganto_port, - - hog, - hog_review_ip, - hog_review_port, - hog_giganto_ip, - hog_giganto_port, - protocols, - protocol_list, - sensors, - sensor_list, - creation_time: Utc::now(), - - apply_target_id: None, - apply_in_progress: false, + as_is: None, + to_be: Some(NodeSetting { + name, + customer_id, + description, + hostname, + + review, + review_port, + review_web_port, + + piglet, + piglet_giganto_ip: parse_str_to_ip( + piglet_giganto_ip.as_deref(), + "invalid IP address: storage", + )?, + piglet_giganto_port, + piglet_review_ip: parse_str_to_ip( + piglet_review_ip.as_deref(), + "invalid IP address: administration", + )?, + piglet_review_port, + save_packets, + http, + office, + exe, + pdf, + html, + txt, + smtp_eml, + ftp, + + giganto, + giganto_ingestion_ip: parse_str_to_ip( + giganto_ingestion_ip.as_deref(), + "invalid IP address: receiving", + )?, + giganto_ingestion_port, + giganto_publish_ip: parse_str_to_ip( + giganto_publish_ip.as_deref(), + "invalid IP address: sending", + )?, + giganto_publish_port, + giganto_graphql_ip: parse_str_to_ip( + giganto_graphql_ip.as_deref(), + "invalid IP address: web", + )?, + giganto_graphql_port, + retention_period, + + reconverge, + reconverge_review_ip: parse_str_to_ip( + reconverge_review_ip.as_deref(), + "invalid IP address: administration", + )?, + reconverge_review_port, + reconverge_giganto_ip: parse_str_to_ip( + reconverge_giganto_ip.as_deref(), + "invalid IP address: storage", + )?, + reconverge_giganto_port, + + hog, + hog_review_ip: parse_str_to_ip( + hog_review_ip.as_deref(), + "invalid IP address: administration", + )?, + hog_review_port, + hog_giganto_ip: parse_str_to_ip( + hog_giganto_ip.as_deref(), + "invalid IP address: storage", + )?, + hog_giganto_port, + protocols, + protocol_list, + sensors, + sensor_list, + }), }; let id = map.insert(value)?; (id, customer_id) @@ -317,36 +262,45 @@ impl NodeMutation { old: NodeInput, new: NodeInput, ) -> Result { - let (review, customer_is_changed, customer_id) = { - let i = id.as_str().parse::().map_err(|_| "invalid ID")?; - - let store = crate::graphql::get_store(ctx).await?; - let map = store.node_map(); - map.update(i, &old, &new)?; - - ( - new.review, - new.customer_id != old.customer_id, - new.customer_id, - ) - }; - - if review && customer_is_changed { - if let Ok(customer_id) = customer_id.as_str().parse::() { - let store = crate::graphql::get_store(ctx).await?; - - if let Ok(networks) = get_customer_networks(&store, customer_id) { - if let Err(e) = broadcast_customer_networks(ctx, &networks).await { - error!("failed to broadcast internal networks. {e:?}"); - } - } - } + if !validate_update_input(&old, &new) { + return Err("Invalid combination of old and new values".into()); } + let i = id.as_str().parse::().map_err(|_| "invalid ID")?; + let store = crate::graphql::get_store(ctx).await?; + let map = store.node_map(); + map.update(i, &old, &new)?; Ok(id) } } +fn validate_update_input(old: &NodeInput, new: &NodeInput) -> bool { + matches!( + ( + old.as_is.as_ref(), + old.to_be.as_ref(), + new.as_is.as_ref(), + new.to_be.as_ref(), + ), + (None, Some(_), None, Some(_)) + | (Some(_), None, Some(_), Some(_)) + | (Some(_), Some(_), Some(_), _) + ) +} + +fn parse_str_to_ip<'em>( + ip_str: Option<&str>, + error_message: &'em str, +) -> Result, &'em str> { + match ip_str { + Some(ip_str) => ip_str + .parse::() + .map(Some) + .map_err(|_| error_message), + None => Ok(None), + } +} + async fn load( ctx: &Context<'_>, after: Option, @@ -364,13 +318,16 @@ async fn load( /// # Errors /// /// Returns an error if the node settings could not be retrieved. +#[allow(clippy::too_many_lines)] pub fn get_node_settings(db: &Store) -> Result> { let map = db.node_map(); let mut output = Vec::new(); for (_key, value) in map.iter_forward()? { let node = bincode::DefaultOptions::new() .deserialize::(value.as_ref()) - .map_err(|_| "invalid value in database")?; + .map_err(|_| "invalid value in database")? + .as_is + .ok_or("node is not yet fully configured")?; let piglet: Option = if node.piglet { Some(ServerAddress { @@ -483,9 +440,388 @@ pub fn get_customer_id_of_review_host(db: &Store) -> Result> { let node = bincode::DefaultOptions::new() .deserialize::(value.as_ref()) .map_err(|_| "invalid value in database")?; - if node.review { - return Ok(Some(node.customer_id)); + + if let Some(as_is) = &node.as_is { + if as_is.review { + return Ok(Some(as_is.customer_id)); + } } } Ok(None) } + +#[cfg(test)] +mod tests { + use crate::graphql::TestSchema; + use assert_json_diff::assert_json_eq; + use serde_json::json; + + // test scenario : insert node -> update node with different name -> remove node + #[tokio::test] + async fn node_crud() { + let schema = TestSchema::new().await; + + // check empty + let res = schema.execute(r#"{nodeList{totalCount}}"#).await; + assert_eq!(res.data.to_string(), r#"{nodeList: {totalCount: 0}}"#); + + // insert node + let res = schema + .execute( + r#"mutation { + insertNode( + name: "admin node", + customerId: 0, + description: "This is the admin node running review.", + hostname: "admin.aice-security.com", + review: true, + reviewPort: 1111, + reviewWebPort: 1112, + piglet: false, + pigletGigantoIp: null, + pigletGigantoPort: null, + pigletReviewIp: null, + pigletReviewPort: null, + savePackets: false, + http: false, + office: false, + exe: false, + pdf: false, + html: false, + txt: false, + smtpEml: false, + ftp: false, + giganto: false, + gigantoIngestionIp: null, + gigantoIngestionPort: null, + gigantoPublishIp: null, + gigantoPublishPort: null, + gigantoGraphqlIp: null, + gigantoGraphqlPort: null, + retentionPeriod: null, + reconverge: false, + reconvergeReviewIp: null, + reconvergeReviewPort: null, + reconvergeGigantoIp: null, + reconvergeGigantoPort: null, + hog: false, + hogReviewIp: null, + hogReviewPort: null, + hogGigantoIp: null, + hogGigantoPort: null, + protocols: false, + protocolList: {}, + sensors: false, + sensorList: {}, + ) + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"{insertNode: "0"}"#); + + // check node count after insert + let res = schema.execute(r#"{nodeList{totalCount}}"#).await; + assert_eq!(res.data.to_string(), r#"{nodeList: {totalCount: 1}}"#); + + // check inserted node + let res = schema + .execute( + r#"{node(id: "0") { + id + asIs { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + protocolList + sensorList + } + toBe { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + protocolList + sensorList + } + + }}"#, + ) + .await; + + assert_json_eq!( + res.data.into_json().unwrap(), + json!({ + "node": { + "id": "0", + "asIs": null, + "toBe": { + "name": "admin node", + "customerId": "0", + "description": "This is the admin node running review.", + "hostname": "admin.aice-security.com", + "review": true, + "reviewPort": 1111, + "reviewWebPort": 1112, + "protocolList": {}, + "sensorList": {}, + }, + } + }) + ); + + // update node + let res = schema + .execute( + r#"mutation { + updateNode( + id: "0" + old: { + asIs: null + toBe: { + name: "admin node", + customerId: 0, + description: "This is the admin node running review.", + hostname: "admin.aice-security.com", + review: true, + reviewPort: 1111, + reviewWebPort: 1112, + piglet: false, + pigletGigantoIp: null, + pigletGigantoPort: null, + pigletReviewIp: null, + pigletReviewPort: null, + savePackets: false, + http: false, + office: false, + exe: false, + pdf: false, + html: false, + txt: false, + smtpEml: false, + ftp: false, + giganto: false, + gigantoIngestionIp: null, + gigantoIngestionPort: null, + gigantoPublishIp: null, + gigantoPublishPort: null, + gigantoGraphqlIp: null, + gigantoGraphqlPort: null, + retentionPeriod: null, + reconverge: false, + reconvergeReviewIp: null, + reconvergeReviewPort: null, + reconvergeGigantoIp: null, + reconvergeGigantoPort: null, + hog: false, + hogReviewIp: null, + hogReviewPort: null, + hogGigantoIp: null, + hogGigantoPort: null, + protocols: false, + protocolList: {}, + sensors: false, + sensorList: {}, + } + }, + new: { + asIs: null, + toBe: { + name: "AdminNode", + customerId: 0, + description: "This is the admin node running review.", + hostname: "admin.aice-security.com", + review: true, + reviewPort: 2222, + reviewWebPort: 2223, + piglet: false, + pigletGigantoIp: null, + pigletGigantoPort: null, + pigletReviewIp: null, + pigletReviewPort: null, + savePackets: false, + http: false, + office: false, + exe: false, + pdf: false, + html: false, + txt: false, + smtpEml: false, + ftp: false, + giganto: false, + gigantoIngestionIp: null, + gigantoIngestionPort: null, + gigantoPublishIp: null, + gigantoPublishPort: null, + gigantoGraphqlIp: null, + gigantoGraphqlPort: null, + retentionPeriod: null, + reconverge: false, + reconvergeReviewIp: null, + reconvergeReviewPort: null, + reconvergeGigantoIp: null, + reconvergeGigantoPort: null, + hog: false, + hogReviewIp: null, + hogReviewPort: null, + hogGigantoIp: null, + hogGigantoPort: null, + protocols: false, + protocolList: {}, + sensors: false, + sensorList: {}, + } + } + ) + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"{updateNode: "0"}"#); + + // check node count after update + let res = schema.execute(r#"{nodeList{totalCount}}"#).await; + assert_eq!(res.data.to_string(), r#"{nodeList: {totalCount: 1}}"#); + + // check updated node + let res = schema + .execute( + r#"{node(id: "0") { + id + asIs { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + protocolList + sensorList + } + toBe { + name + customerId + description + hostname + review + reviewPort + reviewWebPort + protocolList + sensorList + } + + }}"#, + ) + .await; + + assert_json_eq!( + res.data.into_json().unwrap(), + json!({ + "node": { + "id": "0", + "asIs": null, + "toBe": { + "name": "AdminNode", // updated + "customerId": "0", + "description": "This is the admin node running review.", + "hostname": "admin.aice-security.com", + "review": true, + "reviewPort": 2222, // updated + "reviewWebPort": 2223, // updated + "protocolList": {}, + "sensorList": {}, + }, + } + }) + ); + + // try reverting node, but it should fail because the node is an initial draft + let res = schema + .execute( + r#"mutation { + updateNode( + id: "0" + old: { + asIs: null + toBe: { + name: "admin node", + customerId: 0, + description: "This is the admin node running review.", + hostname: "admin.aice-security.com", + review: true, + reviewPort: 2222, + reviewWebPort: 2223, + piglet: false, + pigletGigantoIp: null, + pigletGigantoPort: null, + pigletReviewIp: null, + pigletReviewPort: null, + savePackets: false, + http: false, + office: false, + exe: false, + pdf: false, + html: false, + txt: false, + smtpEml: false, + ftp: false, + giganto: false, + gigantoIngestionIp: null, + gigantoIngestionPort: null, + gigantoPublishIp: null, + gigantoPublishPort: null, + gigantoGraphqlIp: null, + gigantoGraphqlPort: null, + retentionPeriod: null, + reconverge: false, + reconvergeReviewIp: null, + reconvergeReviewPort: null, + reconvergeGigantoIp: null, + reconvergeGigantoPort: null, + hog: false, + hogReviewIp: null, + hogReviewPort: null, + hogGigantoIp: null, + hogGigantoPort: null, + protocols: false, + protocolList: {}, + sensors: false, + sensorList: {}, + } + }, + new: { + asIs: null, + toBe: null, + } + ) + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"null"#); + assert_eq!(res.errors.len(), 1); + assert_eq!( + res.errors[0].message, + "Invalid combination of old and new values" + ); + + // remove node + let res = schema + .execute( + r#"mutation { + removeNodes(ids: ["0"]) + }"#, + ) + .await; + assert_eq!(res.data.to_string(), r#"{removeNodes: ["AdminNode"]}"#); + + // check node count after remove + let res = schema.execute(r#"{nodeList{totalCount}}"#).await; + assert_eq!(res.data.to_string(), r#"{nodeList: {totalCount: 0}}"#); + } +} diff --git a/src/graphql/node/input.rs b/src/graphql/node/input.rs index 826b20f3..7ef3b38a 100644 --- a/src/graphql/node/input.rs +++ b/src/graphql/node/input.rs @@ -1,13 +1,14 @@ -use super::{Node, PortNumber}; //NicInput -use anyhow::Context as AnyhowContext; //bail +use super::{Node, NodeSetting, PortNumber}; +use anyhow::Context as AnyhowContext; use async_graphql::{types::ID, InputObject, Result}; use review_database::IndexedMapUpdate; use std::{collections::HashMap, net::IpAddr}; +use tracing::error; #[allow(clippy::module_name_repetitions)] #[derive(Clone, InputObject)] #[allow(clippy::struct_excessive_bools)] -pub(super) struct NodeInput { +pub struct NodeSettingInput { pub name: String, pub customer_id: ID, pub description: String, @@ -58,346 +59,163 @@ pub(super) struct NodeInput { pub sensor_list: HashMap, } -impl IndexedMapUpdate for NodeInput { - type Entry = Node; - - fn key(&self) -> Option<&[u8]> { - Some(self.name.as_bytes()) +impl PartialEq for NodeSettingInput { + fn eq(&self, other: &NodeSetting) -> bool { + self.name == other.name + && self.customer_id.as_str().parse::() == Ok(other.customer_id) + && self.description == other.description + && self.hostname == other.hostname + && self.review == other.review + && self.review_port == other.review_port + && self.review_web_port == other.review_web_port + && self.piglet == other.piglet + && parse_str_to_ip(self.piglet_giganto_ip.as_deref()) == other.piglet_giganto_ip + && self.piglet_giganto_port == other.piglet_giganto_port + && parse_str_to_ip(self.piglet_review_ip.as_deref()) == other.piglet_review_ip + && self.piglet_review_port == other.piglet_review_port + && self.save_packets == other.save_packets + && self.http == other.http + && self.office == other.office + && self.exe == other.exe + && self.pdf == other.pdf + && self.html == other.html + && self.txt == other.txt + && self.smtp_eml == other.smtp_eml + && self.ftp == other.ftp + && self.giganto == other.giganto + && parse_str_to_ip(self.giganto_ingestion_ip.as_deref()) == other.giganto_ingestion_ip + && self.giganto_ingestion_port == other.giganto_ingestion_port + && parse_str_to_ip(self.giganto_publish_ip.as_deref()) == other.giganto_publish_ip + && self.giganto_publish_port == other.giganto_publish_port + && parse_str_to_ip(self.giganto_graphql_ip.as_deref()) == other.giganto_graphql_ip + && self.giganto_graphql_port == other.giganto_graphql_port + && self.retention_period == other.retention_period + && self.reconverge == other.reconverge + && parse_str_to_ip(self.reconverge_review_ip.as_deref()) == other.reconverge_review_ip + && self.reconverge_review_port == other.reconverge_review_port + && parse_str_to_ip(self.reconverge_giganto_ip.as_deref()) == other.reconverge_giganto_ip + && self.reconverge_giganto_port == other.reconverge_giganto_port + && self.hog == other.hog + && parse_str_to_ip(self.hog_review_ip.as_deref()) == other.hog_review_ip + && self.hog_review_port == other.hog_review_port + && parse_str_to_ip(self.hog_giganto_ip.as_deref()) == other.hog_giganto_ip + && self.hog_giganto_port == other.hog_giganto_port + && self.protocols == other.protocols + && self.protocol_list == other.protocol_list + && self.sensors == other.sensors + && self.sensor_list == other.sensor_list } +} - #[allow(clippy::too_many_lines)] - fn apply(&self, mut value: Self::Entry) -> Result { - value.name.clear(); - value.name.push_str(&self.name); - value.customer_id = self - .customer_id - .as_str() - .parse::() - .context("invalid customer ID")?; - value.description.clear(); - value.description.push_str(&self.description); - value.hostname.clear(); - value.hostname.push_str(&self.hostname); +impl PartialEq for NodeSetting { + fn eq(&self, other: &NodeSettingInput) -> bool { + other.eq(self) + } +} - // review - value.review = self.review; - value.review_port = self.review_port; - value.review_web_port = self.review_web_port; +impl TryFrom<&NodeSettingInput> for NodeSetting { + type Error = anyhow::Error; + + fn try_from(input: &NodeSettingInput) -> Result { + Ok(NodeSetting { + name: input.name.clone(), + customer_id: input.customer_id.parse().context("invalid customer ID")?, + description: input.description.clone(), + hostname: input.hostname.clone(), + review: input.review, + review_port: input.review_port, + review_web_port: input.review_web_port, + piglet: input.piglet, + piglet_giganto_ip: parse_str_to_ip(input.piglet_giganto_ip.as_deref()), + piglet_giganto_port: input.piglet_giganto_port, + piglet_review_ip: parse_str_to_ip(input.piglet_review_ip.as_deref()), + piglet_review_port: input.piglet_review_port, + save_packets: input.save_packets, + http: input.http, + office: input.office, + exe: input.exe, + pdf: input.pdf, + html: input.html, + txt: input.txt, + smtp_eml: input.smtp_eml, + ftp: input.ftp, + giganto: input.giganto, + giganto_ingestion_ip: parse_str_to_ip(input.giganto_ingestion_ip.as_deref()), + giganto_ingestion_port: input.giganto_ingestion_port, + giganto_publish_ip: parse_str_to_ip(input.giganto_publish_ip.as_deref()), + giganto_publish_port: input.giganto_publish_port, + giganto_graphql_ip: parse_str_to_ip(input.giganto_graphql_ip.as_deref()), + giganto_graphql_port: input.giganto_graphql_port, + retention_period: input.retention_period, + reconverge: input.reconverge, + reconverge_review_ip: parse_str_to_ip(input.reconverge_review_ip.as_deref()), + reconverge_review_port: input.reconverge_review_port, + reconverge_giganto_ip: parse_str_to_ip(input.reconverge_giganto_ip.as_deref()), + reconverge_giganto_port: input.reconverge_giganto_port, + hog: input.hog, + hog_review_ip: parse_str_to_ip(input.hog_review_ip.as_deref()), + hog_review_port: input.hog_review_port, + hog_giganto_ip: parse_str_to_ip(input.hog_giganto_ip.as_deref()), + hog_giganto_port: input.hog_giganto_port, + protocols: input.protocols, + protocol_list: input.protocol_list.clone(), + sensors: input.sensors, + sensor_list: input.sensor_list.clone(), + }) + } +} - // piglet - value.piglet = self.piglet; - value.piglet_giganto_ip = if let Some(ip) = self.piglet_giganto_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.piglet_giganto_port = self.piglet_giganto_port; - value.piglet_review_ip = if let Some(ip) = self.piglet_review_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.piglet_review_port = self.piglet_review_port; - value.save_packets = self.save_packets; - value.http = self.http; - value.office = self.office; - value.exe = self.exe; - value.pdf = self.pdf; - value.html = self.html; - value.txt = self.txt; - value.smtp_eml = self.smtp_eml; - value.ftp = self.ftp; +fn parse_str_to_ip(ip_str: Option<&str>) -> Option { + ip_str.and_then(|ip_str| ip_str.parse::().ok()) +} - // giganto - value.giganto = self.giganto; - value.giganto_ingestion_ip = if let Some(ip) = self.giganto_ingestion_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.giganto_ingestion_port = self.giganto_ingestion_port; - value.giganto_publish_ip = if let Some(ip) = self.giganto_publish_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.giganto_publish_port = self.giganto_publish_port; - value.giganto_graphql_ip = if let Some(ip) = self.giganto_graphql_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.giganto_graphql_port = self.giganto_graphql_port; - value.retention_period = self.retention_period; +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, InputObject)] +pub(super) struct NodeInput { + pub as_is: Option, + pub to_be: Option, +} - // reconverge - value.reconverge = self.reconverge; - value.reconverge_review_ip = if let Some(ip) = self.reconverge_review_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.reconverge_review_port = self.reconverge_review_port; - value.reconverge_giganto_ip = if let Some(ip) = self.reconverge_giganto_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.reconverge_giganto_port = self.reconverge_giganto_port; +impl IndexedMapUpdate for NodeInput { + type Entry = Node; - // hog - value.hog = self.hog; - value.hog_review_ip = if let Some(ip) = self.hog_review_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) - } else { - None - }; - value.hog_review_port = self.hog_review_port; - value.hog_giganto_ip = if let Some(ip) = self.hog_giganto_ip.as_deref() { - Some(ip.parse::().context("invalid IP address")?) + fn key(&self) -> Option<&[u8]> { + if let Some(as_is) = &self.as_is { + Some(as_is.name.as_bytes()) + } else if let Some(to_be) = &self.to_be { + Some(to_be.name.as_bytes()) } else { + error!("Both `as_is` and `to_be` are `None`"); None - }; - value.hog_giganto_port = self.hog_giganto_port; - value.protocols = self.protocols; - value.protocol_list = self.protocol_list.clone(); - value.sensors = self.sensors; - value.sensor_list = self.sensor_list.clone(); - - Ok(value) - } - - #[allow(clippy::too_many_lines)] - fn verify(&self, value: &Self::Entry) -> bool { - if self.name != value.name { - return false; - } - if self - .customer_id - .as_str() - .parse::() - .map_or(true, |id| id != value.customer_id) - { - return false; - } - if self.description != value.description { - return false; - } - if self.hostname != value.hostname { - return false; - } - - // review - if self.review != value.review { - return false; - } - if self.review_port != value.review_port { - return false; - } - if self.review_web_port != value.review_web_port { - return false; - } - - // piglet - if self.piglet != value.piglet { - return false; - } - if let (Some(ip_self), Some(ip_value)) = - (self.piglet_giganto_ip.as_deref(), value.piglet_giganto_ip) - { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; - } - } else if self.piglet_giganto_ip.is_some() || value.piglet_giganto_ip.is_some() { - return false; - } - if self.piglet_giganto_port != value.piglet_giganto_port { - return false; - } - if let (Some(ip_self), Some(ip_value)) = - (self.piglet_review_ip.as_deref(), value.piglet_review_ip) - { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; - } - } else if self.piglet_review_ip.is_some() || value.piglet_review_ip.is_some() { - return false; - } - if self.piglet_review_port != value.piglet_review_port { - return false; - } - if self.save_packets != value.save_packets { - return false; - } - if self.http != value.http { - return false; - } - if self.office != value.office { - return false; - } - if self.exe != value.exe { - return false; - } - if self.pdf != value.pdf { - return false; - } - if self.html != value.html { - return false; - } - if self.txt != value.txt { - return false; - } - if self.smtp_eml != value.smtp_eml { - return false; - } - if self.ftp != value.ftp { - return false; } + } - // giganto - if self.giganto != value.giganto { - return false; - } - if let (Some(ip_self), Some(ip_value)) = ( - self.giganto_ingestion_ip.as_deref(), - value.giganto_ingestion_ip, - ) { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; + fn apply(&self, mut value: Self::Entry) -> Result { + match self.to_be.as_ref() { + Some(to_be) => { + let new_to_be = NodeSetting::try_from(to_be)?; + value.to_be = Some(new_to_be); } - } else if self.giganto_ingestion_ip.is_some() || value.giganto_ingestion_ip.is_some() { - return false; - } - if self.giganto_ingestion_port != value.giganto_ingestion_port { - return false; - } - if let (Some(ip_self), Some(ip_value)) = - (self.giganto_publish_ip.as_deref(), value.giganto_publish_ip) - { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; + None => { + value.to_be = None; } - } else if self.giganto_publish_ip.is_some() || value.giganto_publish_ip.is_some() { - return false; - } - if self.giganto_publish_port != value.giganto_publish_port { - return false; - } - if let (Some(ip_self), Some(ip_value)) = - (self.giganto_graphql_ip.as_deref(), value.giganto_graphql_ip) - { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; - } - } else if self.giganto_graphql_ip.is_some() || value.giganto_graphql_ip.is_some() { - return false; - } - if self.giganto_graphql_port != value.giganto_graphql_port { - return false; - } - if self.retention_period != value.retention_period { - return false; } + Ok(value) + } - // reconverge - if self.reconverge != value.reconverge { - return false; - } - if let (Some(ip_self), Some(ip_value)) = ( - self.reconverge_review_ip.as_deref(), - value.reconverge_review_ip, - ) { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; - } - } else if self.reconverge_review_ip.is_some() || value.reconverge_review_ip.is_some() { - return false; - } - if self.reconverge_review_port != value.reconverge_review_port { - return false; - } - if let (Some(ip_self), Some(ip_value)) = ( - self.reconverge_giganto_ip.as_deref(), - value.reconverge_giganto_ip, - ) { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; - } - } else if self.reconverge_giganto_ip.is_some() || value.reconverge_giganto_ip.is_some() { - return false; - } - if self.reconverge_giganto_port != value.reconverge_giganto_port { - return false; - } + fn verify(&self, value: &Self::Entry) -> bool { + let as_is_matches = match (&self.as_is, &value.as_is) { + (Some(as_is), Some(value)) => as_is == value, + (Some(_), None) | (None, Some(_)) => false, + (None, None) => true, + }; - // hog - if self.hog != value.hog { - return false; - } - if let (Some(ip_self), Some(ip_value)) = - (self.hog_review_ip.as_deref(), value.hog_review_ip) - { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; - } - } else if self.hog_review_ip.is_some() || value.hog_review_ip.is_some() { - return false; - } - if self.hog_review_port != value.hog_review_port { - return false; - } - if let (Some(ip_self), Some(ip_value)) = - (self.hog_giganto_ip.as_deref(), value.hog_giganto_ip) - { - if ip_self - .parse::() - .map_or(true, |ip_self| ip_self != ip_value) - { - return false; - } - } else if self.hog_giganto_ip.is_some() || value.hog_giganto_ip.is_some() { - return false; - } - if self.hog_giganto_port != value.hog_giganto_port { - return false; - } - if self.protocols != value.protocols { - return false; - } - if self.protocol_list != value.protocol_list { - return false; - } - if self.sensors != value.sensors { - return false; - } - if self.sensor_list != value.sensor_list { - return false; - } + let to_be_matches = match (&self.to_be, &value.to_be) { + (Some(to_be), Some(value)) => to_be == value, + (Some(_), None) | (None, Some(_)) => false, + (None, None) => true, + }; - true + as_is_matches && to_be_matches } } diff --git a/src/graphql/node/status.rs b/src/graphql/node/status.rs index b7327a26..230db36f 100644 --- a/src/graphql/node/status.rs +++ b/src/graphql/node/status.rs @@ -99,6 +99,8 @@ async fn load( connection .edges .extend(node_list.into_iter().map(move |(k, ev)| { + let hostname = ev.as_is_hostname_or_fallback(); + let ( review, piglet, @@ -111,11 +113,9 @@ async fn load( total_disk_space, used_disk_space, ping, - ) = if let (Some(modules), Some(usage), Some(ping)) = ( - apps.get(&ev.hostname), - usages.get(&ev.hostname), - ping.get(&ev.hostname), - ) { + ) = if let (Some(modules), Some(usage), Some(ping)) = + (apps.get(hostname), usages.get(hostname), ping.get(hostname)) + { let module_names = modules .iter() .map(|(_, m)| m.clone()) @@ -140,7 +140,7 @@ async fn load( Some(usage.used_disk_space), Some(*ping), ) - } else if !review_hostname.is_empty() && review_hostname == ev.hostname { + } else if !review_hostname.is_empty() && review_hostname == hostname { ( Some(true), None, @@ -163,7 +163,7 @@ async fn load( k, NodeStatus::new( ev.id, - ev.name, + ev.as_is_name_or_fallback().to_owned(), cpu_usage, total_memory, used_memory,