Skip to content

Commit

Permalink
Update triage_response interface
Browse files Browse the repository at this point in the history
  • Loading branch information
minshao committed Feb 26, 2024
1 parent 2619b55 commit e6fc894
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 185 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ oinq = { git = "https://github.com/petabi/oinq.git", tag = "0.9.1" }
reqwest = { version = "0.11", default-features = false, features = [
"rustls-tls-native-roots",
] }
review-database = { git = "https://github.com/petabi/review-database.git", rev = "d13d89c6" }
review-database = { git = "https://github.com/petabi/review-database.git", rev = "258802f8" }
roxy = { git = "https://github.com/aicers/roxy.git", tag = "0.2.1" }
rustls = "0.21"
rustls-native-certs = "0.6"
Expand Down
19 changes: 5 additions & 14 deletions src/graphql/outlier.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use super::{
always_true,
model::ModelDigest,
triage::response::{key, TriageResponse},
Role, RoleGuard, DEFAULT_CONNECTION_SIZE,
};
use super::{always_true, model::ModelDigest, Role, RoleGuard, DEFAULT_CONNECTION_SIZE};
use crate::graphql::{earliest_key, latest_key};
use anyhow::anyhow;
use async_graphql::{
Expand Down Expand Up @@ -778,7 +773,7 @@ async fn load_ranked_outliers_with_filter(
#[allow(clippy::type_complexity, clippy::too_many_arguments)] // since this is called within `load` only
fn load_nodes_with_search_filter<'m, M, I>(
map: &'m M,
remarks_map: &review_database::IndexedMap<'_>,
remarks_map: &review_database::IndexedTable<'_, review_database::TriageResponse>,
tags_map: &review_database::IndexedSet<'_>,
filter: &Option<SearchFilterInput>,
after: Option<String>,
Expand Down Expand Up @@ -859,7 +854,7 @@ where
#[allow(clippy::too_many_lines)]
fn iter_through_search_filter_nodes<I>(
iter: I,
remarks_map: &review_database::IndexedMap<'_>,
remarks_map: &review_database::IndexedTable<'_, review_database::TriageResponse>,
tags_map: &review_database::IndexedSet<'_>,
to: &[u8],
cond: fn(cmp::Ordering) -> bool,
Expand Down Expand Up @@ -906,18 +901,14 @@ where

if let Some(filter) = filter {
if filter.remark.is_some() || tag_id_list.is_some() {
let key = key(&node.source, Utc.timestamp_nanos(node.id));
if let Some(value) = remarks_map.get_by_key(&key)? {
let value: TriageResponse = bincode::DefaultOptions::new()
.deserialize(value.as_ref())
.map_err(|_| "invalid value in database")?;
if let Some(value) = remarks_map.get(&node.source, &Utc.timestamp_nanos(node.id))? {
if let Some(remark) = &filter.remark {
if !value.remarks.contains(remark) {
continue;
}
}
if let Some(tag_ids) = &tag_id_list {
if !tag_ids.iter().any(|tag| value.tag_ids.contains(tag)) {
if !tag_ids.iter().any(|tag| value.tag_ids().contains(tag)) {
continue;
}
}
Expand Down
206 changes: 36 additions & 170 deletions src/graphql/triage/response.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,34 @@
use std::borrow::Cow;

use super::{Role, RoleGuard};
use async_graphql::{
connection::{query, Connection, EmptyFields},
types::ID,
ComplexObject, Context, InputObject, Object, Result, SimpleObject,
Context, InputObject, Object, Result,
};
use bincode::Options;
use chrono::{DateTime, Utc};
use review_database::{
types::FromKeyValue, Indexable, Indexed, IndexedMap, IndexedMapIterator, IndexedMapUpdate,
IterableMap,
};
use serde::{Deserialize, Serialize};

#[allow(clippy::module_name_repetitions)]
#[derive(Deserialize, Serialize, SimpleObject)]
#[graphql(complex)]
pub struct TriageResponse {
#[graphql(skip)]
id: u32,
key: Vec<u8>,
source: String,
time: DateTime<Utc>,
pub tag_ids: Vec<u32>,
pub remarks: String,
creation_time: DateTime<Utc>,
last_modified_time: DateTime<Utc>,
inner: review_database::TriageResponse,
}

impl From<review_database::TriageResponse> for TriageResponse {
fn from(inner: review_database::TriageResponse) -> Self {
Self { inner }
}
}

#[ComplexObject]
#[Object]
impl TriageResponse {
async fn id(&self) -> ID {
ID(self.id.to_string())
ID(self.inner.id.to_string())
}

async fn remarks(&self) -> &str {
&self.inner.remarks
}
}

impl FromKeyValue for TriageResponse {
fn from_key_value(_key: &[u8], value: &[u8]) -> anyhow::Result<Self> {
Ok(bincode::DefaultOptions::new().deserialize(value)?)
async fn tag_ids(&self) -> &[u32] {
&self.inner.tag_ids()
}
}

Expand All @@ -48,24 +38,11 @@ struct TriageResponseTotalCount;
impl TriageResponseTotalCount {
/// The total number of edges.
async fn total_count(&self, ctx: &Context<'_>) -> Result<usize> {
let store = crate::graphql::get_store(ctx).await?;
Ok(store.triage_response_map().count()?)
}
}

impl Indexable for TriageResponse {
fn key(&self) -> Cow<[u8]> {
Cow::Borrowed(&self.key)
}
use review_database::{Direction, Iterable};

fn value(&self) -> Vec<u8> {
bincode::DefaultOptions::new()
.serialize(self)
.expect("serializable")
}

fn set_index(&mut self, index: u32) {
self.id = index;
let store = crate::graphql::get_store(ctx).await?;
let map = store.triage_response_map();
Ok(map.iter(Direction::Forward, None).count())
}
}

Expand All @@ -76,6 +53,12 @@ pub(super) struct TriageResponseInput {
remarks: Option<String>,
}

impl From<TriageResponseInput> for review_database::TriageResponseUpdate {
fn from(input: TriageResponseInput) -> Self {
Self::new(input.key, input.tag_ids, input.remarks)
}
}

#[Object]
impl super::TriageResponseQuery {
/// A list of triage responses.
Expand Down Expand Up @@ -110,18 +93,7 @@ impl super::TriageResponseQuery {
) -> Result<Option<TriageResponse>> {
let store = crate::graphql::get_store(ctx).await?;
let map = store.triage_response_map();
let key = key(&source, time);

let value = if let Some(value) = map.get_by_key(&key)? {
let value: TriageResponse = bincode::DefaultOptions::new()
.deserialize(value.as_ref())
.map_err(|_| "invalid value in database")?;
Some(value)
} else {
None
};

Ok(value)
Ok(map.get(&source, &time)?.map(Into::into))
}
}

Expand All @@ -133,22 +105,8 @@ async fn load(
last: Option<usize>,
) -> Result<Connection<String, TriageResponse, TriageResponseTotalCount, EmptyFields>> {
let store = crate::graphql::get_store(ctx).await?;
let map = store.triage_response_map();
super::super::load::<
'_,
IndexedMap,
IndexedMapIterator,
TriageResponse,
TriageResponse,
TriageResponseTotalCount,
>(&map, after, before, first, last, TriageResponseTotalCount)
}

pub fn key(source: &str, time: DateTime<Utc>) -> Vec<u8> {
let mut key = Vec::new();
key.extend_from_slice(source.as_bytes());
key.extend_from_slice(&time.timestamp_nanos_opt().unwrap_or_default().to_be_bytes());
key
let table = store.triage_response_map();
crate::graphql::load_edges(&table, after, before, first, last, TriageResponseTotalCount)
}

#[Object]
Expand All @@ -162,26 +120,13 @@ impl super::TriageResponseMutation {
ctx: &Context<'_>,
source: String,
time: DateTime<Utc>,
mut tag_ids: Vec<u32>,
tag_ids: Vec<u32>,
remarks: String,
) -> Result<ID> {
let key = key(&source, time);
tag_ids.sort_unstable();
tag_ids.dedup();
let time = Utc::now();
let pol = TriageResponse {
id: u32::MAX,
key,
source,
time,
tag_ids,
remarks,
creation_time: time,
last_modified_time: time,
};
let pol = review_database::TriageResponse::new(source, time, tag_ids, remarks);
let store = crate::graphql::get_store(ctx).await?;
let map = store.triage_response_map();
let id = map.insert(pol)?;
let id = map.put(pol)?;

Ok(ID(id.to_string()))
}
Expand Down Expand Up @@ -223,97 +168,18 @@ impl super::TriageResponseMutation {
let i = id.as_str().parse::<u32>().map_err(|_| "invalid ID")?;

let store = crate::graphql::get_store(ctx).await?;
let map = store.triage_response_map();
let mut map = store.triage_response_map();
let old: review_database::TriageResponseUpdate = old.into();
let new: review_database::TriageResponseUpdate = new.into();
map.update(i, &old, &new)?;

Ok(id)
}
}

impl IndexedMapUpdate for TriageResponseInput {
type Entry = TriageResponse;

fn key(&self) -> Option<Cow<[u8]>> {
Some(Cow::Borrowed(&self.key))
}

fn apply(&self, mut value: Self::Entry) -> Result<Self::Entry, anyhow::Error> {
if let Some(remarks) = self.remarks.as_deref() {
value.remarks.clear();
value.remarks.push_str(remarks);
}
if let Some(tag_ids) = self.tag_ids.as_deref() {
let mut tag_ids = tag_ids.to_vec();
tag_ids.sort_unstable();
tag_ids.dedup();
value.tag_ids.clear();
value.tag_ids.extend(tag_ids.iter());
}
value.last_modified_time = Utc::now();

Ok(value)
}

fn verify(&self, value: &Self::Entry) -> bool {
if self.key != value.key {
return false;
}
if let Some(remarks) = self.remarks.as_deref() {
if remarks != value.remarks {
return false;
}
}
if let Some(tag_ids) = self.tag_ids.as_deref() {
let mut tag_ids = tag_ids.to_vec();
tag_ids.sort_unstable();
tag_ids.dedup();
if tag_ids != value.tag_ids {
return false;
}
}

true
}
}

pub(in crate::graphql) async fn remove_tag(ctx: &Context<'_>, tag_id: u32) -> Result<()> {
let store = crate::graphql::get_store(ctx).await?;
let map = store.triage_response_map();
let mut updates = Vec::new();
for (_, value) in map.iter_forward()? {
let triage_response = bincode::DefaultOptions::new()
.deserialize::<TriageResponse>(value.as_ref())
.map_err(|_| "invalid value in database")?;
if triage_response.tag_ids.iter().all(|x| *x != tag_id) {
continue;
}

let old_tag_ids = triage_response.tag_ids;
let mut new_tag_ids = old_tag_ids.clone();
new_tag_ids.retain(|&id| id != tag_id);
updates.push((
triage_response.id,
triage_response.key,
old_tag_ids,
new_tag_ids,
triage_response.remarks,
));
}

for (id, key, old_tag_ids, new_tag_ids, remarks) in updates {
let old = TriageResponseInput {
key: key.clone(),
tag_ids: Some(old_tag_ids),
remarks: Some(remarks.clone()),
};
let new = TriageResponseInput {
key: key.clone(),
tag_ids: Some(new_tag_ids),
remarks: Some(remarks),
};
map.update(id, &old, &new)
.map_err(|e| format!("failed to update triage response: {e}"))?;
}

Ok(())
Ok(map.remove_tag(tag_id)?)
}

0 comments on commit e6fc894

Please sign in to comment.