Skip to content

Commit

Permalink
add NetworkThreat event message
Browse files Browse the repository at this point in the history
  • Loading branch information
syncpark committed Dec 6, 2023
1 parent 372e0de commit 9943b4f
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 115 deletions.
23 changes: 18 additions & 5 deletions src/graphql/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod http;
mod kerberos;
mod ldap;
mod mqtt;
mod network;
mod nfs;
mod ntlm;
mod rdp;
Expand All @@ -24,9 +25,9 @@ use self::{
http::BlockListHttp, http::DomainGenerationAlgorithm, http::HttpThreat, http::NonBrowser,
http::RepeatedHttpSessions, http::TorConnection, kerberos::BlockListKerberos,
ldap::BlockListLdap, ldap::LdapBruteForce, ldap::LdapPlainText, mqtt::BlockListMqtt,
nfs::BlockListNfs, ntlm::BlockListNtlm, rdp::BlockListRdp, rdp::RdpBruteForce,
smb::BlockListSmb, smtp::BlockListSmtp, ssh::BlockListSsh, sysmon::WindowsThreat,
tls::BlockListTls,
network::NetworkThreat, nfs::BlockListNfs, ntlm::BlockListNtlm, rdp::BlockListRdp,
rdp::RdpBruteForce, smb::BlockListSmb, smtp::BlockListSmtp, ssh::BlockListSsh,
sysmon::WindowsThreat, tls::BlockListTls,
};
use super::{
customer::{Customer, HostNetworkGroupInput},
Expand Down Expand Up @@ -152,6 +153,7 @@ async fn fetch_events(
let mut block_list_ssh_time = start_time;
let mut block_list_tls_time = start_time;
let mut windows_threat_time = start_time;
let mut network_threat_time = start_time;

loop {
itv.tick().await;
Expand Down Expand Up @@ -187,7 +189,8 @@ async fn fetch_events(
.min(block_list_smtp_time)
.min(block_list_ssh_time)
.min(block_list_tls_time)
.min(windows_threat_time);
.min(windows_threat_time)
.min(network_threat_time);

// Fetch event iterator based on time
let start = i128::from(start) << 64;
Expand Down Expand Up @@ -390,6 +393,13 @@ async fn fetch_events(
windows_threat_time = event_time + ADD_TIME_FOR_NEXT_COMPARE;
}
}
EventKind::NetworkThreat => {
if event_time >= network_threat_time {
tx.unbounded_send(value.into())?;
network_threat_time = event_time + ADD_TIME_FOR_NEXT_COMPARE;
}
}

EventKind::Log => continue,
}
}
Expand Down Expand Up @@ -502,7 +512,7 @@ pub(super) struct EndpointInput {
}

#[derive(SimpleObject)]
pub(self) struct EventWithTriage {
struct EventWithTriage {
pub event: Event,
pub triage_result: Option<String>,
}
Expand Down Expand Up @@ -588,6 +598,8 @@ enum Event {
BlockListTls(BlockListTls),

WindowsThreat(WindowsThreat),

NetworkThreat(NetworkThreat),
}

impl From<database::Event> for Event {
Expand Down Expand Up @@ -632,6 +644,7 @@ impl From<database::Event> for Event {
RecordType::Tls(event) => Event::BlockListTls(event.into()),
},
database::Event::WindowsThreat(event) => Event::WindowsThreat(event.into()),
database::Event::NetworkThreat(event) => Event::NetworkThreat(event.into()),
}
}
}
Expand Down
128 changes: 128 additions & 0 deletions src/graphql/event/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use super::{country_code, find_ip_customer, find_ip_network, TriageScore};
use crate::graphql::{customer::Customer, network::Network};
use async_graphql::{Context, Object, Result};
use chrono::{DateTime, Utc};
use review_database as database;

#[allow(clippy::module_name_repetitions)]
pub(super) struct NetworkThreat {
inner: database::NetworkThreat,
}

#[Object]
impl NetworkThreat {
async fn time(&self) -> DateTime<Utc> {
self.inner.timestamp
}

async fn source(&self) -> &str {
&self.inner.source
}

async fn src_addr(&self) -> String {
self.inner.orig_addr.to_string()
}

async fn src_port(&self) -> u16 {
self.inner.orig_port
}

/// The two-letter country code of the source IP address. `"XX"` if the
/// location of the address is not known, and `"ZZ"` if the location
/// database is unavailable.
async fn src_country(&self, ctx: &Context<'_>) -> String {
country_code(ctx, self.inner.orig_addr)
}

async fn src_customer(&self, ctx: &Context<'_>) -> Result<Option<Customer>> {
let store = crate::graphql::get_store(ctx).await?;
let map = store.customer_map();
find_ip_customer(&map, self.inner.orig_addr)
}

async fn src_network(&self, ctx: &Context<'_>) -> Result<Option<Network>> {
let store = crate::graphql::get_store(ctx).await?;
let map = store.network_map();
find_ip_network(&map, self.inner.orig_addr)
}

async fn dst_addr(&self) -> String {
self.inner.resp_addr.to_string()
}

async fn dst_port(&self) -> u16 {
self.inner.resp_port
}

/// The two-letter country code of the destination IP address. `"XX"` if the
/// location of the address is not known, and `"ZZ"` if the location
/// database is unavailable.
async fn dst_country(&self, ctx: &Context<'_>) -> String {
country_code(ctx, self.inner.resp_addr)
}

async fn dst_customer(&self, ctx: &Context<'_>) -> Result<Option<Customer>> {
let store = crate::graphql::get_store(ctx).await?;
let map = store.customer_map();
find_ip_customer(&map, self.inner.resp_addr)
}

async fn dst_network(&self, ctx: &Context<'_>) -> Result<Option<Network>> {
let store = crate::graphql::get_store(ctx).await?;
let map = store.network_map();
find_ip_network(&map, self.inner.resp_addr)
}

async fn proto(&self) -> u8 {
self.inner.proto
}

async fn service(&self) -> &str {
&self.inner.service
}

async fn last_time(&self) -> i64 {
self.inner.last_time
}

async fn content(&self) -> &str {
&self.inner.content
}

async fn db_name(&self) -> &str {
&self.inner.db_name
}

async fn rule_id(&self) -> u32 {
self.inner.rule_id
}

async fn matched_to(&self) -> &str {
&self.inner.matched_to
}

async fn cluster_id(&self) -> usize {
self.inner.cluster_id
}

async fn attack_kind(&self) -> &str {
&self.inner.attack_kind
}

async fn confidence(&self) -> f32 {
self.inner.confidence
}

async fn triage_scores(&self) -> Option<Vec<TriageScore>> {
self.inner
.triage_scores
.as_ref()
.map(|scores| scores.iter().map(Into::into).collect::<Vec<TriageScore>>())
}
}

impl From<database::NetworkThreat> for NetworkThreat {
fn from(inner: database::NetworkThreat) -> Self {
Self { inner }
}
}
107 changes: 0 additions & 107 deletions src/graphql/event/sysmon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,113 +3,6 @@ use async_graphql::Object;
use chrono::{DateTime, Utc};
use review_database as database;

// #[allow(clippy::module_name_repetitions)]
// pub(super) struct CommonNetworkEvent {
// inner: database::CommonNetworkEvent,
// }

// #[Object]
// impl CommonNetworkEvent {
// async fn event_kind(&self) -> EventKind {
// self.inner.event_kind
// }

// async fn time(&self) -> DateTime<Utc> {
// self.inner.time
// }

// async fn source(&self) -> &str {
// &self.inner.source
// }

// async fn session_end_time(&self) -> DateTime<Utc> {
// self.inner.session_end_time
// }

// async fn src_addr(&self) -> String {
// self.inner.src_addr.to_string()
// }

// async fn src_port(&self) -> u16 {
// self.inner.src_port
// }

// /// The two-letter country code of the source IP address. `"XX"` if the
// /// location of the address is not known, and `"ZZ"` if the location
// /// database is unavailable.
// async fn src_country(&self, ctx: &Context<'_>) -> String {
// country_code(ctx, self.inner.src_addr)
// }

// async fn src_customer(&self, ctx: &Context<'_>) -> Result<Option<Customer>> {
// let store = crate::graphql::get_store(ctx).await?;
// let map = store.customer_map();
// find_ip_customer(&map, self.inner.src_addr)
// }

// async fn src_network(&self, ctx: &Context<'_>) -> Result<Option<Network>> {
// let store = crate::graphql::get_store(ctx).await?;
// let map = store.network_map();
// find_ip_network(&map, self.inner.src_addr)
// }

// async fn dst_addr(&self) -> String {
// self.inner.dst_addr.to_string()
// }

// async fn dst_port(&self) -> u16 {
// self.inner.dst_port
// }

// /// The two-letter country code of the destination IP address. `"XX"` if the
// /// location of the address is not known, and `"ZZ"` if the location
// /// database is unavailable.
// async fn dst_country(&self, ctx: &Context<'_>) -> String {
// country_code(ctx, self.inner.dst_addr)
// }

// async fn dst_customer(&self, ctx: &Context<'_>) -> Result<Option<Customer>> {
// let store = crate::graphql::get_store(ctx).await?;
// let map = store.customer_map();
// find_ip_customer(&map, self.inner.dst_addr)
// }

// async fn dst_network(&self, ctx: &Context<'_>) -> Result<Option<Network>> {
// let store = crate::graphql::get_store(ctx).await?;
// let map = store.network_map();
// find_ip_network(&map, self.inner.dst_addr)
// }

// async fn proto(&self) -> u8 {
// self.inner.proto
// }

// async fn service(&self) -> &str {
// &self.inner.service
// }

// async fn content(&self) -> &str {
// &self.inner.content
// }

// async fn confidence(&self) -> f32 {
// self.inner.confidence
// }

// async fn triage_scores(&self) -> Option<Vec<TriageScore>> {
// self.inner
// .triage_scores
// .as_ref()
// .map(|scores| scores.iter().map(Into::into).collect::<Vec<TriageScore>>())
// }
// }

// impl From<database::CommonNetworkEvent> for CommonNetworkEvent {
// fn from(inner: database::CommonNetworkEvent) -> Self {
// Self { inner }
// }
// }

#[allow(clippy::module_name_repetitions)]
pub(super) struct WindowsThreat {
inner: database::WindowsThreat,
Expand Down
1 change: 0 additions & 1 deletion src/graphql/outlier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ pub(super) struct CentralRankedOutlier {
pub saved: bool,
}

type CentralRankedOutlierKey = (String, String, i64, String, String);
type CentralRankedOutlierValue = (f64, bool);
impl FromKeyValue for CentralRankedOutlier {
fn from_key_value(key: &[u8], value: &[u8]) -> anyhow::Result<Self> {
Expand Down
4 changes: 2 additions & 2 deletions src/graphql/semi_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ impl SemiModelInfo {
model_name: semi_model.model_name,
model_version: semi_model.model_version,
model_description: semi_model.model_description,
time: time,
time,
model_data: semi_model.model_data,
}
}
}

impl FromKeyValue for SemiModelInfo {
fn from_key_value(_key: &[u8], value: &[u8]) -> Result<Self, anyhow::Error> {
let (semi_info, time) = bincode::deserialize::<SemiModelValue>(&value)?;
let (semi_info, time) = bincode::deserialize::<SemiModelValue>(value)?;
Ok(SemiModelInfo::new(semi_info, time))
}
}
Expand Down

0 comments on commit 9943b4f

Please sign in to comment.