Skip to content

Commit

Permalink
Use OpaqueCursor instead of IndexedKey
Browse files Browse the repository at this point in the history
This change removes the reliance on a separate type (`IndexedKey`)
implementing `CursorType`.
  • Loading branch information
msk committed Dec 5, 2024
1 parent 238fb77 commit ab541d5
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 83 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ bincode = "1"
chrono = { version = ">=0.4.35", default-features = false, features = [
"serde",
] }
data-encoding = "2"
futures = "0.3"
futures-util = "0.3"
http = "1"
Expand Down
2 changes: 0 additions & 2 deletions src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,6 @@ where

#[derive(Debug, thiserror::Error)]
enum Error {
#[error("Invalid cursor")]
InvalidCursor,
#[error("The value of first and last must be within 0-100")]
InvalidLimitValue,
#[error("You must provide a `first` or `last` value to properly paginate a connection.")]
Expand Down
18 changes: 9 additions & 9 deletions src/graphql/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use async_graphql::{
connection::{Connection, Edge, EmptyFields},
connection::{Connection, Edge, EmptyFields, OpaqueCursor},
types::ID,
ComplexObject, Context, Object, Result, SimpleObject, StringNumber,
};
Expand All @@ -17,7 +17,7 @@ use super::{
get_trend,
model::{ModelDigest, TopElementCountsByColumn},
qualifier::Qualifier,
slicing::{self, IndexedKey},
slicing,
status::Status,
Role, RoleGuard, DEFAULT_CUTOFF_RATE, DEFAULT_TRENDI_ORDER,
};
Expand Down Expand Up @@ -46,7 +46,7 @@ impl ClusterQuery {
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
) -> Result<Connection<IndexedKey<i64>, Cluster, ClusterTotalCount, EmptyFields>> {
) -> Result<Connection<OpaqueCursor<(i32, i64)>, Cluster, ClusterTotalCount, EmptyFields>> {
let model = model.as_str().parse()?;
let categories = try_id_args_into_ints(categories)?;
let detectors = try_id_args_into_ints(detectors)?;
Expand Down Expand Up @@ -357,11 +357,11 @@ async fn load(
detectors: Option<Vec<i32>>,
qualifiers: Option<Vec<i32>>,
statuses: Option<Vec<i32>>,
after: Option<IndexedKey<i64>>,
before: Option<IndexedKey<i64>>,
after: Option<OpaqueCursor<(i32, i64)>>,
before: Option<OpaqueCursor<(i32, i64)>>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Connection<IndexedKey<i64>, Cluster, ClusterTotalCount, EmptyFields>> {
) -> Result<Connection<OpaqueCursor<(i32, i64)>, Cluster, ClusterTotalCount, EmptyFields>> {
let is_first = first.is_some();
let limit = slicing::len(first, last)?;
let db = ctx.data::<Database>()?;
Expand All @@ -372,8 +372,8 @@ async fn load(
detectors.as_deref(),
qualifiers.as_deref(),
statuses.as_deref(),
&after.map(Into::into),
&before.map(Into::into),
&after.map(|c| c.0),
&before.map(|c| c.0),
is_first,
limit,
)
Expand All @@ -393,7 +393,7 @@ async fn load(
);
connection.edges.extend(rows.into_iter().map(|c| {
Edge::new(
IndexedKey::new(c.id, c.size),
OpaqueCursor((c.id, c.size)),
Cluster {
id: c.id,
name: c.cluster_id,
Expand Down
27 changes: 10 additions & 17 deletions src/graphql/model.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use async_graphql::{
connection::{Connection, Edge, EmptyFields},
connection::{Connection, Edge, EmptyFields, OpaqueCursor},
types::ID,
Context, Object, Result, SimpleObject, StringNumber,
};
Expand All @@ -12,11 +12,8 @@ use review_database::{self as database, Database};
use tokio::sync::RwLock;

use super::{
cluster::TimeCount,
data_source::DataSource,
fill_vacant_time_slots, get_trend,
slicing::{self, IndexedKey},
Role, RoleGuard, DEFAULT_CUTOFF_RATE, DEFAULT_TRENDI_ORDER,
cluster::TimeCount, data_source::DataSource, fill_vacant_time_slots, get_trend, slicing, Role,
RoleGuard, DEFAULT_CUTOFF_RATE, DEFAULT_TRENDI_ORDER,
};
use crate::graphql::query;

Expand All @@ -40,7 +37,8 @@ impl ModelQuery {
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
) -> Result<Connection<IndexedKey<String>, ModelDigest, ModelTotalCount, EmptyFields>> {
) -> Result<Connection<OpaqueCursor<(i32, String)>, ModelDigest, ModelTotalCount, EmptyFields>>
{
query(
after,
before,
Expand Down Expand Up @@ -902,29 +900,24 @@ impl ModelTotalCount {

async fn load(
ctx: &Context<'_>,
after: Option<IndexedKey<String>>,
before: Option<IndexedKey<String>>,
after: Option<OpaqueCursor<(i32, String)>>,
before: Option<OpaqueCursor<(i32, String)>>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Connection<IndexedKey<String>, ModelDigest, ModelTotalCount, EmptyFields>> {
) -> Result<Connection<OpaqueCursor<(i32, String)>, ModelDigest, ModelTotalCount, EmptyFields>> {
let is_first = first.is_some();
let limit = slicing::len(first, last)?;
let db = ctx.data::<Database>()?;
let rows = db
.load_models(
&after.map(Into::into),
&before.map(Into::into),
is_first,
limit,
)
.load_models(&after.map(|c| c.0), &before.map(|c| c.0), is_first, limit)
.await?;

let (rows, has_previous, has_next) = slicing::page_info(is_first, limit, rows);
let mut connection =
Connection::with_additional_fields(has_previous, has_next, ModelTotalCount);
connection.edges.extend(
rows.into_iter()
.map(|model| Edge::new(IndexedKey::new(model.id, model.name.clone()), model.into())),
.map(|model| Edge::new(OpaqueCursor((model.id, model.name.clone())), model.into())),
);
Ok(connection)
}
43 changes: 0 additions & 43 deletions src/graphql/slicing.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,5 @@
use std::{fmt, str::FromStr};

use async_graphql::connection::CursorType;
use data_encoding::BASE64;

use super::Error;

pub(super) struct IndexedKey<T> {
pub(crate) id: i32,
pub(crate) value: T,
}

impl<T> IndexedKey<T> {
pub fn new(id: i32, value: T) -> Self {
Self { id, value }
}
}

impl<T> From<IndexedKey<T>> for (i32, T) {
fn from(key: IndexedKey<T>) -> Self {
(key.id, key.value)
}
}

impl<T: FromStr + fmt::Display> CursorType for IndexedKey<T> {
type Error = super::Error;

fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
let decoded = String::from_utf8(
BASE64
.decode(s.as_bytes())
.map_err(|_| Error::InvalidCursor)?,
)
.map_err(|_| Error::InvalidCursor)?;
let (id, value) = decoded.split_once(':').ok_or(Error::InvalidCursor)?;
let id = id.parse().map_err(|_| Error::InvalidCursor)?;
let value = value.parse::<T>().map_err(|_| Error::InvalidCursor)?;
Ok(Self { id, value })
}

fn encode_cursor(&self) -> String {
BASE64.encode(format!("{}:{}", self.id, self.value).as_bytes())
}
}

// This internal method should be called after validating pagination paramerters by either
// `grapqhl::query` or `grapqhl::query_with_constraints`.
pub(crate) fn len(first: Option<usize>, last: Option<usize>) -> Result<usize, Error> {
Expand Down
19 changes: 8 additions & 11 deletions src/graphql/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use num_traits::ToPrimitive;
use review_database::{BatchInfo, Database};
use serde_json::Value as JsonValue;

use super::{
slicing::{self, IndexedKey},
Role, RoleGuard,
};
use super::{slicing, Role, RoleGuard};
use crate::graphql::{query, query_with_constraints};

#[derive(Default)]
Expand Down Expand Up @@ -50,7 +47,7 @@ impl StatisticsQuery {
last: Option<i32>,
) -> Result<
Connection<
IndexedKey<i64>,
OpaqueCursor<(i32, i64)>,
Round,
TotalCountByCluster,
EmptyFields,
Expand Down Expand Up @@ -172,13 +169,13 @@ impl EdgeNameType for RoundByClusterEdge {
async fn load_rounds_by_cluster(
ctx: &Context<'_>,
cluster: i32,
after: Option<IndexedKey<i64>>,
before: Option<IndexedKey<i64>>,
after: Option<OpaqueCursor<(i32, i64)>>,
before: Option<OpaqueCursor<(i32, i64)>>,
first: Option<usize>,
last: Option<usize>,
) -> Result<
Connection<
IndexedKey<i64>,
OpaqueCursor<(i32, i64)>,
Round,
TotalCountByCluster,
EmptyFields,
Expand All @@ -192,8 +189,8 @@ async fn load_rounds_by_cluster(
let (model, batches) = db
.load_rounds_by_cluster(
cluster,
&after.map(|k| i64_to_naive_date_time(k.value)),
&before.map(|k| i64_to_naive_date_time(k.value)),
&after.map(|k| i64_to_naive_date_time(k.0 .1)),
&before.map(|k| i64_to_naive_date_time(k.0 .1)),
is_first,
limit + 1,
)
Expand Down Expand Up @@ -222,7 +219,7 @@ async fn load_rounds_by_cluster(
connection.edges.extend(
batch_infos
.into_iter()
.map(|row| Edge::new(IndexedKey::new(cluster, row.inner.id), row.into())),
.map(|row| Edge::new(OpaqueCursor((cluster, row.inner.id)), row.into())),
);
Ok(connection)
}
Expand Down

0 comments on commit ab541d5

Please sign in to comment.