diff --git a/src/graphql/event.rs b/src/graphql/event.rs index 975cb323..2d731de1 100644 --- a/src/graphql/event.rs +++ b/src/graphql/event.rs @@ -7,6 +7,7 @@ mod http; mod kerberos; mod ldap; mod mqtt; +mod network; mod nfs; mod ntlm; mod rdp; @@ -24,9 +25,9 @@ use self::{ http::BlockListHttp, http::DomainGenerationAlgorithm, http::HttpThreat, http::NonBrowser, http::RepeatedHttpSessions, http::TorConnection, kerberos::BlockListKerberos, ldap::BlockListLdap, ldap::LdapBruteForce, ldap::LdapPlainText, mqtt::BlockListMqtt, - nfs::BlockListNfs, ntlm::BlockListNtlm, rdp::BlockListRdp, rdp::RdpBruteForce, - smb::BlockListSmb, smtp::BlockListSmtp, ssh::BlockListSsh, sysmon::WindowsThreat, - tls::BlockListTls, + network::NetworkThreat, nfs::BlockListNfs, ntlm::BlockListNtlm, rdp::BlockListRdp, + rdp::RdpBruteForce, smb::BlockListSmb, smtp::BlockListSmtp, ssh::BlockListSsh, + sysmon::WindowsThreat, tls::BlockListTls, }; use super::{ customer::{Customer, HostNetworkGroupInput}, @@ -152,6 +153,7 @@ async fn fetch_events( let mut block_list_ssh_time = start_time; let mut block_list_tls_time = start_time; let mut windows_threat_time = start_time; + let mut network_threat_time = start_time; loop { itv.tick().await; @@ -187,7 +189,8 @@ async fn fetch_events( .min(block_list_smtp_time) .min(block_list_ssh_time) .min(block_list_tls_time) - .min(windows_threat_time); + .min(windows_threat_time) + .min(network_threat_time); // Fetch event iterator based on time let start = i128::from(start) << 64; @@ -390,6 +393,13 @@ async fn fetch_events( windows_threat_time = event_time + ADD_TIME_FOR_NEXT_COMPARE; } } + EventKind::NetworkThreat => { + if event_time >= network_threat_time { + tx.unbounded_send(value.into())?; + network_threat_time = event_time + ADD_TIME_FOR_NEXT_COMPARE; + } + } + EventKind::Log => continue, } } @@ -502,7 +512,7 @@ pub(super) struct EndpointInput { } #[derive(SimpleObject)] -pub(self) struct EventWithTriage { +struct EventWithTriage { pub event: Event, pub triage_result: Option, } @@ -588,6 +598,8 @@ enum Event { BlockListTls(BlockListTls), WindowsThreat(WindowsThreat), + + NetworkThreat(NetworkThreat), } impl From for Event { @@ -632,6 +644,7 @@ impl From for Event { RecordType::Tls(event) => Event::BlockListTls(event.into()), }, database::Event::WindowsThreat(event) => Event::WindowsThreat(event.into()), + database::Event::NetworkThreat(event) => Event::NetworkThreat(event.into()), } } } diff --git a/src/graphql/event/network.rs b/src/graphql/event/network.rs new file mode 100644 index 00000000..b8e103f8 --- /dev/null +++ b/src/graphql/event/network.rs @@ -0,0 +1,128 @@ +use super::{country_code, find_ip_customer, find_ip_network, TriageScore}; +use crate::graphql::{customer::Customer, network::Network}; +use async_graphql::{Context, Object, Result}; +use chrono::{DateTime, Utc}; +use review_database as database; + +#[allow(clippy::module_name_repetitions)] +pub(super) struct NetworkThreat { + inner: database::NetworkThreat, +} + +#[Object] +impl NetworkThreat { + async fn time(&self) -> DateTime { + self.inner.timestamp + } + + async fn source(&self) -> &str { + &self.inner.source + } + + async fn src_addr(&self) -> String { + self.inner.orig_addr.to_string() + } + + async fn src_port(&self) -> u16 { + self.inner.orig_port + } + + /// The two-letter country code of the source IP address. `"XX"` if the + /// location of the address is not known, and `"ZZ"` if the location + /// database is unavailable. + async fn src_country(&self, ctx: &Context<'_>) -> String { + country_code(ctx, self.inner.orig_addr) + } + + async fn src_customer(&self, ctx: &Context<'_>) -> Result> { + let store = crate::graphql::get_store(ctx).await?; + let map = store.customer_map(); + find_ip_customer(&map, self.inner.orig_addr) + } + + async fn src_network(&self, ctx: &Context<'_>) -> Result> { + let store = crate::graphql::get_store(ctx).await?; + let map = store.network_map(); + find_ip_network(&map, self.inner.orig_addr) + } + + async fn dst_addr(&self) -> String { + self.inner.resp_addr.to_string() + } + + async fn dst_port(&self) -> u16 { + self.inner.resp_port + } + + /// The two-letter country code of the destination IP address. `"XX"` if the + /// location of the address is not known, and `"ZZ"` if the location + /// database is unavailable. + async fn dst_country(&self, ctx: &Context<'_>) -> String { + country_code(ctx, self.inner.resp_addr) + } + + async fn dst_customer(&self, ctx: &Context<'_>) -> Result> { + let store = crate::graphql::get_store(ctx).await?; + let map = store.customer_map(); + find_ip_customer(&map, self.inner.resp_addr) + } + + async fn dst_network(&self, ctx: &Context<'_>) -> Result> { + let store = crate::graphql::get_store(ctx).await?; + let map = store.network_map(); + find_ip_network(&map, self.inner.resp_addr) + } + + async fn proto(&self) -> u8 { + self.inner.proto + } + + async fn service(&self) -> &str { + &self.inner.service + } + + async fn last_time(&self) -> i64 { + self.inner.last_time + } + + async fn content(&self) -> &str { + &self.inner.content + } + + async fn db_name(&self) -> &str { + &self.inner.db_name + } + + async fn rule_id(&self) -> u32 { + self.inner.rule_id + } + + async fn matched_to(&self) -> &str { + &self.inner.matched_to + } + + async fn cluster_id(&self) -> usize { + self.inner.cluster_id + } + + async fn attack_kind(&self) -> &str { + &self.inner.attack_kind + } + + async fn confidence(&self) -> f32 { + self.inner.confidence + } + + async fn triage_scores(&self) -> Option> { + self.inner + .triage_scores + .as_ref() + .map(|scores| scores.iter().map(Into::into).collect::>()) + } +} + +impl From for NetworkThreat { + fn from(inner: database::NetworkThreat) -> Self { + Self { inner } + } +} diff --git a/src/graphql/event/sysmon.rs b/src/graphql/event/sysmon.rs index 4e88834a..ade0ff04 100644 --- a/src/graphql/event/sysmon.rs +++ b/src/graphql/event/sysmon.rs @@ -3,113 +3,6 @@ use async_graphql::Object; use chrono::{DateTime, Utc}; use review_database as database; -// #[allow(clippy::module_name_repetitions)] -// pub(super) struct CommonNetworkEvent { -// inner: database::CommonNetworkEvent, -// } - -// #[Object] -// impl CommonNetworkEvent { -// async fn event_kind(&self) -> EventKind { -// self.inner.event_kind -// } - -// async fn time(&self) -> DateTime { -// self.inner.time -// } - -// async fn source(&self) -> &str { -// &self.inner.source -// } - -// async fn session_end_time(&self) -> DateTime { -// self.inner.session_end_time -// } - -// async fn src_addr(&self) -> String { -// self.inner.src_addr.to_string() -// } - -// async fn src_port(&self) -> u16 { -// self.inner.src_port -// } - -// /// The two-letter country code of the source IP address. `"XX"` if the -// /// location of the address is not known, and `"ZZ"` if the location -// /// database is unavailable. -// async fn src_country(&self, ctx: &Context<'_>) -> String { -// country_code(ctx, self.inner.src_addr) -// } - -// async fn src_customer(&self, ctx: &Context<'_>) -> Result> { -// let store = crate::graphql::get_store(ctx).await?; -// let map = store.customer_map(); -// find_ip_customer(&map, self.inner.src_addr) -// } - -// async fn src_network(&self, ctx: &Context<'_>) -> Result> { -// let store = crate::graphql::get_store(ctx).await?; -// let map = store.network_map(); -// find_ip_network(&map, self.inner.src_addr) -// } - -// async fn dst_addr(&self) -> String { -// self.inner.dst_addr.to_string() -// } - -// async fn dst_port(&self) -> u16 { -// self.inner.dst_port -// } - -// /// The two-letter country code of the destination IP address. `"XX"` if the -// /// location of the address is not known, and `"ZZ"` if the location -// /// database is unavailable. -// async fn dst_country(&self, ctx: &Context<'_>) -> String { -// country_code(ctx, self.inner.dst_addr) -// } - -// async fn dst_customer(&self, ctx: &Context<'_>) -> Result> { -// let store = crate::graphql::get_store(ctx).await?; -// let map = store.customer_map(); -// find_ip_customer(&map, self.inner.dst_addr) -// } - -// async fn dst_network(&self, ctx: &Context<'_>) -> Result> { -// let store = crate::graphql::get_store(ctx).await?; -// let map = store.network_map(); -// find_ip_network(&map, self.inner.dst_addr) -// } - -// async fn proto(&self) -> u8 { -// self.inner.proto -// } - -// async fn service(&self) -> &str { -// &self.inner.service -// } - -// async fn content(&self) -> &str { -// &self.inner.content -// } - -// async fn confidence(&self) -> f32 { -// self.inner.confidence -// } - -// async fn triage_scores(&self) -> Option> { -// self.inner -// .triage_scores -// .as_ref() -// .map(|scores| scores.iter().map(Into::into).collect::>()) -// } -// } - -// impl From for CommonNetworkEvent { -// fn from(inner: database::CommonNetworkEvent) -> Self { -// Self { inner } -// } -// } - #[allow(clippy::module_name_repetitions)] pub(super) struct WindowsThreat { inner: database::WindowsThreat, diff --git a/src/graphql/outlier.rs b/src/graphql/outlier.rs index 9466ee45..2ea8e320 100644 --- a/src/graphql/outlier.rs +++ b/src/graphql/outlier.rs @@ -138,7 +138,6 @@ pub(super) struct CentralRankedOutlier { pub saved: bool, } -type CentralRankedOutlierKey = (String, String, i64, String, String); type CentralRankedOutlierValue = (f64, bool); impl FromKeyValue for CentralRankedOutlier { fn from_key_value(key: &[u8], value: &[u8]) -> anyhow::Result { diff --git a/src/graphql/semi_model.rs b/src/graphql/semi_model.rs index c6165566..90444430 100644 --- a/src/graphql/semi_model.rs +++ b/src/graphql/semi_model.rs @@ -110,7 +110,7 @@ impl SemiModelInfo { model_name: semi_model.model_name, model_version: semi_model.model_version, model_description: semi_model.model_description, - time: time, + time, model_data: semi_model.model_data, } } @@ -118,7 +118,7 @@ impl SemiModelInfo { impl FromKeyValue for SemiModelInfo { fn from_key_value(_key: &[u8], value: &[u8]) -> Result { - let (semi_info, time) = bincode::deserialize::(&value)?; + let (semi_info, time) = bincode::deserialize::(value)?; Ok(SemiModelInfo::new(semi_info, time)) } }