From a31069aece54c72ebbb1d960a21930ab49e819c0 Mon Sep 17 00:00:00 2001 From: David Justice Date: Fri, 9 Jun 2023 17:57:16 -0400 Subject: [PATCH 1/8] serve content through handler rather than file server Signed-off-by: David Justice --- .gitignore | 1 + crates/server/src/api/mod.rs | 2 - crates/server/src/api/v1/package.rs | 63 ++++++++++++++++++++++------- 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 83abdf7b..3ae03816 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /content /target *.swp +content diff --git a/crates/server/src/api/mod.rs b/crates/server/src/api/mod.rs index 72e779fb..b0e038fd 100644 --- a/crates/server/src/api/mod.rs +++ b/crates/server/src/api/mod.rs @@ -7,7 +7,6 @@ use std::{path::PathBuf, sync::Arc}; use tower::ServiceBuilder; use tower_http::{ cors::{Any, CorsLayer}, - services::ServeDir, trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, LatencyUnit, }; @@ -43,7 +42,6 @@ pub fn create_router( record_policy, ), ) - .nest_service("/content", ServeDir::new(files_dir)) .layer( ServiceBuilder::new() .layer( diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index 8e0f2a5c..fad4376c 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -18,9 +18,12 @@ use axum::{ use futures::StreamExt; use std::sync::Arc; use std::{collections::HashMap, path::PathBuf}; +use axum::body::StreamBody; +use axum::http::header; use tempfile::NamedTempFile; use tokio::io::AsyncWriteExt; use url::Url; +use tokio_util::io::ReaderStream; use warg_api::v1::package::{ ContentSource, MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest, UploadEndpoint, @@ -65,10 +68,8 @@ impl Config { Router::new() .route("/:log_id/record", post(publish_record)) .route("/:log_id/record/:record_id", get(get_record)) - .route( - "/:log_id/record/:record_id/content/:digest", - post(upload_content), - ) + .route("/:log_id/record/:record_id/content/:digest", post(upload_content)) + .route("/:log_id/record/:record_id/content/:digest", get(fetch_content)) .with_state(self) } @@ -84,13 +85,14 @@ impl Config { self.files_dir.join(self.content_file_name(digest)) } - fn content_url(&self, digest: &AnyHash) -> String { - self.content_base_url - .join("content/") - .unwrap() - .join(&self.content_file_name(digest)) - .unwrap() - .to_string() + fn content_url(&self, + log_id: &LogId, + record_id: &RecordId, + digest: &AnyHash) -> String { + format!( + "{url}/{log_id}/record/{record_id}/content/{digest}", + url = self.content_base_url, + ) } fn build_missing_content<'a>( @@ -138,6 +140,13 @@ impl PackageApiError { message: message.to_string(), }) } + + fn not_found(message: impl ToString) -> Self { + Self(PackageError::Message { + status: StatusCode::NOT_FOUND.as_u16(), + message: message.to_string(), + }) + } } impl From for PackageApiError { @@ -240,7 +249,7 @@ async fn publish_record( if missing.is_empty() { config .core_service - .submit_package_record(log_id, record_id.clone()) + .submit_package_record(log_id.clone(), record_id.clone()) .await; return Ok(( @@ -300,7 +309,7 @@ async fn get_record( ( digest.clone(), vec![ContentSource::Http { - url: config.content_url(digest), + url: config.content_url(&log_id, &record_id, digest), }], ) }) @@ -384,13 +393,13 @@ async fn upload_content( { config .core_service - .submit_package_record(log_id, record_id) + .submit_package_record(log_id.clone(), record_id.clone()) .await; } Ok(( StatusCode::CREATED, - [(axum::http::header::LOCATION, config.content_url(&digest))], + [(header::LOCATION, config.content_url(&log_id, &record_id, &digest))], )) } @@ -437,3 +446,27 @@ async fn process_content( Ok(()) } + +#[debug_handler] +async fn fetch_content( + State(config): State, + Path((_log_id, _record_id, digest)): Path<(LogId, RecordId, AnyHash)>, +) -> Result { + let file = match tokio::fs::File::open(config.content_path(&digest)).await { + Ok(file) => file, + Err(_err) => return Err(PackageApiError::not_found(format!("content with digest `{digest}` not found"))), + }; + + // convert the `AsyncRead` into a `Stream` + let stream = ReaderStream::new(file); + // convert the `Stream` into an `axum::body::HttpBody` + let body = StreamBody::new(stream); + + let disposition = &format!("attachment; filename=\"{digest}\"", digest = digest.clone()); + let headers = [ + (header::CONTENT_TYPE, "application/wasm; charset=utf-8"), + (header::CONTENT_DISPOSITION, &String::from(disposition)), + ]; + + Ok((headers, body).into_response()) +} From e597c34e171f878913baaf69dffa21e1918f606d Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 22 Jun 2023 18:49:23 -0400 Subject: [PATCH 2/8] add content store abstraction and local implementation Signed-off-by: David Justice --- crates/server/src/api/mod.rs | 5 +- crates/server/src/api/v1/mod.rs | 5 +- crates/server/src/api/v1/package.rs | 58 ++++++++++-------- crates/server/src/bin/warg-server.rs | 22 +++++++ crates/server/src/contentstore/local.rs | 61 +++++++++++++++++++ crates/server/src/contentstore/mod.rs | 40 ++++++++++++ crates/server/src/datastore/memory.rs | 12 ++++ crates/server/src/datastore/mod.rs | 6 ++ crates/server/src/datastore/postgres/mod.rs | 11 ++++ .../server/src/datastore/postgres/models.rs | 2 +- crates/server/src/lib.rs | 9 ++- 11 files changed, 197 insertions(+), 34 deletions(-) create mode 100644 crates/server/src/contentstore/local.rs create mode 100644 crates/server/src/contentstore/mod.rs diff --git a/crates/server/src/api/mod.rs b/crates/server/src/api/mod.rs index b0e038fd..5ca66e19 100644 --- a/crates/server/src/api/mod.rs +++ b/crates/server/src/api/mod.rs @@ -12,6 +12,7 @@ use tower_http::{ }; use tracing::{Level, Span}; use url::Url; +use crate::contentstore::ContentStore; pub mod v1; @@ -23,9 +24,9 @@ pub fn create_router( content_base_url: Url, core: CoreService, temp_dir: PathBuf, - files_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, ) -> Router { let router = Router::new(); #[cfg(feature = "debug")] @@ -37,9 +38,9 @@ pub fn create_router( content_base_url, core, temp_dir, - files_dir.clone(), content_policy, record_policy, + content_store, ), ) .layer( diff --git a/crates/server/src/api/v1/mod.rs b/crates/server/src/api/v1/mod.rs index 349fc7b3..f746dd6f 100644 --- a/crates/server/src/api/v1/mod.rs +++ b/crates/server/src/api/v1/mod.rs @@ -15,6 +15,7 @@ use axum::{ use serde::{Serialize, Serializer}; use std::{path::PathBuf, sync::Arc}; use url::Url; +use crate::contentstore::ContentStore; pub mod fetch; pub mod package; @@ -93,18 +94,18 @@ pub fn create_router( content_base_url: Url, core: CoreService, temp_dir: PathBuf, - files_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, ) -> Router { let proof_config = proof::Config::new(core.clone()); let package_config = package::Config::new( core.clone(), content_base_url, - files_dir, temp_dir, content_policy, record_policy, + content_store, ); let fetch_config = fetch::Config::new(core); diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index fad4376c..13d0b974 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use super::{Json, Path}; use crate::{ datastore::{DataStoreError, RecordStatus}, @@ -6,6 +7,7 @@ use crate::{ record::{RecordPolicy, RecordPolicyError}, }, services::CoreService, + contentstore::ContentStore, }; use axum::{ debug_handler, @@ -39,28 +41,28 @@ use warg_protocol::{ pub struct Config { core_service: CoreService, content_base_url: Url, - files_dir: PathBuf, temp_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, } impl Config { pub fn new( core_service: CoreService, content_base_url: Url, - files_dir: PathBuf, temp_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, ) -> Self { Self { core_service, content_base_url, - files_dir, temp_dir, content_policy, record_policy, + content_store, } } @@ -73,18 +75,6 @@ impl Config { .with_state(self) } - fn content_present(&self, digest: &AnyHash) -> bool { - self.content_path(digest).is_file() - } - - fn content_file_name(&self, digest: &AnyHash) -> String { - digest.to_string().replace(':', "-") - } - - fn content_path(&self, digest: &AnyHash) -> PathBuf { - self.files_dir.join(self.content_file_name(digest)) - } - fn content_url(&self, log_id: &LogId, record_id: &RecordId, @@ -236,8 +226,17 @@ async fn publish_record( .await?; let record_id = RecordId::package_record::(&record); - let mut missing = record.as_ref().contents(); - missing.retain(|d| !config.content_present(d)); + let mut missing = HashSet::<&AnyHash>::new(); + for key in record.as_ref().contents() { + if !config + .content_store + .content_present(&body.id, key) + .await + .map_err(PackageApiError::internal_error)? + { + missing.insert(key); + } + } config .core_service @@ -380,8 +379,13 @@ async fn upload_content( // Only persist the file if the content was successfully processed res?; - tmp_path - .persist(config.content_path(&digest)) + let package_id = config.core_service.store().get_package_id(&log_id).await?; + let mut tmp_file = tokio::fs::File::open(&tmp_path) + .await + .map_err(PackageApiError::internal_error)?; + + config.content_store.store_content(&package_id, &digest, &mut tmp_file) + .await .map_err(PackageApiError::internal_error)?; // If this is the last content needed, submit the record for processing now @@ -450,18 +454,18 @@ async fn process_content( #[debug_handler] async fn fetch_content( State(config): State, - Path((_log_id, _record_id, digest)): Path<(LogId, RecordId, AnyHash)>, + Path((log_id, record_id, digest)): Path<(LogId, RecordId, AnyHash)>, ) -> Result { - let file = match tokio::fs::File::open(config.content_path(&digest)).await { - Ok(file) => file, - Err(_err) => return Err(PackageApiError::not_found(format!("content with digest `{digest}` not found"))), - }; + tracing::info!("fetching content for record `{record_id}` from `{log_id}`"); + + let package_id = config.core_service.store().get_package_id(&log_id).await?; + + let file = config.content_store.fetch_content(&package_id, &digest) + .await + .map_err(PackageApiError::not_found)?; - // convert the `AsyncRead` into a `Stream` let stream = ReaderStream::new(file); - // convert the `Stream` into an `axum::body::HttpBody` let body = StreamBody::new(stream); - let disposition = &format!("attachment; filename=\"{digest}\"", digest = digest.clone()); let headers = [ (header::CONTENT_TYPE, "application/wasm; charset=utf-8"), diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index a073aea5..40d383f6 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -16,6 +16,14 @@ enum DataStoreKind { Memory, } +#[derive(ValueEnum, Debug, Clone, Copy, PartialEq, Eq, Default)] +enum ContentStoreKind { + #[default] + Local, + #[value(alias("ociv1-1"))] + OCIv1_1, +} + #[derive(Parser, Debug)] struct Args { /// Use verbose output @@ -38,6 +46,10 @@ struct Args { #[arg(long, env = "WARG_DATA_STORE", default_value = "memory")] data_store: DataStoreKind, + /// The content store to use for the server. + #[arg(long, env = "WARG_CONTENT_STORE", default_value = "local")] + content_store: ContentStoreKind, + /// The database connection URL if data-store is set to postgres. /// /// Prefer using `database-url-file`, or environment variable variation, @@ -110,6 +122,16 @@ async fn main() -> Result<()> { .with_context(|| format!("failed to decode authorized keys from {path:?}"))?; config = config.with_record_policy(authorized_key_policy); } + + let config = match args.content_store { + ContentStoreKind::Local => { + tracing::info!("using local content store"); + config + } + ContentStoreKind::OCIv1_1 => { + todo!("OCIv1.1 content store is not yet implemented"); + } + }; let config = match args.data_store { #[cfg(feature = "postgres")] diff --git a/crates/server/src/contentstore/local.rs b/crates/server/src/contentstore/local.rs new file mode 100644 index 00000000..3c13170e --- /dev/null +++ b/crates/server/src/contentstore/local.rs @@ -0,0 +1,61 @@ +use std::path::{Path, PathBuf}; +use tokio::fs::File; +use tokio::io::copy; +use warg_crypto::hash::AnyHash; +use warg_protocol::registry::PackageId; +use crate::contentstore::{ContentStore, ContentStoreError}; + +#[derive(Clone)] +pub struct LocalContentStore { + files_dir: PathBuf, +} + +impl LocalContentStore { + pub fn new(files_dir: PathBuf) -> Self { + Self { files_dir } + } + + /// Returns the path to the content file for a given content address. + fn content_path(&self, digest: &AnyHash) -> PathBuf { + self.files_dir.join(content_file_name(digest)) + } +} + +/// Returns the file name for a given content address replacing colons with dashes. +fn content_file_name(digest: &AnyHash) -> String { + digest.to_string().replace(':', "-") +} + +#[axum::async_trait] +impl ContentStore for LocalContentStore { + async fn fetch_content( + &self, + _package_id: &PackageId, + digest: &AnyHash, + ) -> Result { + File::open(self.content_path(digest)) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) + } + + async fn store_content( + &self, + _package_id: &PackageId, + digest: &AnyHash, + content: &mut File + ) -> Result<(), ContentStoreError> { + let mut stored_file = File::create(self.content_path(digest)) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; + + copy(content, &mut stored_file) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; + Ok(()) + } + + async fn content_present(&self, _package_id: &PackageId, digest: &AnyHash) -> Result { + let path = self.content_path(digest); + Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) + } +} diff --git a/crates/server/src/contentstore/mod.rs b/crates/server/src/contentstore/mod.rs new file mode 100644 index 00000000..a214fe7c --- /dev/null +++ b/crates/server/src/contentstore/mod.rs @@ -0,0 +1,40 @@ +use tokio::fs::File; +use warg_crypto::hash::AnyHash; +use thiserror::Error; +use warg_protocol::registry::PackageId; + +pub mod local; + +#[derive(Debug, Error)] +pub enum ContentStoreError { + #[error("content with address `{0}` was not found")] + ContentNotFound(AnyHash), + + #[error("content store internal error: {0}")] + ContentStoreInternalError(String), +} + +/// Implemented by content stores. +#[axum::async_trait] +pub trait ContentStore: Send + Sync { + /// Fetch content for a given package. + async fn fetch_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + ) -> Result; + + /// Store content for a given package. + async fn store_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + content: &mut File + ) -> Result<(), ContentStoreError>; + + async fn content_present( + &self, + package_id: &PackageId, + digest: &AnyHash, + ) -> Result; +} diff --git a/crates/server/src/datastore/memory.rs b/crates/server/src/datastore/memory.rs index 4f229f95..cb5c316c 100644 --- a/crates/server/src/datastore/memory.rs +++ b/crates/server/src/datastore/memory.rs @@ -78,6 +78,7 @@ struct State { package_ids: BTreeSet, checkpoints: IndexMap>, records: HashMap>, + names: HashMap, } /// Represents an in-memory data store. @@ -232,6 +233,7 @@ impl DataStore for MemoryDataStore { }); let mut state = self.0.write().await; + state.names.entry(log_id.clone()).or_insert_with(|| package_id.clone()); let prev = state.records.entry(log_id.clone()).or_default().insert( record_id.clone(), RecordStatus::Pending(PendingRecord::Package { @@ -590,6 +592,16 @@ impl DataStore for MemoryDataStore { }) } + async fn get_package_id(&self, log_id: &LogId) -> Result { + self.0 + .read() + .await + .names + .get(log_id) + .map(|package_id| package_id.clone()) + .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone())) + } + async fn verify_package_record_signature( &self, log_id: &LogId, diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index fe92797a..eae3fa40 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -231,6 +231,12 @@ pub trait DataStore: Send + Sync { limit: u16, ) -> Result>, DataStoreError>; + /// Gets the package ID for the given log ID. + async fn get_package_id( + &self, + log_id: &LogId, + ) -> Result; + /// Gets an operator record. async fn get_operator_record( &self, diff --git a/crates/server/src/datastore/postgres/mod.rs b/crates/server/src/datastore/postgres/mod.rs index 30163ab8..8b42d3b4 100644 --- a/crates/server/src/datastore/postgres/mod.rs +++ b/crates/server/src/datastore/postgres/mod.rs @@ -747,6 +747,17 @@ impl DataStore for PostgresDataStore { get_records(&mut conn, log_id, checkpoint_id, since, limit as i64).await } + async fn get_package_id(&self, log_id: &LogId) -> std::result::Result { + let mut conn = self.0.get().await?; + schema::logs::table + .select(schema::logs::name) + .filter(schema::logs::log_id.eq(TextRef(log_id))) + .first::>(conn.as_mut()) + .await + .map(|name| PackageId::new(name.unwrap()).unwrap()) + .map_err(|_| DataStoreError::LogNotFound(log_id.clone())) + } + async fn get_operator_record( &self, log_id: &LogId, diff --git a/crates/server/src/datastore/postgres/models.rs b/crates/server/src/datastore/postgres/models.rs index f2b5fbe0..ae1a85ae 100644 --- a/crates/server/src/datastore/postgres/models.rs +++ b/crates/server/src/datastore/postgres/models.rs @@ -58,7 +58,7 @@ impl<'a, T: std::fmt::Debug + Display> ToSql for TextRef<'a } } -#[derive(Insertable)] +#[derive(Insertable, Queryable, Selectable)] #[diesel(table_name = logs)] pub struct NewLog<'a, V> where diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index d0ad90f0..e70d3025 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -15,13 +15,16 @@ use std::{ }; use tokio::task::JoinHandle; use url::Url; +use contentstore::ContentStore; use warg_crypto::signing::PrivateKey; +use crate::contentstore::local::LocalContentStore; pub mod api; pub mod args; pub mod datastore; pub mod policy; pub mod services; +pub mod contentstore; const DEFAULT_BIND_ADDRESS: &str = "127.0.0.1:8090"; const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5); @@ -39,6 +42,7 @@ pub struct Config { checkpoint_interval: Option, content_policy: Option>, record_policy: Option>, + content_store: Arc, } impl std::fmt::Debug for Config { @@ -72,12 +76,13 @@ impl Config { operator_key, addr: None, data_store: None, - content_dir, + content_dir: content_dir.clone(), content_base_url: None, shutdown: None, checkpoint_interval: None, content_policy: None, record_policy: None, + content_store: Arc::new(LocalContentStore::new(content_dir.join("files"))), } } @@ -219,9 +224,9 @@ impl Server { content_base_url, core, temp_dir, - files_dir, self.config.content_policy, self.config.record_policy, + self.config.content_store, ); Ok(InitializedServer { From 957b7237514e07f2d39f962fc7175d410dfa4c25 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 29 Jun 2023 15:58:46 -0400 Subject: [PATCH 3/8] add oci content store and related cli fields Signed-off-by: David Justice --- Cargo.lock | 60 ++++++ crates/server/Cargo.toml | 1 + crates/server/src/api/v1/package.rs | 33 ++- crates/server/src/bin/warg-server.rs | 11 +- crates/server/src/contentstore/local.rs | 19 +- crates/server/src/contentstore/mod.rs | 6 +- crates/server/src/contentstore/oci/client.rs | 197 ++++++++++++++++++ crates/server/src/contentstore/oci/mod.rs | 2 + crates/server/src/contentstore/oci/ociv1_1.rs | 70 +++++++ crates/server/src/datastore/mod.rs | 39 ++-- crates/server/src/lib.rs | 8 + 11 files changed, 417 insertions(+), 29 deletions(-) create mode 100644 crates/server/src/contentstore/oci/client.rs create mode 100644 crates/server/src/contentstore/oci/mod.rs create mode 100644 crates/server/src/contentstore/oci/ociv1_1.rs diff --git a/Cargo.lock b/Cargo.lock index ec747c43..ec0f117f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1695,6 +1695,15 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-auth" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5430cacd7a1f9a02fbeb350dfc81a0e5ed42d81f3398cb0ba184017f85bdcfbc" +dependencies = [ + "memchr", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1943,6 +1952,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jwt" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6204285f77fe7d9784db3fdc449ecce1a0114927a51d5a41c4c7a292011c015f" +dependencies = [ + "base64 0.13.1", + "crypto-common", + "digest", + "hmac", + "serde", + "serde_json", + "sha2", +] + [[package]] name = "keyring" version = "2.0.4" @@ -2369,6 +2393,41 @@ dependencies = [ "memchr", ] +[[package]] +name = "oci-distribution" +version = "0.10.0" +source = "git+https://github.com/krustlet/oci-distribution?branch=main#28daf780f90b1f7c3ab6ae9ee9c658d28e626086" +dependencies = [ + "bytes", + "chrono", + "futures-util", + "http", + "http-auth", + "jwt", + "lazy_static", + "olpc-cjson", + "regex", + "reqwest", + "serde", + "serde_json", + "sha2", + "thiserror", + "tokio", + "tracing", + "unicase", +] + +[[package]] +name = "olpc-cjson" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d637c9c15b639ccff597da8f4fa968300651ad2f1e968aefc3b4927a6fb2027a" +dependencies = [ + "serde", + "serde_json", + "unicode-normalization", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -4256,6 +4315,7 @@ dependencies = [ "diesel_migrations", "futures", "indexmap 2.0.0", + "oci-distribution", "secrecy", "serde", "serde_json", diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 8a1a3d80..d86088d6 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -36,6 +36,7 @@ diesel_migrations = { workspace = true, optional = true } diesel-derive-enum = { workspace = true, optional = true, features = ["postgres"] } serde_json = { workspace = true, optional = true } chrono = { workspace = true, optional = true } +oci-distribution = { git = "https://github.com/krustlet/oci-distribution", branch = "main" } [features] default = [] diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index 13d0b974..ca4617b8 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -227,14 +227,22 @@ async fn publish_record( let record_id = RecordId::package_record::(&record); let mut missing = HashSet::<&AnyHash>::new(); + let version = crate::datastore::get_version_for_release(record.as_ref()); for key in record.as_ref().contents() { - if !config - .content_store - .content_present(&body.id, key) - .await - .map_err(PackageApiError::internal_error)? - { - missing.insert(key); + match version { + Some(version) => { + if !config + .content_store + .content_present(&body.id, key, version.to_string()) + .await + .map_err(PackageApiError::internal_error)? + { + missing.insert(key); + } + } + None => { + missing.insert(key); + } } } @@ -379,12 +387,13 @@ async fn upload_content( // Only persist the file if the content was successfully processed res?; + let version = crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id).await?; let package_id = config.core_service.store().get_package_id(&log_id).await?; let mut tmp_file = tokio::fs::File::open(&tmp_path) .await .map_err(PackageApiError::internal_error)?; - config.content_store.store_content(&package_id, &digest, &mut tmp_file) + config.content_store.store_content(&package_id, &digest, version.to_string(), &mut tmp_file) .await .map_err(PackageApiError::internal_error)?; @@ -459,8 +468,12 @@ async fn fetch_content( tracing::info!("fetching content for record `{record_id}` from `{log_id}`"); let package_id = config.core_service.store().get_package_id(&log_id).await?; - - let file = config.content_store.fetch_content(&package_id, &digest) + let version = crate::datastore::get_release_version( + config.core_service.store(), + &log_id, + &record_id, + ).await?; + let file = config.content_store.fetch_content(&package_id, &digest, version.to_string()) .await .map_err(PackageApiError::not_found)?; diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index 40d383f6..6cb9b7e9 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -2,6 +2,7 @@ use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; use secrecy::SecretString; use std::{net::SocketAddr, path::PathBuf}; +use oci_distribution::secrets::RegistryAuth::Anonymous; use tokio::signal; use tracing_subscriber::filter::LevelFilter; use url::Url; @@ -50,6 +51,10 @@ struct Args { #[arg(long, env = "WARG_CONTENT_STORE", default_value = "local")] content_store: ContentStoreKind, + /// The OCI registry URL if content store is set to oci. + #[arg(long, env = "WARG_OCI_REGISTRY_URL", default_value = "localhost:5000")] + oci_registry_url: Option, + /// The database connection URL if data-store is set to postgres. /// /// Prefer using `database-url-file`, or environment variable variation, @@ -107,7 +112,7 @@ async fn main() -> Result<()> { let operator_key = PrivateKey::decode(operator_key_str).context("failed to parse operator key")?; - let mut config = Config::new(operator_key, args.content_dir) + let mut config = Config::new(operator_key, args.content_dir.clone()) .with_addr(args.listen) .with_shutdown(shutdown_signal()); @@ -129,7 +134,9 @@ async fn main() -> Result<()> { config } ContentStoreKind::OCIv1_1 => { - todo!("OCIv1.1 content store is not yet implemented"); + use warg_server::contentstore::oci::ociv1_1::OCIv1_1ContentStore; + tracing::info!("using OCIv1.1 content store"); + config.with_content_store(OCIv1_1ContentStore::new(args.oci_registry_url.unwrap(), Anonymous, &args.content_dir).await) } }; diff --git a/crates/server/src/contentstore/local.rs b/crates/server/src/contentstore/local.rs index 3c13170e..4b8e8b4f 100644 --- a/crates/server/src/contentstore/local.rs +++ b/crates/server/src/contentstore/local.rs @@ -28,33 +28,44 @@ fn content_file_name(digest: &AnyHash) -> String { #[axum::async_trait] impl ContentStore for LocalContentStore { + /// Fetch content for a given package. async fn fetch_content( &self, _package_id: &PackageId, digest: &AnyHash, + _version: String, ) -> Result { File::open(self.content_path(digest)) .await .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) } + /// Store content for a given package. async fn store_content( &self, _package_id: &PackageId, digest: &AnyHash, + _version: String, content: &mut File - ) -> Result<(), ContentStoreError> { - let mut stored_file = File::create(self.content_path(digest)) + ) -> Result { + let file_path = self.content_path(digest); + let mut stored_file = File::create(file_path.clone()) .await .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; copy(content, &mut stored_file) .await .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; - Ok(()) + Ok(file_path.to_string_lossy().to_string()) } - async fn content_present(&self, _package_id: &PackageId, digest: &AnyHash) -> Result { + /// Check if the content is present in the store. + async fn content_present( + &self, + _package_id: &PackageId, + digest: &AnyHash, + _version: String, + ) -> Result { let path = self.content_path(digest); Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) } diff --git a/crates/server/src/contentstore/mod.rs b/crates/server/src/contentstore/mod.rs index a214fe7c..1e1fc04a 100644 --- a/crates/server/src/contentstore/mod.rs +++ b/crates/server/src/contentstore/mod.rs @@ -4,6 +4,7 @@ use thiserror::Error; use warg_protocol::registry::PackageId; pub mod local; +pub mod oci; #[derive(Debug, Error)] pub enum ContentStoreError { @@ -22,6 +23,7 @@ pub trait ContentStore: Send + Sync { &self, package_id: &PackageId, digest: &AnyHash, + version: String, ) -> Result; /// Store content for a given package. @@ -29,12 +31,14 @@ pub trait ContentStore: Send + Sync { &self, package_id: &PackageId, digest: &AnyHash, + version: String, content: &mut File - ) -> Result<(), ContentStoreError>; + ) -> Result; async fn content_present( &self, package_id: &PackageId, digest: &AnyHash, + version: String, ) -> Result; } diff --git a/crates/server/src/contentstore/oci/client.rs b/crates/server/src/contentstore/oci/client.rs new file mode 100644 index 00000000..fd3c6d82 --- /dev/null +++ b/crates/server/src/contentstore/oci/client.rs @@ -0,0 +1,197 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use oci_distribution::{ + client, + client::{ClientProtocol, Config, ImageLayer}, + manifest::OciImageManifest, + Reference, + secrets::RegistryAuth, +}; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::runtime::Handle; +use tokio::sync::RwLock; +use tokio::task::block_in_place; + +use warg_crypto::hash::AnyHash; + +use crate::{ + contentstore::ContentStoreError, + contentstore::ContentStoreError::ContentStoreInternalError, +}; + +// const COMPONENT_CONFIG_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.component.v1+config"; +const WASM_LAYER_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.wasm.content.layer.v1+wasm"; + +/// Client for interacting with an OCI registry +pub struct Client { + oci_client: Arc>, + auth: RegistryAuth, + temp_dir: PathBuf, +} + +impl Client { + /// Create a new instance of an OCI client for storing components. + pub async fn new(insecure: bool, auth: RegistryAuth, temp_dir: &PathBuf) -> Self { + let client = oci_distribution::Client::new(Self::build_config(insecure)); + Self { + oci_client: Arc::new(RwLock::new(client)), + auth: auth.into(), + temp_dir: temp_dir.clone(), + } + } + + pub async fn pull( + &self, + reference: impl AsRef, + digest: &AnyHash, + ) -> Result { + let path = self.cached_content_path(digest); + if Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))? { + let file = File::open(path) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; + return Ok(file); + } + + let reference: Reference = reference + .as_ref() + .parse() + .with_context(|| format!("cannot parse reference {}", reference.as_ref())) + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + // TODO: fix the higher-level lifetime error that occurs when not using block_in_place and + // block_on. + let result = block_in_place(|| { + Handle::current().block_on(async move { + let mut oci = self + .oci_client + .write() + .await; + oci + .pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE]) + .await + }) + }); + + let image = result + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + let layer = image + .layers + .into_iter() + .find(|l| l.sha256_digest() == digest.to_string()) + .ok_or(ContentStoreInternalError("layer not found".to_string()))?; + let mut file = File::create(self.cached_content_path(digest)) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + file + .write_all(&layer.data) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + Ok(file) + } + + /// Push a component to an OCI registry. + pub async fn push( + &self, + reference: impl AsRef, + file: &mut File, + _digest: &AnyHash, + ) -> Result { + let reference: Reference = reference + .as_ref() + .parse() + .with_context(|| format!("cannot parse reference {}", reference.as_ref())) + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + let oci_config = Config::oci_v1("{}".as_bytes().to_vec(), None); + let mut layers = Vec::new(); + let wasm_layer = Self::wasm_layer(file) + .await + .context("cannot create wasm layer") + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + layers.insert(0, wasm_layer); + let manifest = OciImageManifest::build(&layers, &oci_config, None); + // TODO: add artifactType to describe the mediaType for the component. + // Candidate mediaType: "application/vnd.bytecodealliance.wasm.component.v1+config" + + // TODO: fix the higher-level lifetime error that occurs when not using block_in_place and + // block_on. + let result= block_in_place(|| { + Handle::current().block_on(async move { + tracing::log::trace!("Pushing component to {:?}", reference); + let mut oci = self + .oci_client + .write() + .await; + oci + .push(&reference, &layers, oci_config, &self.auth, Some(manifest)) + .await + }) + }); + + result + .map(|push_response| push_response.manifest_url) + .context("cannot push component to the registry") + .map_err(|e| ContentStoreInternalError(e.to_string())) + } + + pub async fn content_exists(&self, reference: impl AsRef) -> Result { + let reference: Reference = reference + .as_ref() + .parse() + .with_context(|| format!("cannot parse reference {}", reference.as_ref())) + .map_err(|e| ContentStoreInternalError(e.to_string())) + .unwrap(); + + let mut oci = self.oci_client.write().await; + match oci + .fetch_manifest_digest(&reference, &self.auth) + .await + { + Ok(_) => Ok(true), + Err(_) => Ok(false), + } + } + + /// Create a new wasm layer based on a file. + async fn wasm_layer(file: &mut File) -> Result { + tracing::log::trace!("Reading wasm component from {:?}", file); + + let mut contents = vec![]; + file.read_to_end(&mut contents).await.context("cannot read wasm component")?; + + Ok(ImageLayer::new( + contents, + WASM_LAYER_MEDIA_TYPE.to_string(), + None, + )) + } + + /// Returns the path to the content file for a given content address. + fn cached_content_path(&self, digest: &AnyHash) -> PathBuf { + self.temp_dir.join(Self::content_file_name(digest)) + } + + /// Returns the file name for a given content address replacing colons with dashes. + fn content_file_name(digest: &AnyHash) -> String { + digest.to_string().replace(':', "-") + } + + /// Build the OCI client configuration given the insecure option. + fn build_config(insecure: bool) -> client::ClientConfig { + let protocol = if insecure { + ClientProtocol::Http + } else { + ClientProtocol::Https + }; + + client::ClientConfig { + protocol, + ..Default::default() + } + } +} diff --git a/crates/server/src/contentstore/oci/mod.rs b/crates/server/src/contentstore/oci/mod.rs new file mode 100644 index 00000000..bacac086 --- /dev/null +++ b/crates/server/src/contentstore/oci/mod.rs @@ -0,0 +1,2 @@ +pub mod ociv1_1; +mod client; diff --git a/crates/server/src/contentstore/oci/ociv1_1.rs b/crates/server/src/contentstore/oci/ociv1_1.rs new file mode 100644 index 00000000..03aa10f6 --- /dev/null +++ b/crates/server/src/contentstore/oci/ociv1_1.rs @@ -0,0 +1,70 @@ +use std::path::PathBuf; +use oci_distribution::secrets::RegistryAuth; +use tokio::fs::File; +use warg_crypto::hash::AnyHash; +use warg_protocol::registry::PackageId; +use crate::contentstore::{ContentStore, ContentStoreError}; +use crate::contentstore::oci::client::Client; + +type Auth = RegistryAuth; + +/// Content store for OCI v1.1 registries. +pub struct OCIv1_1ContentStore { + client: Client, + registry_url: String, +} + +impl OCIv1_1ContentStore { + pub async fn new(registry_url: impl Into, auth: Auth, temp_dir: &PathBuf) -> Self { + let client = Client::new(true, auth, temp_dir).await; + Self { client, registry_url: registry_url.into() } + } + + fn reference(&self, package_id: &PackageId, version: String) -> String { + let (reg_url, namespace, name) = (self.registry_url.clone(), package_id.namespace(), package_id.name()); + format!("{reg_url}/{namespace}/{name}:{version}") + } +} + +#[axum::async_trait] +impl ContentStore for OCIv1_1ContentStore { + /// Fetch the content from the store. + async fn fetch_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let reference = self.reference(package_id, version); + self + .client + .pull(reference, digest) + .await + } + + /// Store the content in the store. + async fn store_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + content: &mut File + ) -> Result { + let reference = self.reference(package_id, version); + self + .client + .push(reference, content, digest) + .await + } + + /// Check if the content is present in the store. + async fn content_present( + &self, + package_id: &PackageId, + _digest: &AnyHash, + version: String, + ) -> Result { + let reference = self.reference(package_id, version); + self.client.content_exists(reference).await + } +} diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index eae3fa40..fc7b42db 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -1,21 +1,20 @@ -use futures::Stream; use std::{collections::HashSet, pin::Pin}; + +use futures::Stream; use thiserror::Error; + +pub use memory::*; +use PackageEntry::Release; +#[cfg(feature = "postgres")] +pub use postgres::*; use warg_crypto::{hash::AnyHash, signing::KeyID}; -use warg_protocol::{ - operator, package, - registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, - ProtoEnvelope, SerdeEnvelope, -}; +use warg_protocol::{operator, package, ProtoEnvelope, registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, SerdeEnvelope, Version}; +use warg_protocol::package::PackageEntry; mod memory; #[cfg(feature = "postgres")] mod postgres; -pub use memory::*; -#[cfg(feature = "postgres")] -pub use postgres::*; - #[derive(Debug, Error)] pub enum DataStoreError { #[error("a conflicting operation was processed: update to the latest checkpoint and try the operation again")] @@ -80,8 +79,8 @@ pub enum RecordStatus { /// Represents a record in a log. pub struct Record -where - T: Clone, + where + T: Clone, { /// The status of the record. pub status: RecordStatus, @@ -270,3 +269,19 @@ pub trait DataStore: Send + Sync { anyhow::bail!("not implemented") } } + +pub fn get_version_for_release(record: &package::PackageRecord) -> Option<&Version> { + record.entries.iter().find_map(|entry| { + match entry { + Release { version, content: _ } => Some(version), + _ => None, + } + }) +} + +pub async fn get_release_version(data_store: &dyn DataStore, log_id: &LogId, record_id: &RecordId) -> Result { + let record = data_store.get_package_record(&log_id, &record_id).await?; + get_version_for_release(&record.envelope.as_ref()) + .cloned() + .ok_or_else(|| DataStoreError::RecordNotFound(record_id.clone())) +} diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index e70d3025..36b733bb 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -108,6 +108,14 @@ impl Config { self } + /// Specify the content store to use. + /// + /// If this is not specified, the server will use a local content store. + pub fn with_content_store(mut self, store: impl ContentStore + 'static) -> Self { + self.content_store = Arc::new(store); + self + } + /// Specify the data store to use via a boxed data store. /// /// If this is not specified, the server will use an in-memory data store. From 30c523b95fd88c370b9654df1e0ce945c755a957 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 20 Jul 2023 18:32:03 -0400 Subject: [PATCH 4/8] add more reasonable config values and artifactType Signed-off-by: David Justice --- Cargo.lock | 2 +- crates/server/Cargo.toml | 7 ++--- crates/server/src/contentstore/oci/client.rs | 29 +++++++++++++++----- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec0f117f..2f2f36f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2396,7 +2396,7 @@ dependencies = [ [[package]] name = "oci-distribution" version = "0.10.0" -source = "git+https://github.com/krustlet/oci-distribution?branch=main#28daf780f90b1f7c3ab6ae9ee9c658d28e626086" +source = "git+https://github.com/devigned/oci-distribution?branch=os-wasi#c161f079c98f03a39e9c24127b554b6ceeb83fb3" dependencies = [ "bytes", "chrono", diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index d86088d6..e247f57e 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -34,11 +34,10 @@ diesel-async = { workspace = true, features = ["postgres", "deadpool"], optional diesel_json = { workspace = true, optional = true} diesel_migrations = { workspace = true, optional = true } diesel-derive-enum = { workspace = true, optional = true, features = ["postgres"] } -serde_json = { workspace = true, optional = true } +serde_json = { workspace = true } chrono = { workspace = true, optional = true } -oci-distribution = { git = "https://github.com/krustlet/oci-distribution", branch = "main" } +oci-distribution = { git = "https://github.com/devigned/oci-distribution", branch = "os-wasi" } [features] default = [] -debug = [] -postgres = ["diesel", "diesel-async", "diesel_json", "diesel_migrations", "diesel-derive-enum", "serde_json", "chrono"] +postgres = ["diesel", "diesel-async", "diesel_json", "diesel_migrations", "diesel-derive-enum", "chrono"] diff --git a/crates/server/src/contentstore/oci/client.rs b/crates/server/src/contentstore/oci/client.rs index fd3c6d82..d4eb550a 100644 --- a/crates/server/src/contentstore/oci/client.rs +++ b/crates/server/src/contentstore/oci/client.rs @@ -9,6 +9,8 @@ use oci_distribution::{ Reference, secrets::RegistryAuth, }; +use oci_distribution::config::{Architecture, ConfigFile, Config as DistConfig, Os}; +use serde_json; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::runtime::Handle; @@ -22,8 +24,9 @@ use crate::{ contentstore::ContentStoreError::ContentStoreInternalError, }; -// const COMPONENT_CONFIG_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.component.v1+config"; -const WASM_LAYER_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.wasm.content.layer.v1+wasm"; +const COMPONENT_ARTIFACT_TYPE: &str = "application/vnd.bytecodealliance.component.v1+wasm"; +const WASM_LAYER_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.wasm.component.layer.v0+wasm"; +// const COMPONENT_COMPOSE_MANIFEST_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.component.compose.v0+yaml"; /// Client for interacting with an OCI registry pub struct Client { @@ -99,7 +102,7 @@ impl Client { &self, reference: impl AsRef, file: &mut File, - _digest: &AnyHash, + digest: &AnyHash, ) -> Result { let reference: Reference = reference .as_ref() @@ -107,16 +110,28 @@ impl Client { .with_context(|| format!("cannot parse reference {}", reference.as_ref())) .map_err(|e| ContentStoreInternalError(e.to_string()))?; - let oci_config = Config::oci_v1("{}".as_bytes().to_vec(), None); + let entrypoint = format!("/{}", digest.to_string().strip_prefix("sha256:").unwrap()); + let config = ConfigFile { + architecture: Architecture::Wasm, + os: Os::Wasi, + config: Some(DistConfig { + // use the sha256 hash as the file name for the entrypoint + entrypoint: vec![entrypoint], + ..Default::default() + }), + ..Default::default() + }; + let config_data = serde_json::to_vec(&config) + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + let oci_config = Config::oci_v1(config_data, None); let mut layers = Vec::new(); let wasm_layer = Self::wasm_layer(file) .await .context("cannot create wasm layer") .map_err(|e| ContentStoreInternalError(e.to_string()))?; layers.insert(0, wasm_layer); - let manifest = OciImageManifest::build(&layers, &oci_config, None); - // TODO: add artifactType to describe the mediaType for the component. - // Candidate mediaType: "application/vnd.bytecodealliance.wasm.component.v1+config" + let mut manifest = OciImageManifest::build(&layers, &oci_config, None); + manifest.artifact_type = Some(COMPONENT_ARTIFACT_TYPE.to_string()); // TODO: fix the higher-level lifetime error that occurs when not using block_in_place and // block_on. From e125664270aedb07e2b031b553f03258312b9156 Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 26 Jul 2023 11:56:58 -0400 Subject: [PATCH 5/8] add back the debug feature to server Signed-off-by: David Justice --- crates/server/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index e247f57e..cc2b9c69 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -40,4 +40,5 @@ oci-distribution = { git = "https://github.com/devigned/oci-distribution", branc [features] default = [] +debug = [] postgres = ["diesel", "diesel-async", "diesel_json", "diesel_migrations", "diesel-derive-enum", "chrono"] From e9d30ccdc8f92a1ef8ccfbbde5252f4fdbd92521 Mon Sep 17 00:00:00 2001 From: David Justice Date: Wed, 26 Jul 2023 12:51:19 -0400 Subject: [PATCH 6/8] fix fmt and test fails Signed-off-by: David Justice --- crates/protocol/src/registry.rs | 6 +++ crates/server/src/api/mod.rs | 2 +- crates/server/src/api/v1/mod.rs | 2 +- crates/server/src/api/v1/package.rs | 52 ++++++++++-------- crates/server/src/bin/warg-server.rs | 13 +++-- crates/server/src/contentstore/local.rs | 8 +-- crates/server/src/contentstore/mod.rs | 4 +- crates/server/src/contentstore/oci/client.rs | 54 +++++++++---------- crates/server/src/contentstore/oci/mod.rs | 2 +- crates/server/src/contentstore/oci/ociv1_1.rs | 29 +++++----- crates/server/src/datastore/memory.rs | 5 +- crates/server/src/datastore/mod.rs | 34 +++++++----- crates/server/src/datastore/postgres/mod.rs | 7 ++- crates/server/src/lib.rs | 6 +-- tests/server.rs | 4 +- 15 files changed, 130 insertions(+), 98 deletions(-) diff --git a/crates/protocol/src/registry.rs b/crates/protocol/src/registry.rs index 0d69258e..ab7ca3ff 100644 --- a/crates/protocol/src/registry.rs +++ b/crates/protocol/src/registry.rs @@ -246,6 +246,12 @@ impl From for RecordId { } } +impl From for AnyHash { + fn from(id: RecordId) -> AnyHash { + id.0 + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/server/src/api/mod.rs b/crates/server/src/api/mod.rs index 5ca66e19..c6f997b9 100644 --- a/crates/server/src/api/mod.rs +++ b/crates/server/src/api/mod.rs @@ -1,3 +1,4 @@ +use crate::contentstore::ContentStore; use crate::{ policy::{content::ContentPolicy, record::RecordPolicy}, services::CoreService, @@ -12,7 +13,6 @@ use tower_http::{ }; use tracing::{Level, Span}; use url::Url; -use crate::contentstore::ContentStore; pub mod v1; diff --git a/crates/server/src/api/v1/mod.rs b/crates/server/src/api/v1/mod.rs index f746dd6f..ecd95e7a 100644 --- a/crates/server/src/api/v1/mod.rs +++ b/crates/server/src/api/v1/mod.rs @@ -1,3 +1,4 @@ +use crate::contentstore::ContentStore; use crate::{ policy::{content::ContentPolicy, record::RecordPolicy}, services::CoreService, @@ -15,7 +16,6 @@ use axum::{ use serde::{Serialize, Serializer}; use std::{path::PathBuf, sync::Arc}; use url::Url; -use crate::contentstore::ContentStore; pub mod fetch; pub mod package; diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index ca4617b8..73037a9d 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -1,14 +1,15 @@ -use std::collections::HashSet; use super::{Json, Path}; use crate::{ + contentstore::ContentStore, datastore::{DataStoreError, RecordStatus}, policy::{ content::{ContentPolicy, ContentPolicyError}, record::{RecordPolicy, RecordPolicyError}, }, services::CoreService, - contentstore::ContentStore, }; +use axum::body::StreamBody; +use axum::http::header; use axum::{ debug_handler, extract::{BodyStream, State}, @@ -18,14 +19,13 @@ use axum::{ Router, }; use futures::StreamExt; +use std::collections::HashSet; use std::sync::Arc; use std::{collections::HashMap, path::PathBuf}; -use axum::body::StreamBody; -use axum::http::header; use tempfile::NamedTempFile; use tokio::io::AsyncWriteExt; -use url::Url; use tokio_util::io::ReaderStream; +use url::Url; use warg_api::v1::package::{ ContentSource, MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest, UploadEndpoint, @@ -70,17 +70,20 @@ impl Config { Router::new() .route("/:log_id/record", post(publish_record)) .route("/:log_id/record/:record_id", get(get_record)) - .route("/:log_id/record/:record_id/content/:digest", post(upload_content)) - .route("/:log_id/record/:record_id/content/:digest", get(fetch_content)) + .route( + "/:log_id/record/:record_id/content/:digest", + post(upload_content), + ) + .route( + "/:log_id/record/:record_id/content/:digest", + get(fetch_content), + ) .with_state(self) } - fn content_url(&self, - log_id: &LogId, - record_id: &RecordId, - digest: &AnyHash) -> String { + fn content_url(&self, log_id: &LogId, record_id: &RecordId, digest: &AnyHash) -> String { format!( - "{url}/{log_id}/record/{record_id}/content/{digest}", + "{url}v1/package/{log_id}/record/{record_id}/content/{digest}", url = self.content_base_url, ) } @@ -387,13 +390,17 @@ async fn upload_content( // Only persist the file if the content was successfully processed res?; - let version = crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id).await?; + let version = + crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id) + .await?; let package_id = config.core_service.store().get_package_id(&log_id).await?; let mut tmp_file = tokio::fs::File::open(&tmp_path) .await .map_err(PackageApiError::internal_error)?; - config.content_store.store_content(&package_id, &digest, version.to_string(), &mut tmp_file) + config + .content_store + .store_content(&package_id, &digest, version.to_string(), &mut tmp_file) .await .map_err(PackageApiError::internal_error)?; @@ -412,7 +419,10 @@ async fn upload_content( Ok(( StatusCode::CREATED, - [(header::LOCATION, config.content_url(&log_id, &record_id, &digest))], + [( + header::LOCATION, + config.content_url(&log_id, &record_id, &digest), + )], )) } @@ -468,12 +478,12 @@ async fn fetch_content( tracing::info!("fetching content for record `{record_id}` from `{log_id}`"); let package_id = config.core_service.store().get_package_id(&log_id).await?; - let version = crate::datastore::get_release_version( - config.core_service.store(), - &log_id, - &record_id, - ).await?; - let file = config.content_store.fetch_content(&package_id, &digest, version.to_string()) + let version = + crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id) + .await?; + let file = config + .content_store + .fetch_content(&package_id, &digest, version.to_string()) .await .map_err(PackageApiError::not_found)?; diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index 6cb9b7e9..64594a47 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -1,8 +1,8 @@ use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; +use oci_distribution::secrets::RegistryAuth::Anonymous; use secrecy::SecretString; use std::{net::SocketAddr, path::PathBuf}; -use oci_distribution::secrets::RegistryAuth::Anonymous; use tokio::signal; use tracing_subscriber::filter::LevelFilter; use url::Url; @@ -127,7 +127,7 @@ async fn main() -> Result<()> { .with_context(|| format!("failed to decode authorized keys from {path:?}"))?; config = config.with_record_policy(authorized_key_policy); } - + let config = match args.content_store { ContentStoreKind::Local => { tracing::info!("using local content store"); @@ -136,7 +136,14 @@ async fn main() -> Result<()> { ContentStoreKind::OCIv1_1 => { use warg_server::contentstore::oci::ociv1_1::OCIv1_1ContentStore; tracing::info!("using OCIv1.1 content store"); - config.with_content_store(OCIv1_1ContentStore::new(args.oci_registry_url.unwrap(), Anonymous, &args.content_dir).await) + config.with_content_store( + OCIv1_1ContentStore::new( + args.oci_registry_url.unwrap(), + Anonymous, + &args.content_dir, + ) + .await, + ) } }; diff --git a/crates/server/src/contentstore/local.rs b/crates/server/src/contentstore/local.rs index 4b8e8b4f..7eb59ae0 100644 --- a/crates/server/src/contentstore/local.rs +++ b/crates/server/src/contentstore/local.rs @@ -1,9 +1,9 @@ +use crate::contentstore::{ContentStore, ContentStoreError}; use std::path::{Path, PathBuf}; use tokio::fs::File; use tokio::io::copy; use warg_crypto::hash::AnyHash; use warg_protocol::registry::PackageId; -use crate::contentstore::{ContentStore, ContentStoreError}; #[derive(Clone)] pub struct LocalContentStore { @@ -46,7 +46,7 @@ impl ContentStore for LocalContentStore { _package_id: &PackageId, digest: &AnyHash, _version: String, - content: &mut File + content: &mut File, ) -> Result { let file_path = self.content_path(digest); let mut stored_file = File::create(file_path.clone()) @@ -67,6 +67,8 @@ impl ContentStore for LocalContentStore { _version: String, ) -> Result { let path = self.content_path(digest); - Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) + Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) } } diff --git a/crates/server/src/contentstore/mod.rs b/crates/server/src/contentstore/mod.rs index 1e1fc04a..bf632646 100644 --- a/crates/server/src/contentstore/mod.rs +++ b/crates/server/src/contentstore/mod.rs @@ -1,6 +1,6 @@ +use thiserror::Error; use tokio::fs::File; use warg_crypto::hash::AnyHash; -use thiserror::Error; use warg_protocol::registry::PackageId; pub mod local; @@ -32,7 +32,7 @@ pub trait ContentStore: Send + Sync { package_id: &PackageId, digest: &AnyHash, version: String, - content: &mut File + content: &mut File, ) -> Result; async fn content_present( diff --git a/crates/server/src/contentstore/oci/client.rs b/crates/server/src/contentstore/oci/client.rs index d4eb550a..ff992f3d 100644 --- a/crates/server/src/contentstore/oci/client.rs +++ b/crates/server/src/contentstore/oci/client.rs @@ -2,14 +2,14 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::{Context, Result}; +use oci_distribution::config::{Architecture, Config as DistConfig, ConfigFile, Os}; use oci_distribution::{ client, client::{ClientProtocol, Config, ImageLayer}, manifest::OciImageManifest, - Reference, secrets::RegistryAuth, + Reference, }; -use oci_distribution::config::{Architecture, ConfigFile, Config as DistConfig, Os}; use serde_json; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -20,8 +20,7 @@ use tokio::task::block_in_place; use warg_crypto::hash::AnyHash; use crate::{ - contentstore::ContentStoreError, - contentstore::ContentStoreError::ContentStoreInternalError, + contentstore::ContentStoreError, contentstore::ContentStoreError::ContentStoreInternalError, }; const COMPONENT_ARTIFACT_TYPE: &str = "application/vnd.bytecodealliance.component.v1+wasm"; @@ -52,7 +51,10 @@ impl Client { digest: &AnyHash, ) -> Result { let path = self.cached_content_path(digest); - if Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))? { + if Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))? + { let file = File::open(path) .await .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; @@ -69,18 +71,13 @@ impl Client { // block_on. let result = block_in_place(|| { Handle::current().block_on(async move { - let mut oci = self - .oci_client - .write() - .await; - oci - .pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE]) + let mut oci = self.oci_client.write().await; + oci.pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE]) .await }) }); - let image = result - .map_err(|e| ContentStoreInternalError(e.to_string()))?; + let image = result.map_err(|e| ContentStoreInternalError(e.to_string()))?; let layer = image .layers @@ -90,8 +87,7 @@ impl Client { let mut file = File::create(self.cached_content_path(digest)) .await .map_err(|e| ContentStoreInternalError(e.to_string()))?; - file - .write_all(&layer.data) + file.write_all(&layer.data) .await .map_err(|e| ContentStoreInternalError(e.to_string()))?; Ok(file) @@ -121,8 +117,8 @@ impl Client { }), ..Default::default() }; - let config_data = serde_json::to_vec(&config) - .map_err(|e| ContentStoreInternalError(e.to_string()))?; + let config_data = + serde_json::to_vec(&config).map_err(|e| ContentStoreInternalError(e.to_string()))?; let oci_config = Config::oci_v1(config_data, None); let mut layers = Vec::new(); let wasm_layer = Self::wasm_layer(file) @@ -135,15 +131,11 @@ impl Client { // TODO: fix the higher-level lifetime error that occurs when not using block_in_place and // block_on. - let result= block_in_place(|| { + let result = block_in_place(|| { Handle::current().block_on(async move { tracing::log::trace!("Pushing component to {:?}", reference); - let mut oci = self - .oci_client - .write() - .await; - oci - .push(&reference, &layers, oci_config, &self.auth, Some(manifest)) + let mut oci = self.oci_client.write().await; + oci.push(&reference, &layers, oci_config, &self.auth, Some(manifest)) .await }) }); @@ -154,7 +146,10 @@ impl Client { .map_err(|e| ContentStoreInternalError(e.to_string())) } - pub async fn content_exists(&self, reference: impl AsRef) -> Result { + pub async fn content_exists( + &self, + reference: impl AsRef, + ) -> Result { let reference: Reference = reference .as_ref() .parse() @@ -163,10 +158,7 @@ impl Client { .unwrap(); let mut oci = self.oci_client.write().await; - match oci - .fetch_manifest_digest(&reference, &self.auth) - .await - { + match oci.fetch_manifest_digest(&reference, &self.auth).await { Ok(_) => Ok(true), Err(_) => Ok(false), } @@ -177,7 +169,9 @@ impl Client { tracing::log::trace!("Reading wasm component from {:?}", file); let mut contents = vec![]; - file.read_to_end(&mut contents).await.context("cannot read wasm component")?; + file.read_to_end(&mut contents) + .await + .context("cannot read wasm component")?; Ok(ImageLayer::new( contents, diff --git a/crates/server/src/contentstore/oci/mod.rs b/crates/server/src/contentstore/oci/mod.rs index bacac086..8cf00ceb 100644 --- a/crates/server/src/contentstore/oci/mod.rs +++ b/crates/server/src/contentstore/oci/mod.rs @@ -1,2 +1,2 @@ -pub mod ociv1_1; mod client; +pub mod ociv1_1; diff --git a/crates/server/src/contentstore/oci/ociv1_1.rs b/crates/server/src/contentstore/oci/ociv1_1.rs index 03aa10f6..12183bd1 100644 --- a/crates/server/src/contentstore/oci/ociv1_1.rs +++ b/crates/server/src/contentstore/oci/ociv1_1.rs @@ -1,10 +1,10 @@ -use std::path::PathBuf; +use crate::contentstore::oci::client::Client; +use crate::contentstore::{ContentStore, ContentStoreError}; use oci_distribution::secrets::RegistryAuth; +use std::path::PathBuf; use tokio::fs::File; use warg_crypto::hash::AnyHash; use warg_protocol::registry::PackageId; -use crate::contentstore::{ContentStore, ContentStoreError}; -use crate::contentstore::oci::client::Client; type Auth = RegistryAuth; @@ -17,11 +17,18 @@ pub struct OCIv1_1ContentStore { impl OCIv1_1ContentStore { pub async fn new(registry_url: impl Into, auth: Auth, temp_dir: &PathBuf) -> Self { let client = Client::new(true, auth, temp_dir).await; - Self { client, registry_url: registry_url.into() } + Self { + client, + registry_url: registry_url.into(), + } } fn reference(&self, package_id: &PackageId, version: String) -> String { - let (reg_url, namespace, name) = (self.registry_url.clone(), package_id.namespace(), package_id.name()); + let (reg_url, namespace, name) = ( + self.registry_url.clone(), + package_id.namespace(), + package_id.name(), + ); format!("{reg_url}/{namespace}/{name}:{version}") } } @@ -36,10 +43,7 @@ impl ContentStore for OCIv1_1ContentStore { version: String, ) -> Result { let reference = self.reference(package_id, version); - self - .client - .pull(reference, digest) - .await + self.client.pull(reference, digest).await } /// Store the content in the store. @@ -48,13 +52,10 @@ impl ContentStore for OCIv1_1ContentStore { package_id: &PackageId, digest: &AnyHash, version: String, - content: &mut File + content: &mut File, ) -> Result { let reference = self.reference(package_id, version); - self - .client - .push(reference, content, digest) - .await + self.client.push(reference, content, digest).await } /// Check if the content is present in the store. diff --git a/crates/server/src/datastore/memory.rs b/crates/server/src/datastore/memory.rs index cb5c316c..3dee712d 100644 --- a/crates/server/src/datastore/memory.rs +++ b/crates/server/src/datastore/memory.rs @@ -233,7 +233,10 @@ impl DataStore for MemoryDataStore { }); let mut state = self.0.write().await; - state.names.entry(log_id.clone()).or_insert_with(|| package_id.clone()); + state + .names + .entry(log_id.clone()) + .or_insert_with(|| package_id.clone()); let prev = state.records.entry(log_id.clone()).or_default().insert( record_id.clone(), RecordStatus::Pending(PendingRecord::Package { diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index fc7b42db..e9e9d570 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -4,12 +4,16 @@ use futures::Stream; use thiserror::Error; pub use memory::*; -use PackageEntry::Release; #[cfg(feature = "postgres")] pub use postgres::*; use warg_crypto::{hash::AnyHash, signing::KeyID}; -use warg_protocol::{operator, package, ProtoEnvelope, registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, SerdeEnvelope, Version}; use warg_protocol::package::PackageEntry; +use warg_protocol::{ + operator, package, + registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, + ProtoEnvelope, SerdeEnvelope, Version, +}; +use PackageEntry::Release; mod memory; #[cfg(feature = "postgres")] @@ -79,8 +83,8 @@ pub enum RecordStatus { /// Represents a record in a log. pub struct Record - where - T: Clone, +where + T: Clone, { /// The status of the record. pub status: RecordStatus, @@ -231,10 +235,7 @@ pub trait DataStore: Send + Sync { ) -> Result>, DataStoreError>; /// Gets the package ID for the given log ID. - async fn get_package_id( - &self, - log_id: &LogId, - ) -> Result; + async fn get_package_id(&self, log_id: &LogId) -> Result; /// Gets an operator record. async fn get_operator_record( @@ -271,15 +272,20 @@ pub trait DataStore: Send + Sync { } pub fn get_version_for_release(record: &package::PackageRecord) -> Option<&Version> { - record.entries.iter().find_map(|entry| { - match entry { - Release { version, content: _ } => Some(version), - _ => None, - } + record.entries.iter().find_map(|entry| match entry { + Release { + version, + content: _, + } => Some(version), + _ => None, }) } -pub async fn get_release_version(data_store: &dyn DataStore, log_id: &LogId, record_id: &RecordId) -> Result { +pub async fn get_release_version( + data_store: &dyn DataStore, + log_id: &LogId, + record_id: &RecordId, +) -> Result { let record = data_store.get_package_record(&log_id, &record_id).await?; get_version_for_release(&record.envelope.as_ref()) .cloned() diff --git a/crates/server/src/datastore/postgres/mod.rs b/crates/server/src/datastore/postgres/mod.rs index 8b42d3b4..4eec3d59 100644 --- a/crates/server/src/datastore/postgres/mod.rs +++ b/crates/server/src/datastore/postgres/mod.rs @@ -747,8 +747,11 @@ impl DataStore for PostgresDataStore { get_records(&mut conn, log_id, checkpoint_id, since, limit as i64).await } - async fn get_package_id(&self, log_id: &LogId) -> std::result::Result { - let mut conn = self.0.get().await?; + async fn get_package_id( + &self, + log_id: &LogId, + ) -> std::result::Result { + let mut conn = self.pool.get().await?; schema::logs::table .select(schema::logs::name) .filter(schema::logs::log_id.eq(TextRef(log_id))) diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 36b733bb..557a47af 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -1,6 +1,8 @@ +use crate::contentstore::local::LocalContentStore; use crate::{api::create_router, datastore::MemoryDataStore}; use anyhow::{Context, Result}; use axum::Router; +use contentstore::ContentStore; use datastore::DataStore; use futures::Future; use policy::{content::ContentPolicy, record::RecordPolicy}; @@ -15,16 +17,14 @@ use std::{ }; use tokio::task::JoinHandle; use url::Url; -use contentstore::ContentStore; use warg_crypto::signing::PrivateKey; -use crate::contentstore::local::LocalContentStore; pub mod api; pub mod args; +pub mod contentstore; pub mod datastore; pub mod policy; pub mod services; -pub mod contentstore; const DEFAULT_BIND_ADDRESS: &str = "127.0.0.1:8090"; const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5); diff --git a/tests/server.rs b/tests/server.rs index 60d029aa..47c7713f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -421,8 +421,8 @@ async fn test_custom_content_url(config: &Config) -> Result<()> { .await?; let expected_url = format!( - "https://example.com/content/{digest}", - digest = digest.to_string().replace(':', "-") + "https://example.com/v1/package/{log_id}/record/{record_id}/content/{digest}", + record_id = record.id, ); match record.state { From 64736cc17d004ed843b5785073c457f6b2cde501 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 3 Aug 2023 14:21:59 -0400 Subject: [PATCH 7/8] implement s3 compatible content store without signed Uris Signed-off-by: David Justice --- .gitignore | 3 + Cargo.lock | 564 +++++++++++++++++++++++ Cargo.toml | 6 + crates/server/Cargo.toml | 18 +- crates/server/src/bin/warg-server.rs | 62 ++- crates/server/src/contentstore/mod.rs | 3 + crates/server/src/contentstore/s3/mod.rs | 216 +++++++++ 7 files changed, 866 insertions(+), 6 deletions(-) create mode 100644 crates/server/src/contentstore/s3/mod.rs diff --git a/.gitignore b/.gitignore index 3ae03816..8358af30 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ /target *.swp content + +.env + diff --git a/Cargo.lock b/Cargo.lock index 2f2f36f5..f03f8818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,6 +280,367 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-config" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcdcf0d683fe9c23d32cf5b53c9918ea0a500375a9fb20109802552658e576c9" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-sdk-sso", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http", + "hyper", + "ring", + "time 0.3.21", + "tokio", + "tower", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fcdb2f7acbc076ff5ad05e7864bdb191ca70a6fd07668dc3a1a8bcd051de5ae" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "fastrand", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-endpoint" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cce1c41a6cfaa726adee9ebb9a56fcd2bbfd8be49fd8a04c5e20fd968330b04" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "aws-types", + "http", + "regex", + "tracing", +] + +[[package]] +name = "aws-http" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aadbc44e7a8f3e71c8b374e03ecd972869eb91dd2bc89ed018954a52ba84bc44" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "http-body", + "lazy_static", + "percent-encoding", + "pin-project-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-s3" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fba197193cbb4bcb6aad8d99796b2291f36fa89562ded5d4501363055b0de89f" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-client", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "http-body", + "once_cell", + "percent-encoding", + "regex", + "tokio-stream", + "tower", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8b812340d86d4a766b2ca73f740dfd47a97c2dff0c06c8517a16d88241957e4" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "regex", + "tokio-stream", + "tower", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "265fac131fbfc188e5c3d96652ea90ecc676a934e3174eaaee523c6cec040b3b" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "regex", + "tower", + "tracing", +] + +[[package]] +name = "aws-sig-auth" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b94acb10af0c879ecd5c7bdf51cda6679a0a4f4643ce630905a77673bfa3c61" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-types", + "http", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d2ce6f507be68e968a33485ced670111d1cbad161ddbbab1e313c03d37d8f4c" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-http", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http", + "once_cell", + "percent-encoding", + "regex", + "sha2", + "time 0.3.21", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13bda3996044c202d75b91afeb11a9afae9db9a721c6a7a427410018e286b880" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07ed8b96d95402f3f6b8b57eb4e0e45ee365f78b1a924faf20ff6e97abf1eae6" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc32c", + "crc32fast", + "hex", + "http", + "http-body", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-client" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a86aa6e21e86c4252ad6a0e3e74da9617295d8d6e374d552be7d3059c41cedd" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", + "bytes", + "fastrand", + "http", + "http-body", + "hyper", + "hyper-rustls", + "lazy_static", + "pin-project-lite", + "rustls", + "tokio", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460c8da5110835e3d9a717c61f5556b20d03c32a1dec57f8fc559b360f733bb8" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b3b693869133551f135e1f2c77cb0b8277d9e3e17feaf2213f735857c4f0d28" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "hyper", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "aws-smithy-http-tower" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae4f6c5798a247fac98a867698197d9ac22643596dc3777f0c76b91917616b9" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "http", + "http-body", + "pin-project-lite", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23f9f42fbfa96d095194a632fbac19f60077748eba536eb0b9fecc28659807f8" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98819eb0b04020a1c791903533b638534ae6c12e2aceda3e6e6fba015608d51d" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-types" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16a3d0bf4f324f4ef9793b86a1701d9700fbcdbd12a846da45eed104c634c6e8" +dependencies = [ + "base64-simd", + "itoa", + "num-integer", + "ryu", + "time 0.3.21", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1b9d12875731bd07e767be7baad95700c3137b56730ec9ddeedb52a5e5ca63b" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd209616cc8d7bfb82f87811a5c655dc97537f592689b18743bddf5dc5c4829" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-types", + "http", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.6.18" @@ -376,6 +737,16 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -467,6 +838,16 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "bytes-utils" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e47d3a8076e283f3acd27400535992edb3ba4b5bb72f8891ad8fbe7932a7d4b9" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cap-fs-ext" version = "1.0.15" @@ -820,6 +1201,15 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "crc32c" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -1763,6 +2153,39 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-proxy" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" +dependencies = [ + "bytes", + "futures", + "headers", + "http", + "hyper", + "hyper-tls", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + +[[package]] +name = "hyper-rustls" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" +dependencies = [ + "http", + "hyper", + "log", + "rustls", + "rustls-native-certs", + "tokio", + "tokio-rustls", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2506,6 +2929,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" @@ -3156,6 +3585,21 @@ dependencies = [ "subtle", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rpassword" version = "7.2.0" @@ -3189,6 +3633,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.37.23" @@ -3218,6 +3671,39 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64 0.21.2", +] + [[package]] name = "rustversion" version = "1.0.12" @@ -3264,6 +3750,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sec1" version = "0.7.2" @@ -3569,6 +4065,12 @@ dependencies = [ "smallvec", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spki" version = "0.7.2" @@ -3856,6 +4358,28 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.8" @@ -4093,6 +4617,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.4.0" @@ -4104,6 +4634,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.1" @@ -4134,6 +4670,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "waker-fn" version = "1.1.0" @@ -4304,6 +4846,10 @@ name = "warg-server" version = "0.1.0" dependencies = [ "anyhow", + "aws-config", + "aws-credential-types", + "aws-sdk-s3", + "aws-smithy-client", "axum", "bytes", "chrono", @@ -4314,6 +4860,8 @@ dependencies = [ "diesel_json", "diesel_migrations", "futures", + "hyper", + "hyper-proxy", "indexmap 2.0.0", "oci-distribution", "secrecy", @@ -4892,6 +5440,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "which" version = "4.4.0" @@ -5247,6 +5805,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "xmlparser" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" + [[package]] name = "yansi" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 7b4fe5a6..d7cb2472 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,3 +120,9 @@ regex = "1" wasmparser = "0.108.0" protox = "0.4.1" toml = "0.7.6" +aws-config = "0.55.3" +aws-sdk-s3 = "0.28.0" +aws-credential-types = "0.55.3" +aws-smithy-client = "0.55.3" +hyper = "0.14.13" +hyper-proxy = "0.9.1" \ No newline at end of file diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index cc2b9c69..92e015c5 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -19,7 +19,7 @@ tempfile = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tower = { workspace = true } -tower-http = { workspace = true, features = ["trace", "cors"]} +tower-http = { workspace = true, features = ["trace", "cors"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } indexmap = { workspace = true } @@ -29,16 +29,24 @@ bytes = { workspace = true } wasmparser = { workspace = true } secrecy = { workspace = true } toml = { workspace = true } +serde_json = { workspace = true, optional = true } diesel = { workspace = true, features = ["postgres", "serde_json", "chrono"], optional = true } diesel-async = { workspace = true, features = ["postgres", "deadpool"], optional = true } -diesel_json = { workspace = true, optional = true} +diesel_json = { workspace = true, optional = true } diesel_migrations = { workspace = true, optional = true } diesel-derive-enum = { workspace = true, optional = true, features = ["postgres"] } -serde_json = { workspace = true } chrono = { workspace = true, optional = true } -oci-distribution = { git = "https://github.com/devigned/oci-distribution", branch = "os-wasi" } +aws-config = { workspace = true, optional = true } +aws-sdk-s3 = { workspace = true, optional = true } +aws-credential-types = { workspace = true, optional = true } +aws-smithy-client = { workspace = true, optional = true } +hyper = { workspace = true, optional = true } +hyper-proxy = { workspace = true, optional = true } +oci-distribution = { git = "https://github.com/devigned/oci-distribution", branch = "os-wasi", optional = true } [features] default = [] debug = [] -postgres = ["diesel", "diesel-async", "diesel_json", "diesel_migrations", "diesel-derive-enum", "chrono"] +postgres = ["diesel", "diesel-async", "diesel_json", "diesel_migrations", "diesel-derive-enum", "chrono", "serde_json"] +s3 = ["aws-config", "aws-sdk-s3", "aws-credential-types", "aws-smithy-client", "hyper", "hyper-proxy"] +oci = ["oci-distribution", "serde_json"] diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index 64594a47..11c1a1a5 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -1,6 +1,5 @@ use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; -use oci_distribution::secrets::RegistryAuth::Anonymous; use secrecy::SecretString; use std::{net::SocketAddr, path::PathBuf}; use tokio::signal; @@ -21,8 +20,11 @@ enum DataStoreKind { enum ContentStoreKind { #[default] Local, + #[cfg(feature = "oci")] #[value(alias("ociv1-1"))] OCIv1_1, + #[cfg(feature = "s3")] + S3, } #[derive(Parser, Debug)] @@ -52,6 +54,7 @@ struct Args { content_store: ContentStoreKind, /// The OCI registry URL if content store is set to oci. + #[cfg(feature = "oci")] #[arg(long, env = "WARG_OCI_REGISTRY_URL", default_value = "localhost:5000")] oci_registry_url: Option, @@ -73,6 +76,31 @@ struct Args { #[arg(long)] database_run_migrations: bool, + /// The S3 compatible endpoint. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_ENDPOINT")] + s3_endpoint: Option, + + /// The S3 compatible API key secret. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_API_KEY_ID")] + s3_api_key_id: Option, + + /// The S3 compatible API key secret. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_API_KEY_SECRET")] + s3_api_key_secret: Option, + + /// The S3 compatible region. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_REGION", default_value = "auto")] + s3_region: Option, + + /// The S3 compatible region. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_BUCKET_NAME", default_value = "warg-registry")] + s3_bucket_name: Option, + /// The operator key. /// /// Prefer using `operator-key-file`, or environment variable variation. @@ -133,7 +161,9 @@ async fn main() -> Result<()> { tracing::info!("using local content store"); config } + #[cfg(feature = "oci")] ContentStoreKind::OCIv1_1 => { + use oci_distribution::secrets::RegistryAuth::Anonymous; use warg_server::contentstore::oci::ociv1_1::OCIv1_1ContentStore; tracing::info!("using OCIv1.1 content store"); config.with_content_store( @@ -145,6 +175,36 @@ async fn main() -> Result<()> { .await, ) } + #[cfg(feature = "s3")] + ContentStoreKind::S3 => { + use warg_server::contentstore::s3::S3ContentStore; + tracing::info!("using s3 content store"); + config.with_content_store( + S3ContentStore::new( + args.s3_endpoint + .with_context(|| "must specify the s3 compatible endpoint: --s3-endpoint") + .unwrap(), + args.s3_api_key_id + .with_context(|| { + "must specify the s3 compatible API key ID: --s3-api-key-id" + }) + .unwrap(), + args.s3_api_key_secret + .with_context(|| { + "must specify the s3 compatible API key secret: --s3-api-key-secret" + }) + .unwrap(), + args.s3_region + .with_context(|| "must specify the s3 compatible region") + .unwrap(), + args.s3_bucket_name + .with_context(|| "must specify the s3 compatible bucket name") + .unwrap(), + &args.content_dir, + ) + .await, + ) + } }; let config = match args.data_store { diff --git a/crates/server/src/contentstore/mod.rs b/crates/server/src/contentstore/mod.rs index bf632646..8bc35aeb 100644 --- a/crates/server/src/contentstore/mod.rs +++ b/crates/server/src/contentstore/mod.rs @@ -4,7 +4,10 @@ use warg_crypto::hash::AnyHash; use warg_protocol::registry::PackageId; pub mod local; +#[cfg(feature = "oci")] pub mod oci; +#[cfg(feature = "s3")] +pub mod s3; #[derive(Debug, Error)] pub enum ContentStoreError { diff --git a/crates/server/src/contentstore/s3/mod.rs b/crates/server/src/contentstore/s3/mod.rs new file mode 100644 index 00000000..a2e22339 --- /dev/null +++ b/crates/server/src/contentstore/s3/mod.rs @@ -0,0 +1,216 @@ +use crate::contentstore::ContentStoreError::ContentStoreInternalError; +use crate::contentstore::{ContentStore, ContentStoreError}; +use aws_credential_types::Credentials; +use aws_sdk_s3; +use aws_sdk_s3::config::Region; +use aws_sdk_s3::primitives::ByteStream; +use axum::http::HeaderValue; +use futures::TryStreamExt; +use hyper::client::HttpConnector; +use hyper::Uri; +use hyper_proxy::{Intercept, Proxy, ProxyConnector}; +use secrecy::{ExposeSecret, SecretString}; +use std::env; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tracing::info; +use url::Url; +use warg_crypto::hash::AnyHash; +use warg_protocol::registry::PackageId; + +pub struct S3ContentStore { + client: aws_sdk_s3::Client, + // on R2 there is a max number of buckets per account (default: 1000) + bucket_name: String, + temp_dir: PathBuf, +} + +impl S3ContentStore { + pub async fn new( + endpoint: Url, + access_key_id: SecretString, + access_key_secret: SecretString, + region: String, + bucket_name: String, + temp_dir: &PathBuf, + ) -> Self { + let creds = Credentials::new( + access_key_id.expose_secret().to_string(), + access_key_secret.expose_secret().to_string(), + None, + None, + "warg-s3-static-provider", + ); + let config_builder = aws_config::from_env() + .region(Region::new(region)) + .endpoint_url(endpoint) + .credentials_provider(creds); + let config = match env::var("HTTP_PROXY") { + Ok(proxy_env) => { + let proxy_str = proxy_env.as_str(); + let proxy_uri = Uri::from_str(proxy_str).unwrap(); + let proxy_connector = { + let proxy = Proxy::new(Intercept::All, proxy_uri.clone()); + let connector = HttpConnector::new(); + ProxyConnector::from_proxy(connector, proxy).unwrap() + }; + // need to ensure the `hyper` feature of smithy-client is enabled + let hyper_client = + aws_smithy_client::hyper_ext::Adapter::builder().build(proxy_connector); + info!("Using proxy {}", proxy_str); + aws_sdk_s3::config::Builder::from(&config_builder.load().await) + .http_connector(hyper_client) + .build() + } + _ => (&config_builder.load().await).into(), + }; + let client = aws_sdk_s3::Client::from_conf(config); + Self { + client, + bucket_name, + temp_dir: temp_dir.clone(), + } + } + + /// Returns the path to the content file for a given content address. + fn cached_content_path(&self, digest: &AnyHash) -> PathBuf { + self.temp_dir.join(Self::content_file_name(digest)) + } + + /// Returns the file name for a given content address replacing colons with dashes. + fn content_file_name(digest: &AnyHash) -> String { + digest.to_string().replace(':', "-") + } + + fn content_store_name(package_id: &PackageId, version: &str, digest: &AnyHash) -> String { + format!( + "{}-{}-{}-{}", + package_id.namespace(), + package_id.name(), + version, + Self::content_file_name(digest) + ) + } +} + +#[axum::async_trait] +impl ContentStore for S3ContentStore { + async fn fetch_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let path = self.cached_content_path(digest); + if Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreInternalError(e.to_string()))? + { + let file = File::open(path) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + return Ok(file); + } + + let mut object = self + .client + .get_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .send() + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + let mut file = File::create(path) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + while let Some(bits) = object + .body + .try_next() + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))? + { + file.write_all(&bits) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + } + Ok(file) + } + + async fn store_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + content: &mut File, + ) -> Result { + let mut buf: Vec = Vec::new(); + content.read_to_end(&mut buf).await.map_err(|e| { + ContentStoreInternalError(format!( + "cannot read content for package {} version {}: {}", + package_id, + version.clone(), + e + )) + })?; + + self.client + .put_object() + .bucket(self.bucket_name.clone()) + .body(ByteStream::from(buf)) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .content_type("application/wasm".to_string()) + .customize() + .await + .map_err(|e| { + ContentStoreInternalError(format!( + "cannot customize request for package {} version {}: {}", + package_id, + version.clone(), + e + )) + })? + // add a header so that Cloudflare will automatically create the bucket if it doesn't exist + .mutate_request(|req| { + req.headers_mut().insert( + "cf-create-bucket-if-missing", + HeaderValue::from_static("true"), + ); + }) + .send() + .await + .map_err(|e| { + ContentStoreInternalError(format!( + "cannot store content for package {} version {}: {}", + package_id, version, e + )) + })?; + + Ok(digest.to_string()) + } + + async fn content_present( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + self.client + .head_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .send() + .await + .map(|_| true) + .or_else(|_| Ok(false)) + } +} From 851a82c191b7a7b284631ded221d6a6754931879 Mon Sep 17 00:00:00 2001 From: David Justice Date: Fri, 4 Aug 2023 16:30:38 -0400 Subject: [PATCH 8/8] add presign behavior to the content store with s3 compat impl Signed-off-by: David Justice --- crates/server/src/bin/warg-server.rs | 16 ++++- crates/server/src/contentstore/mod.rs | 27 ++++++++ crates/server/src/contentstore/s3/mod.rs | 78 +++++++++++++++++++++++- 3 files changed, 116 insertions(+), 5 deletions(-) diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index 11c1a1a5..457da72a 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -101,6 +101,11 @@ struct Args { #[arg(long, env = "WARG_S3_BUCKET_NAME", default_value = "warg-registry")] s3_bucket_name: Option, + /// The S3 compatible presign time to live in u64 seconds. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_PRESIGN_TTL", default_value = "3600")] + s3_presign_ttl: Option, + /// The operator key. /// /// Prefer using `operator-key-file`, or environment variable variation. @@ -195,12 +200,19 @@ async fn main() -> Result<()> { }) .unwrap(), args.s3_region - .with_context(|| "must specify the s3 compatible region") + .with_context(|| "must specify the s3 compatible region: --s3-region") .unwrap(), args.s3_bucket_name - .with_context(|| "must specify the s3 compatible bucket name") + .with_context(|| { + "must specify the s3 compatible bucket name: --s3-bucket-name" + }) .unwrap(), &args.content_dir, + args.s3_presign_ttl + .with_context(|| { + "must specify the s3 compatible presign time to live: --s3-presign-ttl" + }) + .unwrap(), ) .await, ) diff --git a/crates/server/src/contentstore/mod.rs b/crates/server/src/contentstore/mod.rs index 8bc35aeb..204e9ed9 100644 --- a/crates/server/src/contentstore/mod.rs +++ b/crates/server/src/contentstore/mod.rs @@ -1,3 +1,4 @@ +use hyper::Uri; use thiserror::Error; use tokio::fs::File; use warg_crypto::hash::AnyHash; @@ -18,6 +19,28 @@ pub enum ContentStoreError { ContentStoreInternalError(String), } +pub enum ContentStoreUriSigning { + None, + Presigned(Box), +} + +/// Implemented by content stores that support presigned URIs. +#[axum::async_trait] +pub trait PresignedContentStore { + async fn read_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result; + async fn write_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result; +} + /// Implemented by content stores. #[axum::async_trait] pub trait ContentStore: Send + Sync { @@ -44,4 +67,8 @@ pub trait ContentStore: Send + Sync { digest: &AnyHash, version: String, ) -> Result; + + async fn uri_signing(&self) -> ContentStoreUriSigning { + ContentStoreUriSigning::None + } } diff --git a/crates/server/src/contentstore/s3/mod.rs b/crates/server/src/contentstore/s3/mod.rs index a2e22339..8c8112d5 100644 --- a/crates/server/src/contentstore/s3/mod.rs +++ b/crates/server/src/contentstore/s3/mod.rs @@ -1,8 +1,11 @@ use crate::contentstore::ContentStoreError::ContentStoreInternalError; -use crate::contentstore::{ContentStore, ContentStoreError}; +use crate::contentstore::{ + ContentStore, ContentStoreError, ContentStoreUriSigning, PresignedContentStore, +}; use aws_credential_types::Credentials; use aws_sdk_s3; use aws_sdk_s3::config::Region; +use aws_sdk_s3::presigning::PresigningConfig; use aws_sdk_s3::primitives::ByteStream; use axum::http::HeaderValue; use futures::TryStreamExt; @@ -13,6 +16,7 @@ use secrecy::{ExposeSecret, SecretString}; use std::env; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::time::Duration; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::info; @@ -20,10 +24,12 @@ use url::Url; use warg_crypto::hash::AnyHash; use warg_protocol::registry::PackageId; +#[derive(Debug, Clone)] pub struct S3ContentStore { client: aws_sdk_s3::Client, // on R2 there is a max number of buckets per account (default: 1000) bucket_name: String, + presign_ttl: u64, temp_dir: PathBuf, } @@ -35,6 +41,7 @@ impl S3ContentStore { region: String, bucket_name: String, temp_dir: &PathBuf, + presign_ttl: u64, ) -> Self { let creds = Credentials::new( access_key_id.expose_secret().to_string(), @@ -44,8 +51,8 @@ impl S3ContentStore { "warg-s3-static-provider", ); let config_builder = aws_config::from_env() - .region(Region::new(region)) - .endpoint_url(endpoint) + .region(Region::new(region.clone())) + .endpoint_url(endpoint.clone()) .credentials_provider(creds); let config = match env::var("HTTP_PROXY") { Ok(proxy_env) => { @@ -71,6 +78,7 @@ impl S3ContentStore { client, bucket_name, temp_dir: temp_dir.clone(), + presign_ttl, } } @@ -93,6 +101,15 @@ impl S3ContentStore { Self::content_file_name(digest) ) } + + fn get_presign_config(&self) -> Result { + Ok(PresigningConfig::builder() + .expires_in(Duration::from_secs(self.presign_ttl)) + .build() + .map_err(|e| { + ContentStoreInternalError(format!("cannot build presigning config: {}", e)) + })?) + } } #[axum::async_trait] @@ -213,4 +230,59 @@ impl ContentStore for S3ContentStore { .map(|_| true) .or_else(|_| Ok(false)) } + + async fn uri_signing(&self) -> ContentStoreUriSigning { + ContentStoreUriSigning::Presigned(Box::new(self.clone())) + } +} + +#[axum::async_trait] +impl PresignedContentStore for S3ContentStore { + async fn read_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let config = self.get_presign_config()?; + + let presigned = self + .client + .get_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .presigned(config) + .await + .map_err(|e| { + ContentStoreInternalError(format!("cannot generate presigned Uri: {e}")) + })?; + + Ok(presigned.uri().clone()) + } + + async fn write_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let config = self.get_presign_config()?; + + let presigned = self + .client + .put_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .presigned(config) + .await + .map_err(|e| { + ContentStoreInternalError(format!("cannot generate presigned Uri: {e}")) + })?; + + Ok(presigned.uri().clone()) + } }