From 0bd80dd39d709172fd0f2476743448a25a786489 Mon Sep 17 00:00:00 2001
From: Le Chiffre
Date: Wed, 24 Jan 2024 22:26:58 -0500
Subject: [PATCH] Initial abstraction with working Postgres and stub Rocksdb

TBD:
- abstract startup (db connection initialization)
- decide how to select backend in config
---
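Note, not part of the commit: one possible shape for the two TBD items above.
This is a sketch only; `VSS_BACKEND` and `init_backend` are hypothetical
names, while Postgres, Rocksdb, and VssBackend are the types introduced in
src/models/backend.rs below.

    fn init_backend() -> anyhow::Result<Arc<dyn VssBackend>> {
        // Hypothetical config knob; default to Postgres when unset.
        match std::env::var("VSS_BACKEND").as_deref() {
            Err(_) | Ok("postgres") => {
                let url = std::env::var("DATABASE_URL")?;
                let manager = ConnectionManager::<PgConnection>::new(url);
                let db_pool = Pool::builder().build(manager)?;
                Ok(Arc::new(Postgres::new(db_pool)))
            }
            Ok("rocksdb") => Ok(Arc::new(Rocksdb)),
            Ok(other) => anyhow::bail!("unknown VSS_BACKEND: {other}"),
        }
    }

main() would then build State { backend, .. } from the returned trait object,
keeping the rest of startup independent of the configured store.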
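The intended call pattern for the new trait, assuming a `db_pool` plus some
KeyValue values `kv_a`..`kv_d` (the type from src/kv.rs) already exist; the
store ids here are made up:

    let backend: Arc<dyn VssBackend> = Arc::new(Postgres::new(db_pool));

    // One transaction; items may target different stores.
    backend.put_items(vec![
        VssItem::from_kv("store_a", kv_a),
        VssItem::from_kv("store_b", kv_b),
    ])?;

    // One transaction; every item lands in the same store.
    backend.put_items_in_store("store_a", vec![kv_c, kv_d])?;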
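For reference, the migration in src/migration.rs assumes the v1 `get_object`
response is a JSON array of items whose `value` field is base64-encoded; an
illustrative, made-up item:

    [{"store_id": "s", "key": "k", "value": "AQID", "version": 0}]

"AQID" decodes to the bytes [1, 2, 3]. Items that fail to decode are skipped
with a warning rather than aborting the batch.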
 src/main.rs           |   7 +-
 src/migration.rs      |  36 ++++---
 src/models/backend.rs | 236 ++++++++++++++++++++++++++++++++++++++++++
 src/models/mod.rs     |  78 +++++++-------
 src/routes.rs         |  23 ++--
 5 files changed, 310 insertions(+), 70 deletions(-)
 create mode 100644 src/models/backend.rs

diff --git a/src/main.rs b/src/main.rs
index 5e78799..23cc631 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use crate::models::MIGRATIONS;
 use crate::routes::*;
 use axum::extract::DefaultBodyLimit;
@@ -34,7 +36,7 @@ const ALLOWED_LOCALHOST: &str = "http://127.0.0.1:";
 
 #[derive(Clone)]
 pub struct State {
-    db_pool: Pool<ConnectionManager<PgConnection>>,
+    backend: Arc<dyn models::backend::VssBackend>,
     pub auth_key: Option<PublicKey>,
     pub self_hosted: bool,
     pub secp: Secp256k1<All>,
@@ -86,8 +88,9 @@ async fn main() -> anyhow::Result<()> {
             .expect("migrations could not run");
     }
 
+    let backend = Arc::new(models::backend::Postgres::new(db_pool));
     let state = State {
-        db_pool,
+        backend,
        auth_key,
        self_hosted,
        secp,
diff --git a/src/migration.rs b/src/migration.rs
index 52acad6..874e6fd 100644
--- a/src/migration.rs
+++ b/src/migration.rs
@@ -1,3 +1,4 @@
+use crate::kv::KeyValue;
 use crate::models::VssItem;
 use crate::State;
 use anyhow::anyhow;
 use axum::headers::Authorization;
 use axum::http::StatusCode;
 use axum::{Extension, Json, TypedHeader};
 use chrono::{DateTime, NaiveDateTime, Utc};
-use diesel::Connection;
 use log::{error, info};
 use serde::{Deserialize, Deserializer};
 use serde_json::json;
@@ -46,6 +46,12 @@
 where
     D: Deserializer<'de>,
 {
     })
 }
 
+/// Migration assumes a v1 `get_object` response, which returns the value as a base64-encoded string
+///
+/// Environment:
+/// - MIGRATION_URL
+/// - MIGRATION_BATCH_SIZE (default 100)
+/// - MIGRATION_START_INDEX (default 0)
 pub async fn migration_impl(admin_key: String, state: &State) -> anyhow::Result<()> {
     let client = Agent::new();
     let Ok(url) = std::env::var("MIGRATION_URL") else {
@@ -77,21 +83,25 @@ pub async fn migration_impl(admin_key: String, state: &State) -> anyhow::Result<
             .set("x-api-key", &admin_key)
             .send_string(&payload.to_string())?;
         let items: Vec = resp.into_json()?;
+        let nitems = items.len(); // needed later; `items` is consumed below
+
+        let backend = state.backend.clone();
 
-        let mut conn = state.db_pool.get().unwrap();
-
-        // Insert values into DB
-        conn.transaction::<_, anyhow::Error, _>(|conn| {
-            for item in items.iter() {
-                if let Ok(value) = base64::decode(&item.value) {
-                    VssItem::put_item(conn, &item.store_id, &item.key, &value, item.version)?;
-                }
-            }
-
-            Ok(())
-        })?;
+        // The original migration code didn't preserve timestamps,
+        // and neither does [`VssItem::from_kv`]
+        let vss_items: Vec<_> = items.into_iter().filter_map(|item| {
+            if let Ok(value) = base64::decode(&item.value) {
+                Some(VssItem::from_kv(&item.store_id, KeyValue::new(item.key, value, item.version)))
+            } else {
+                log::warn!("Failed to decode value during migration: {}", item.value);
+                None
+            }
+        }).collect();
 
-        if items.len() < limit {
+        // Insert values into DB
+        backend.put_items(vss_items)?;
+
+        if nitems < limit {
             finished = true;
         } else {
             offset += limit;
diff --git a/src/models/backend.rs b/src/models/backend.rs
new file mode 100644
index 0000000..86af295
--- /dev/null
+++ b/src/models/backend.rs
@@ -0,0 +1,236 @@
+//! Selectable backend datastore
+//!
+//! Implementation status:
+//! - [X] Postgres
+//! - [ ] Sqlite
+//! - [ ] Rocksdb
+
+use crate::kv::KeyValue;
+use crate::models::VssItem;
+use diesel::prelude::*;
+use diesel::r2d2::PooledConnection;
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::sql_query;
+use diesel::sql_types::{BigInt, Bytea, Text};
+use diesel::PgConnection;
+
+/// For introspection on the dyn trait object (const generics over ADTs are still nightly-only)
+pub enum BackendType {
+    Postgres,
+    Rocksdb,
+}
+
+pub struct Postgres {
+    db_pool: Pool<ConnectionManager<PgConnection>>,
+}
+
+impl Postgres {
+    pub fn new(db_pool: Pool<ConnectionManager<PgConnection>>) -> Self {
+        Self { db_pool }
+    }
+
+    fn conn(
+        &self,
+    ) -> anyhow::Result<PooledConnection<ConnectionManager<PgConnection>>, anyhow::Error> {
+        Ok(self.db_pool.get()?)
+    }
+}
+
+pub struct Rocksdb;
+
+pub trait VssBackend: Send + Sync {
+    fn backend_type(&self) -> BackendType;
+
+    fn get_item(&self, store_id: &str, key: &str) -> anyhow::Result<Option<VssItem>>;
+
+    /// This method fetches its own connection even if called from within a tx,
+    /// so it is moved to cfg(test) to prevent errors. It is manually inlined
+    /// into [`VssBackend::put_items`] and [`VssBackend::put_items_in_store`]
+    #[cfg(test)]
+    fn put_item(&self, store_id: &str, key: &str, value: &[u8], version: i64)
+        -> anyhow::Result<()>;
+
+    /// Wrap multiple item writes in a transaction.
+    /// Takes a `Vec<[VssItem]>`, so items may go to different `store_id`s.
+    /// If you need an atomic put for items that all share one `store_id`,
+    /// see [`VssBackend::put_items_in_store`]
+    fn put_items(&self, items: Vec<VssItem>) -> anyhow::Result<()>;
+
+    /// Wrap multiple item writes in a transaction.
+    /// Items are all placed in the same `store_id`.
+    /// If you need an atomic put across potentially different `store_id`s,
+    /// see [`VssBackend::put_items`]
+    fn put_items_in_store(&self, store_id: &str, items: Vec<KeyValue>) -> anyhow::Result<()>;
+
+    fn list_key_versions(
+        &self,
+        store_id: &str,
+        prefix: Option<&str>,
+    ) -> anyhow::Result<Vec<(String, i64)>>;
+
+    /// THIS WILL NUKE YOUR DATA
+    /// Make sure `DATABASE_URL` is not the same as your dev/stage/prod connection
+    /// when running `cargo test`
+    #[cfg(test)]
+    fn clear_database(&self);
+}
+
+impl VssBackend for Postgres {
+    fn backend_type(&self) -> BackendType {
+        BackendType::Postgres
+    }
+
+    fn get_item(&self, store_id: &str, key: &str) -> anyhow::Result<Option<VssItem>> {
+        use super::schema::vss_db;
+
+        let mut conn = self.conn()?;
+
+        Ok(vss_db::table
+            .filter(vss_db::store_id.eq(store_id))
+            .filter(vss_db::key.eq(key))
+            .first::<VssItem>(&mut conn)
+            .optional()?)
+    }
+
+    #[cfg(test)]
+    fn put_item(
+        &self,
+        store_id: &str,
+        key: &str,
+        value: &[u8],
+        version: i64,
+    ) -> anyhow::Result<()> {
+        let mut conn = self.conn()?;
+
+        sql_query("SELECT upsert_vss_db($1, $2, $3, $4)")
+            .bind::<Text, _>(store_id)
+            .bind::<Text, _>(key)
+            .bind::<Bytea, _>(value)
+            .bind::<BigInt, _>(version)
+            .execute(&mut conn)?;
+
+        Ok(())
+    }
+
+    fn put_items(&self, items: Vec<VssItem>) -> anyhow::Result<()> {
+        let mut conn = self.conn()?;
+
+        conn.transaction::<_, anyhow::Error, _>(|conn| {
+            for item in items {
+                // VssItem.value is Option<Vec<u8>> for unclear reasons
+                let value = match item.value {
+                    None => vec![],
+                    Some(v) => v,
+                };
+                // Inline VssBackend::put_item which was moved to #[cfg(test)] only
+                sql_query("SELECT upsert_vss_db($1, $2, $3, $4)")
+                    .bind::<Text, _>(item.store_id)
+                    .bind::<Text, _>(item.key)
+                    .bind::<Bytea, _>(value)
+                    .bind::<BigInt, _>(item.version)
+                    .execute(conn)?;
+            }
+
+            Ok(())
+        })?;
+
+        Ok(())
+    }
+
+    fn put_items_in_store(&self, store_id: &str, items: Vec<KeyValue>) -> anyhow::Result<()> {
+        let mut conn = self.conn()?;
+
+        conn.transaction::<_, anyhow::Error, _>(|conn| {
+            for kv in items {
+                // Inline VssBackend::put_item which was moved to #[cfg(test)] only
+                sql_query("SELECT upsert_vss_db($1, $2, $3, $4)")
+                    .bind::<Text, _>(store_id)
+                    .bind::<Text, _>(kv.key)
+                    .bind::<Bytea, _>(kv.value.0)
+                    .bind::<BigInt, _>(kv.version)
+                    .execute(conn)?;
+            }
+
+            Ok(())
+        })?;
+
+        Ok(())
+    }
+
+    fn list_key_versions(
+        &self,
+        store_id: &str,
+        prefix: Option<&str>,
+    ) -> anyhow::Result<Vec<(String, i64)>> {
+        use super::schema::vss_db;
+
+        let mut conn = self.conn()?;
+
+        let table = vss_db::table
+            .filter(vss_db::store_id.eq(store_id))
+            .select((vss_db::key, vss_db::version));
+
+        let res = match prefix {
+            None => table.load::<(String, i64)>(&mut conn)?,
+            Some(prefix) => table
+                .filter(vss_db::key.ilike(format!("{prefix}%")))
+                .load::<(String, i64)>(&mut conn)?,
+        };
+
+        Ok(res)
+    }
+
+    #[cfg(test)]
+    fn clear_database(&self) {
+        use crate::models::schema::vss_db;
+
+        let mut conn = self.conn().unwrap();
+        conn.transaction::<_, anyhow::Error, _>(|conn| {
+            diesel::delete(vss_db::table).execute(conn)?;
+            Ok(())
+        })
+        .unwrap();
+    }
+}
+
+impl VssBackend for Rocksdb {
+    fn backend_type(&self) -> BackendType {
+        BackendType::Rocksdb
+    }
+
+    fn get_item(&self, _store_id: &str, _key: &str) -> anyhow::Result<Option<VssItem>> {
+        todo!();
+        //Ok(None)
+    }
+
+    #[cfg(test)]
+    fn put_item(
+        &self,
+        _store_id: &str,
+        _key: &str,
+        _value: &[u8],
+        _version: i64,
+    ) -> anyhow::Result<()> {
+        todo!();
+    }
+
+    fn put_items(&self, _items: Vec<VssItem>) -> anyhow::Result<()> {
+        todo!();
+        //Ok(())
+    }
+
+    fn put_items_in_store(&self, _store_id: &str, _items: Vec<KeyValue>) -> anyhow::Result<()> {
+        todo!();
+        //Ok(())
+    }
+
+    fn list_key_versions(
+        &self,
+        _store_id: &str,
+        _prefix: Option<&str>,
+    ) -> anyhow::Result<Vec<(String, i64)>> {
+        todo!();
+        //Ok(vec![])
+    }
+
+    #[cfg(test)]
+    fn clear_database(&self) {
+        todo!();
+    }
+}
diff --git a/src/models/mod.rs b/src/models/mod.rs
index 0da8931..21403bc 100644
--- a/src/models/mod.rs
+++ b/src/models/mod.rs
@@ -6,6 +6,7 @@ use diesel_migrations::{embed_migrations, EmbeddedMigrations};
 use schema::vss_db;
 use serde::{Deserialize, Serialize};
 
+pub mod backend;
 mod schema;
 
 pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
@@ -34,6 +35,17 @@ pub struct VssItem {
 }
 
 impl VssItem {
+    pub fn from_kv(store_id: &str, kv: KeyValue) -> Self {
+        Self {
+            store_id: String::from(store_id),
+            key: kv.key,
+            value: Some(kv.value.0),
+            version: kv.version,
+            // impl Default: unix epoch
+            created_date: chrono::NaiveDateTime::default(),
+            updated_date: chrono::NaiveDateTime::default(),
+        }
+    }
     pub fn into_kv(self) -> Option<KeyValue> {
         self.value
             .map(|value| KeyValue::new(self.key, value, self.version))
     }
@@ -96,6 +108,7 @@ mod test {
     use diesel_migrations::MigrationHarness;
     use secp256k1::Secp256k1;
     use std::str::FromStr;
+    use std::sync::Arc;
 
     const PUBKEY: &str = "04547d92b618856f4eda84a64ec32f1694c9608a3f9dc73e91f08b5daa087260164fbc9e2a563cf4c5ef9f4c614fd9dfca7582f8de429a4799a4b202fbe80a7db5";
@@ -119,38 +132,31 @@
         let secp = Secp256k1::new();
 
+        let backend = Arc::new(backend::Postgres::new(db_pool));
         State {
-            db_pool,
+            backend,
             auth_key,
             self_hosted: false,
             secp,
         }
     }
 
-    fn clear_database(state: &State) {
-        let conn = &mut state.db_pool.get().unwrap();
-
-        conn.transaction::<_, anyhow::Error, _>(|conn| {
-            diesel::delete(vss_db::table).execute(conn)?;
-            Ok(())
-        })
-        .unwrap();
-    }
-
     #[tokio::test]
     async fn test_vss_flow() {
         let state = init_state();
-        clear_database(&state);
+        let backend = state.backend.clone();
+
+        backend.clear_database();
 
         let store_id = "test_store_id";
         let key = "test";
         let value = [1, 2, 3];
         let version = 0;
 
-        let mut conn = state.db_pool.get().unwrap();
-        VssItem::put_item(&mut conn, store_id, key, &value, version).unwrap();
+        backend.put_item(store_id, key, &value, version).unwrap();
 
-        let versions = VssItem::list_key_versions(&mut conn, store_id, None).unwrap();
+        let versions = backend.list_key_versions(store_id, None).unwrap();
 
         assert_eq!(versions.len(), 1);
         assert_eq!(versions[0].0, key);
@@ -159,36 +165,33 @@
 
         let new_value = [4, 5, 6];
         let new_version = version + 1;
 
-        VssItem::put_item(&mut conn, store_id, key, &new_value, new_version).unwrap();
+        backend.put_item(store_id, key, &new_value, new_version).unwrap();
 
-        let item = VssItem::get_item(&mut conn, store_id, key)
-            .unwrap()
-            .unwrap();
+        let item = backend.get_item(store_id, key).unwrap().unwrap();
 
         assert_eq!(item.store_id, store_id);
         assert_eq!(item.key, key);
         assert_eq!(item.value.unwrap(), new_value);
         assert_eq!(item.version, new_version);
 
-        clear_database(&state);
+        backend.clear_database();
     }
 
     #[tokio::test]
     async fn test_max_version_number() {
         let state = init_state();
-        clear_database(&state);
+        let backend = state.backend.clone();
+
+        backend.clear_database();
 
         let store_id = "max_test_store_id";
         let key = "max_test";
         let value = [1, 2, 3];
         let version = u32::MAX as i64;
 
-        let mut conn = state.db_pool.get().unwrap();
-        VssItem::put_item(&mut conn, store_id, key, &value, version).unwrap();
+        backend.put_item(store_id, key, &value, version).unwrap();
 
-        let item = VssItem::get_item(&mut conn, store_id, key)
-            .unwrap()
-            .unwrap();
+        let item = backend.get_item(store_id, key).unwrap().unwrap();
 
         assert_eq!(item.store_id, store_id);
         assert_eq!(item.key, key);
@@ -196,23 +199,23 @@
 
         let new_value = [4, 5, 6];
 
-        VssItem::put_item(&mut conn, store_id, key, &new_value, version).unwrap();
+        backend.put_item(store_id, key, &new_value, version).unwrap();
 
-        let item = VssItem::get_item(&mut conn, store_id, key)
-            .unwrap()
-            .unwrap();
+        let item = backend.get_item(store_id, key).unwrap().unwrap();
 
         assert_eq!(item.store_id, store_id);
         assert_eq!(item.key, key);
         assert_eq!(item.value.unwrap(), new_value);
 
-        clear_database(&state);
+        backend.clear_database();
     }
 
     #[tokio::test]
     async fn test_list_key_versions() {
         let state = init_state();
-        clear_database(&state);
+        let backend = state.backend.clone();
+
+        backend.clear_database();
 
         let store_id = "list_kv_test_store_id";
        let key = "kv_test";
        let key1 = "other_kv_test";
 
         let value = [1, 2, 3];
         let version = 0;
 
-        let mut conn = state.db_pool.get().unwrap();
-        VssItem::put_item(&mut conn, store_id, key, &value, version).unwrap();
+        backend.put_item(store_id, key, &value, version).unwrap();
 
-        VssItem::put_item(&mut conn, store_id, key1, &value, version).unwrap();
+        backend.put_item(store_id, key1, &value, version).unwrap();
 
-        let versions = VssItem::list_key_versions(&mut conn, store_id, None).unwrap();
+        let versions = backend.list_key_versions(store_id, None).unwrap();
 
         assert_eq!(versions.len(), 2);
 
-        let versions = VssItem::list_key_versions(&mut conn, store_id, Some("kv")).unwrap();
+        let versions = backend.list_key_versions(store_id, Some("kv")).unwrap();
 
         assert_eq!(versions.len(), 1);
         assert_eq!(versions[0].0, key);
         assert_eq!(versions[0].1, version);
 
-        let versions = VssItem::list_key_versions(&mut conn, store_id, Some("other")).unwrap();
+        let versions = backend.list_key_versions(store_id, Some("other")).unwrap();
 
         assert_eq!(versions.len(), 1);
         assert_eq!(versions[0].0, key1);
         assert_eq!(versions[0].1, version);
 
-        clear_database(&state);
+        backend.clear_database();
     }
 }
diff --git a/src/routes.rs b/src/routes.rs
index 7003a73..de97fee 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -1,12 +1,10 @@
 use crate::auth::verify_token;
 use crate::kv::{KeyValue, KeyValueOld};
-use crate::models::VssItem;
 use crate::{State, ALLOWED_LOCALHOST, ALLOWED_ORIGINS, ALLOWED_SUBDOMAIN};
 use axum::headers::authorization::Bearer;
 use axum::headers::{Authorization, Origin};
 use axum::http::StatusCode;
 use axum::{Extension, Json, TypedHeader};
-use diesel::Connection;
 use log::{debug, error, trace};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
@@ -53,9 +51,8 @@ pub async fn get_object_impl(
     trace!("get_object_impl: {req:?}");
 
     let store_id = req.store_id.expect("must have");
-    let mut conn = state.db_pool.get()?;
-
-    let item = VssItem::get_item(&mut conn, &store_id, &req.key)?;
+    let backend = state.backend.clone();
+    let item = backend.get_item(&store_id, &req.key)?;
 
     Ok(item.and_then(|i| i.into_kv()))
 }
@@ -125,15 +122,8 @@ pub async fn put_objects_impl(req: PutObjectsRequest, state: &State) -> anyhow::
 
     let store_id = req.store_id.expect("must have");
 
-    let mut conn = state.db_pool.get()?;
-
-    conn.transaction::<_, anyhow::Error, _>(|conn| {
-        for kv in req.transaction_items {
-            VssItem::put_item(conn, &store_id, &kv.key, &kv.value.0, kv.version)?;
-        }
-
-        Ok(())
-    })?;
+    let backend = state.backend.clone();
+    backend.put_items_in_store(&store_id, req.transaction_items)?;
 
     Ok(())
 }
@@ -176,9 +166,8 @@ pub async fn list_key_versions_impl(
     // todo pagination
 
     let store_id = req.store_id.expect("must have");
-    let mut conn = state.db_pool.get()?;
-
-    let versions = VssItem::list_key_versions(&mut conn, &store_id, req.key_prefix.as_deref())?;
+    let backend = state.backend.clone();
+    let versions = backend.list_key_versions(&store_id, req.key_prefix.as_deref())?;
 
     let json = versions
         .into_iter()