feat: 🌈 Add postgresql_matchers module
fungiboletus committed Aug 27, 2024
1 parent c0a7b16 commit fca825b
Showing 11 changed files with 327 additions and 22 deletions.
87 changes: 85 additions & 2 deletions src/datamodel/matchers.rs
@@ -3,7 +3,7 @@ use prometheus_parser::{
Expression as PrometheusExpression, Label as PrometheusLabel, LabelOp as PrometheusLabelOp,
};

#[derive(Debug, Default, PartialEq)]
#[derive(Debug, Clone, Default, PartialEq)]
pub enum StringMatcher {
#[default]
All,
@@ -22,9 +22,31 @@ impl StringMatcher {
PrometheusLabelOp::RegexNotEqual => StringMatcher::NotMatch(value),
}
}

/// Is the matcher a negative matcher?
pub fn is_negative(&self) -> bool {
match self {
StringMatcher::All => false,
StringMatcher::Equal(_) => false,
StringMatcher::NotEqual(_) => true,
StringMatcher::Match(_) => false,
StringMatcher::NotMatch(_) => true,
}
}

/// Negate the matcher.
pub fn negate(&self) -> Self {
match self {
StringMatcher::All => StringMatcher::All,
StringMatcher::Equal(value) => StringMatcher::NotEqual(value.clone()),
StringMatcher::NotEqual(value) => StringMatcher::Equal(value.clone()),
StringMatcher::Match(value) => StringMatcher::NotMatch(value.clone()),
StringMatcher::NotMatch(value) => StringMatcher::Match(value.clone()),
}
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct LabelMatcher {
name: String,
matcher: StringMatcher,
@@ -36,6 +58,21 @@ impl LabelMatcher {
let matcher = StringMatcher::from_prometheus_label_op(label.op, label.value);
Self { name, matcher }
}

pub fn name(&self) -> &String {
&self.name
}

pub fn matcher(&self) -> &StringMatcher {
&self.matcher
}

pub fn negate(&self) -> Self {
Self {
name: self.name.clone(),
matcher: self.matcher.negate(),
}
}
}

#[derive(Debug, Default, PartialEq)]
@@ -90,6 +127,19 @@ impl SensorMatcher {
},
})
}

pub fn name_matcher(&self) -> &StringMatcher {
&self.name_matcher
}

pub fn label_matchers(&self) -> Option<&Vec<LabelMatcher>> {
self.label_matchers.as_ref()
}

pub fn is_all(&self) -> bool {
self.name_matcher == StringMatcher::All
&& (self.label_matchers.is_none() || self.label_matchers.as_ref().unwrap().is_empty())
}
}

// Test with:
@@ -217,4 +267,37 @@ mod tests {
);
assert!(SensorMatcher::from_prometheus_query("http_requests_total @ 1609746000").is_err());
}

#[test]
fn test_negative_matcher() {
let matcher = SensorMatcher::from_prometheus_query("http_requests_total").unwrap();
assert!(!matcher.name_matcher.is_negative());
assert!(matcher.name_matcher.negate().is_negative());

let matcher =
SensorMatcher::from_prometheus_query("http_requests_total{job!=\"prometheus\"}")
.unwrap();
assert!(matcher.label_matchers.as_ref().unwrap()[0]
.matcher
.is_negative(),);
assert!(!matcher.label_matchers.unwrap()[0]
.matcher
.negate()
.is_negative());
}

#[test]
fn test_sensor_matcher_is_all() {
let matcher = SensorMatcher::from_prometheus_query("http_requests_total").unwrap();
assert!(!matcher.is_all());

let matcher = SensorMatcher {
name_matcher: StringMatcher::All,
label_matchers: None,
};
assert!(matcher.is_all());

let matcher = SensorMatcher::default();
assert!(matcher.is_all());
}
}
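
For orientation, a minimal caller-side sketch of the new matcher API (is_negative, negate, is_all and the accessors). The query string is illustrative, and it assumes from_prometheus_query returns an anyhow-compatible error, as the map_err(AppError::BadRequest) call in crud.rs below suggests:

// Hypothetical caller, outside src/datamodel/matchers.rs.
use crate::datamodel::matchers::{SensorMatcher, StringMatcher};

fn describe(query: &str) -> anyhow::Result<()> {
    let matcher = SensorMatcher::from_prometheus_query(query)?;

    // A bare selector with no name and no label constraints matches every sensor.
    if matcher.is_all() {
        println!("matches every sensor");
        return Ok(());
    }

    if let StringMatcher::Equal(name) = matcher.name_matcher() {
        println!("exact sensor name: {}", name);
    }

    for label in matcher.label_matchers().into_iter().flatten() {
        // negate() flips Equal<->NotEqual and Match<->NotMatch without mutating the original.
        println!(
            "label {}: negative = {}, negated still negative = {}",
            label.name(),
            label.matcher().is_negative(),
            label.negate().matcher().is_negative(),
        );
    }
    Ok(())
}
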
6 changes: 4 additions & 2 deletions src/ingestors/http/crud.rs
@@ -53,7 +53,9 @@ pub async fn list_sensors(
}

let matcher = match query.query {
Some(query_str) => SensorMatcher::from_prometheus_query(&query_str)?,
Some(query_str) => {
SensorMatcher::from_prometheus_query(&query_str).map_err(AppError::BadRequest)?
}
None => SensorMatcher::default(),
};

@@ -90,7 +92,7 @@ pub async fn get_sensor(
State(state): State<HttpServerState>,
Path(uuid): Path<String>,
) -> Result<Json<SensorViewModel>, AppError> {
todo!();
unimplemented!();
/*let sensor = state.storage.get_sensor(uuid).await.map_err(|error| {
eprintln!("Failed to get sensor: {:?}", error);
AppError::InternalServerError(error)
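
The map_err(AppError::BadRequest) above passes the enum variant itself as the mapping function, which only compiles if BadRequest is a tuple variant wrapping the parser error's type. AppError is not part of this diff; a minimal sketch of the shape it would need, assuming from_prometheus_query yields an anyhow::Error:

// Hypothetical shape of AppError; the real definition lives elsewhere in the crate.
pub enum AppError {
    BadRequest(anyhow::Error),
    InternalServerError(anyhow::Error),
}

// A tuple variant doubles as `fn(anyhow::Error) -> AppError`, so it can be handed to map_err:
// SensorMatcher::from_prometheus_query(&query_str).map_err(AppError::BadRequest)?
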
2 changes: 1 addition & 1 deletion src/ingestors/http/prometheus.rs
@@ -146,5 +146,5 @@ pub async fn read(
headers: HeaderMap,
bytes: Bytes,
) -> Result<StatusCode, AppError> {
todo!();
unimplemented!();
}
2 changes: 1 addition & 1 deletion src/storage/bigquery/migrations/20240223133248_init.sql
@@ -132,7 +132,7 @@ ON
s.sensor_id = nv.sensor_id;

CREATE OR REPLACE VIEW `{dataset_id}.sensor_labels_view` AS
SELECT sensors.uuid, sensors.created_at, sensors.name, type, units.name as unit,
SELECT sensors.sensor_id, sensors.uuid, sensors.created_at, sensors.name, type, units.name as unit,
CASE WHEN COUNT(labels.sensor_id) = 0 THEN JSON_OBJECT() ELSE JSON_OBJECT(
ARRAY_AGG(labels_name_dictionary.name), ARRAY_AGG(labels_description_dictionary.description)
) END AS labels
2 changes: 1 addition & 1 deletion src/storage/duckdb/migrations/20240223133248_init.sql
@@ -129,7 +129,7 @@ CREATE INDEX IF NOT EXISTS idx_labels_description_dictionary_description ON labe
CREATE INDEX IF NOT EXISTS idx_strings_values_dictionary_value ON strings_values_dictionary (value);

CREATE VIEW IF NOT EXISTS sensor_labels_view AS
SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit, json_group_object(
SELECT sensors.sensor_id, sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit, json_group_object(
labels_name_dictionary."name",labels_description_dictionary."description"
) AS labels
FROM sensors
2 changes: 1 addition & 1 deletion src/storage/postgresql/migrations/20240110123122_init.sql
@@ -133,7 +133,7 @@ CREATE INDEX index_json_values ON json_values USING brin (sensor_id, timestamp_m
CREATE INDEX index_blob_values ON blob_values USING brin (sensor_id, timestamp_ms) WITH (pages_per_range = 32);

CREATE VIEW sensor_labels_view AS
SELECT sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit,
SELECT sensors.sensor_id, sensors.uuid, sensors.created_at, sensors."name", type, units.name as unit,
CASE WHEN COUNT(labels.sensor_id) = 0 THEN '{}' ELSE jsonb_object_agg(
COALESCE(labels_name_dictionary."name",'whatever_this_is_a_bug_workaround'),labels_description_dictionary."description")
END AS labels
1 change: 1 addition & 0 deletions src/storage/postgresql/mod.rs
@@ -1,5 +1,6 @@
pub mod postgresql;
pub mod postgresql_crud;
pub mod postgresql_matchers;
pub mod postgresql_publishers;
pub mod postgresql_utilities;

64 changes: 52 additions & 12 deletions src/storage/postgresql/postgresql_crud.rs
@@ -2,12 +2,16 @@ use std::collections::BTreeMap;

use anyhow::anyhow;
use anyhow::Result;
use sqlx::postgres::PgArguments;
use sqlx::query::Query;
use sqlx::types::time::PrimitiveDateTime;
use sqlx::PgPool;
use sqlx::Postgres;
use sqlx::Row;

use crate::crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel};
use crate::datamodel::matchers::SensorMatcher;
use crate::storage::postgresql::postgresql_matchers::append_sensor_matcher_to_query;

pub async fn list_sensors(
pool: &PgPool,
@@ -23,18 +27,54 @@ pub async fn list_sensors(
::time::OffsetDateTime::from_unix_timestamp_nanos(cursor_next_created_at_timestamp)?;
let cursor_uuid = uuid::Uuid::parse_str(&cursor.next_uuid)?;

let query = sqlx::query(
r#"
SELECT uuid, name, created_at, type, unit, labels
FROM sensor_labels_view
WHERE (created_at, uuid) >= ($1, $2)
ORDER BY created_at ASC, uuid ASC
LIMIT $3
"#,
)
.bind(cursor_next_created_at_offset_datetime)
.bind(cursor_uuid)
.bind(query_limit);
let mut query: Query<Postgres, PgArguments>;
let mut query_string: String;

if matcher.is_all() {
query = sqlx::query(
r#"SELECT uuid, name, created_at, type, unit, labels
FROM sensor_labels_view
WHERE (created_at, uuid) >= ($1, $2)
ORDER BY created_at ASC, uuid ASC
LIMIT $3"#,
)
.bind(cursor_next_created_at_offset_datetime)
.bind(cursor_uuid)
.bind(query_limit);
} else {
query_string = String::from(
r#"SELECT uuid, name, created_at, type, unit, labels
FROM sensor_labels_view
WHERE (created_at, uuid) >= ($1, $2)
AND sensor_id IN (
"#,
);

let mut params: Vec<String> = Vec::new();

append_sensor_matcher_to_query(&mut query_string, &mut params, &matcher, 2);

query_string.push_str(
r#"
)
ORDER BY created_at ASC, uuid ASC
LIMIT $"#,
);
query_string.push_str((params.len() + 3).to_string().as_str());

//println!("query_string: {}", query_string);
//println!("params: {:?}", params);

query = sqlx::query(&query_string)
.bind(cursor_next_created_at_offset_datetime)
.bind(cursor_uuid);

for param in params {
query = query.bind(param);
}

query = query.bind(query_limit);
}

let mut connection = pool.acquire().await?;
let mut records = query.fetch_all(&mut *connection).await?;
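
The postgresql_matchers module itself is not expanded on this page, so the following is only a guess at its contract from the call site: append_sensor_matcher_to_query appends a sub-select on sensor_labels_view to query_string, pushes the matcher values into params, and numbers its placeholders after the bind parameters the caller has already reserved (2 here, for created_at and uuid), which is why the caller then binds LIMIT as placeholder params.len() + 3. A name-equality-only sketch under those assumptions:

// Hypothetical sketch; the real src/storage/postgresql/postgresql_matchers.rs
// also has to handle label matchers, negations and regular expressions.
use crate::datamodel::matchers::{SensorMatcher, StringMatcher};

pub fn append_sensor_matcher_to_query(
    query_string: &mut String,
    params: &mut Vec<String>,
    matcher: &SensorMatcher,
    offset: usize, // bind parameters already used by the caller ($1..$offset)
) {
    query_string.push_str("SELECT sensor_id FROM sensor_labels_view WHERE ");
    match matcher.name_matcher() {
        StringMatcher::Equal(name) => {
            params.push(name.clone());
            // The first placeholder appended here is $(offset + 1), i.e. $3 for the caller above.
            query_string.push_str(&format!("name = ${}", offset + params.len()));
        }
        // Other matcher kinds are elided in this sketch.
        _ => query_string.push_str("TRUE"),
    }
}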