Skip to content

Commit

Permalink
feat: 🚚 Refactoring of the parsers
Browse files Browse the repository at this point in the history
  • Loading branch information
fungiboletus committed Aug 21, 2024
1 parent 190f052 commit 56ffe50
Show file tree
Hide file tree
Showing 20 changed files with 1,404 additions and 783 deletions.
1,110 changes: 694 additions & 416 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 12 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ futures = "0.3"
#futures-util = { version = "0.3", features = ["io"] }
#http-body = "1.0"
#http-body-util = "0.1"
polars = { version = "0.41" }
sqlx = { version = "0.7", features = [
polars = { version = "0.42" }
sqlx = { version = "0.8", features = [
"runtime-tokio",
"sqlite",
"postgres",
Expand All @@ -30,7 +30,13 @@ tokio = { version = "1.35", features = ["full"] }
tokio-stream = { version = "0.1", features = ["io-util"] }
tokio-util = "0.7"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.5", features = ["full"] }
tower-http = { version = "0.5", features = [
"full",
"sensitive-headers",
"compression-zstd",
"compression-br",
"compression-gzip",
] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.6", features = ["serde"] }
Expand Down Expand Up @@ -58,7 +64,7 @@ blake3 = "1.5"
regex = "1.10"
influxdb-line-protocol = "2.0"
flate2 = "1.0"
smallvec = "1.13"
smallvec = "1.13.2"
once_cell = "1.19"
urlencoding = "2.1"
hybridmap = "0.1"
Expand All @@ -73,8 +79,8 @@ sentry = { version = "0.34", features = ["anyhow", "tower"] }
rumqttc = { version = "0.24", features = ["url", "websocket"] }
url = "2.5"
rand = "0.8"
#geobuf = "0.1.4"
#protobuf = "3.0.2"
geobuf = "0.1.4"
protobuf = "3.0.2"
#lapin = "2.3"
futures-lite = "2.3"
#gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", rev = "107a5557df6336933f6b0bcf330aa91fe6ca866a" }
Expand Down
2 changes: 1 addition & 1 deletion src/datamodel/batch_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ mod tests {
use crate::{
bus::message::Message,
config::load_configuration,
datamodel::{sensapp_vec::SensAppLabels, Sample, SensorType},
datamodel::{Sample, SensAppLabels, SensorType},
};

// Utility function to create a test sensor
Expand Down
3 changes: 3 additions & 0 deletions src/datamodel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod batch;
pub mod batch_builder;
pub mod sample;
pub mod sensapp_datetime;
pub mod sensapp_labels;
pub mod sensapp_vec;
pub mod sensor;
pub mod sensor_type;
Expand All @@ -10,6 +11,8 @@ pub mod unit;

pub use sample::Sample;
pub use sensapp_datetime::SensAppDateTime;
pub use sensapp_labels::SensAppLabels;
pub use sensapp_labels::SensAppLabelsExt;
pub use sensapp_vec::SensAppVec;
pub use sensor::Sensor;
pub use sensor_type::SensorType;
Expand Down
60 changes: 60 additions & 0 deletions src/datamodel/sensapp_labels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::collections::BTreeMap;

use hybridmap::HybridMap;
use smallvec::SmallVec;

pub type SensAppLabels = SmallVec<[(String, String); 8]>;

pub trait SensAppLabelsExt {
fn build_with_iterators<'a>(
context_iterator: Option<impl Iterator<Item = (&'a String, &'a String)>>,
labels_iterator: Option<impl Iterator<Item = (String, String)>>,
) -> Option<Self>
where
Self: std::marker::Sized;

fn build_with_context<'a>(
context_reference: &Option<HybridMap<String, String>>,
labels_iterator: Option<impl Iterator<Item = (String, String)>>,
) -> Option<Self>
where
Self: std::marker::Sized;
}

impl SensAppLabelsExt for SensAppLabels {
fn build_with_iterators<'a>(
context_iterator: Option<impl Iterator<Item = (&'a String, &'a String)>>,
labels_iterator: Option<impl Iterator<Item = (String, String)>>,
) -> Option<Self> {
if let Some(context_iterator) = context_iterator {
let mut labels_builder = BTreeMap::new();

for (key, value) in context_iterator {
labels_builder.insert(key.clone(), value.clone());
}
if let Some(labels_iterator) = labels_iterator {
for (key, value) in labels_iterator {
let mut key_with_prefix = key;
while labels_builder.contains_key(&key_with_prefix) {
key_with_prefix.insert(0, '_');
}
labels_builder.insert(key_with_prefix, value);
}
}

Some(labels_builder.into_iter().collect())
} else {
labels_iterator.map(|labels_iterator| labels_iterator.collect())
}
}

fn build_with_context<'a>(
context_reference: &Option<HybridMap<String, String>>,
labels_iterator: Option<impl Iterator<Item = (String, String)>>,
) -> Option<Self> {
Self::build_with_iterators(
context_reference.as_ref().map(|context| context.iter()),
labels_iterator,
)
}
}
2 changes: 0 additions & 2 deletions src/datamodel/sensapp_vec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use smallvec::SmallVec;

pub type SensAppVec<T> = SmallVec<[T; 4]>;

pub type SensAppLabels = SmallVec<[(String, String); 8]>;
2 changes: 1 addition & 1 deletion src/datamodel/sensor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::name_to_uuid::uuid_v8_blake3;

use super::{sensapp_vec::SensAppLabels, unit::Unit, SensorType};
use super::{unit::Unit, SensAppLabels, SensorType};
use anyhow::{anyhow, Error};
use smallvec::SmallVec;
use std::fmt;
Expand Down
2 changes: 1 addition & 1 deletion src/importers/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn publish_csv_async<R: io::AsyncRead + Unpin + Send>(

println!("Reading CSV");
while let Some(record) = records.next().await {
let record = record.unwrap();
let _record = record.unwrap();
//println!("{:?}", record);

current_samples.push(Sample {
Expand Down
Loading

0 comments on commit 56ffe50

Please sign in to comment.