From 2b297de0f247c3818d26a8beb7527deb8e6ffb94 Mon Sep 17 00:00:00 2001 From: haze518 Date: Tue, 14 Jan 2025 09:00:55 +0600 Subject: [PATCH] Refactor: Replace dyn Trait with Enum for Encryptor and Persister Types --- Cargo.lock | 43 +++++++++++++- cli/src/main.rs | 8 +-- integration/tests/state/mod.rs | 15 ++--- .../tests/streaming/common/test_setup.rs | 4 +- sdk/Cargo.toml | 2 +- sdk/src/clients/builder.rs | 6 +- sdk/src/clients/client.rs | 6 +- sdk/src/clients/consumer.rs | 12 ++-- sdk/src/clients/producer.rs | 12 ++-- sdk/src/utils/crypto.rs | 23 ++++++-- server/Cargo.toml | 5 +- server/src/http/jwt/jwt_manager.rs | 6 +- server/src/http/jwt/storage.rs | 9 +-- server/src/state/file.rs | 12 ++-- server/src/streaming/partitions/storage.rs | 6 +- server/src/streaming/persistence/persister.rs | 57 ++++++++++++++----- server/src/streaming/persistence/task.rs | 7 ++- server/src/streaming/segments/storage.rs | 8 +-- server/src/streaming/storage.rs | 31 +++------- server/src/streaming/systems/storage.rs | 6 +- server/src/streaming/systems/system.rs | 18 +++--- tools/src/data-seeder/main.rs | 8 +-- 22 files changed, 188 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a9d85b4b..c90e98371 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,6 +1476,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dtoa" version = "1.0.9" @@ -1690,6 +1696,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs_extra" version = "1.3.0" @@ -2318,7 +2330,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.71" +version = "0.6.80" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -2822,6 +2834,32 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "moka" version = "0.12.10" @@ -4342,7 +4380,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.112" +version = "0.4.120" dependencies = [ "ahash 0.8.11", "anyhow", @@ -4368,6 +4406,7 @@ dependencies = [ "iggy", "jsonwebtoken", "mimalloc", + "mockall", "moka", "openssl", "opentelemetry", diff --git a/cli/src/main.rs b/cli/src/main.rs index 7e7e43d64..044f97958 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -65,7 +65,7 @@ use iggy::cli::{ use iggy::cli_command::{CliCommand, PRINT_TARGET}; use iggy::client_provider::{self, ClientProviderConfig}; use iggy::clients::client::IggyClient; -use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor}; +use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind}; use iggy::utils::personal_access_token_expiry::PersonalAccessTokenExpiry; use std::sync::Arc; use tracing::{event, Level}; @@ -339,11 +339,11 @@ async fn main() -> Result<(), IggyCmdError> { // Create credentials based on command line arguments and command let mut credentials = IggyCredentials::new(&cli_options, &iggy_args, command.login_required())?; - let encryptor: Option> = match iggy_args.encryption_key.is_empty() { + let encryptor = match iggy_args.encryption_key.is_empty() { true => None, - false => Some(Arc::new( + false => Some(Arc::new(EncryptorKind::Aes256Gcm( Aes256GcmEncryptor::from_base64_key(&iggy_args.encryption_key).unwrap(), - )), + ))), }; let client_provider_config = Arc::new(ClientProviderConfig::from_args_set_autologin( iggy_args.clone(), diff --git a/integration/tests/state/mod.rs b/integration/tests/state/mod.rs index 588163120..9fba07526 100644 --- a/integration/tests/state/mod.rs +++ b/integration/tests/state/mod.rs @@ -1,6 +1,6 @@ -use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor}; +use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind}; use server::state::file::FileState; -use server::streaming::persistence::persister::FilePersister; +use server::streaming::persistence::persister::{FilePersister, PersisterKind}; use server::versioning::SemanticVersion; use std::str::FromStr; use std::sync::Arc; @@ -31,11 +31,12 @@ impl StateSetup { create_dir(&directory_path).await.unwrap(); let version = SemanticVersion::from_str("1.2.3").unwrap(); - let persister = FilePersister {}; - let encryptor: Option> = match encryption_key { - Some(key) => Some(Arc::new(Aes256GcmEncryptor::new(key).unwrap())), - None => None, - }; + let persister = PersisterKind::File(FilePersister {}); + let encryptor = encryption_key.map(|key| { + Arc::new(EncryptorKind::Aes256Gcm( + Aes256GcmEncryptor::new(key).unwrap(), + )) + }); let state = FileState::new(&log_path, &version, Arc::new(persister), encryptor); Self { diff --git a/integration/tests/streaming/common/test_setup.rs b/integration/tests/streaming/common/test_setup.rs index 7585ee60c..4ab4f1939 100644 --- a/integration/tests/streaming/common/test_setup.rs +++ b/integration/tests/streaming/common/test_setup.rs @@ -1,5 +1,5 @@ use server::configs::system::SystemConfig; -use server::streaming::persistence::persister::FilePersister; +use server::streaming::persistence::persister::{FilePersister, PersisterKind}; use server::streaming::storage::SystemStorage; use std::sync::Arc; use tokio::fs; @@ -20,7 +20,7 @@ impl TestSetup { let config = Arc::new(config); fs::create_dir(config.get_system_path()).await.unwrap(); - let persister = FilePersister {}; + let persister = PersisterKind::File(FilePersister {}); let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister))); TestSetup { config, storage } } diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index ea636bd9d..0b1ac5a4f 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.71" +version = "0.6.80" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "Apache-2.0" diff --git a/sdk/src/clients/builder.rs b/sdk/src/clients/builder.rs index 2e19550ae..d43f46f8b 100644 --- a/sdk/src/clients/builder.rs +++ b/sdk/src/clients/builder.rs @@ -8,7 +8,7 @@ use crate::quic::client::QuicClient; use crate::quic::config::QuicClientConfigBuilder; use crate::tcp::client::TcpClient; use crate::tcp::config::TcpClientConfigBuilder; -use crate::utils::crypto::Encryptor; +use crate::utils::crypto::EncryptorKind; use crate::utils::duration::IggyDuration; use std::sync::Arc; use tracing::error; @@ -18,7 +18,7 @@ use tracing::error; pub struct IggyClientBuilder { client: Option>, partitioner: Option>, - encryptor: Option>, + encryptor: Option>, } impl IggyClientBuilder { @@ -49,7 +49,7 @@ impl IggyClientBuilder { } /// Use the custom encryptor implementation. - pub fn with_encryptor(mut self, encryptor: Arc) -> Self { + pub fn with_encryptor(mut self, encryptor: Arc) -> Self { self.encryptor = Some(encryptor); self } diff --git a/sdk/src/clients/client.rs b/sdk/src/clients/client.rs index 66a237eb2..d95a63f80 100644 --- a/sdk/src/clients/client.rs +++ b/sdk/src/clients/client.rs @@ -31,7 +31,7 @@ use crate::partitioner::Partitioner; use crate::snapshot::{SnapshotCompression, SystemSnapshotType}; use crate::tcp::client::TcpClient; use crate::utils::byte_size::IggyByteSize; -use crate::utils::crypto::Encryptor; +use crate::utils::crypto::EncryptorKind; use crate::utils::duration::IggyDuration; use crate::utils::expiry::IggyExpiry; use crate::utils::personal_access_token_expiry::PersonalAccessTokenExpiry; @@ -55,7 +55,7 @@ use tracing::{debug, error, info}; pub struct IggyClient { client: IggySharedMut>, partitioner: Option>, - encryptor: Option>, + encryptor: Option>, } impl Default for IggyClient { @@ -96,7 +96,7 @@ impl IggyClient { pub fn create( client: Box, partitioner: Option>, - encryptor: Option>, + encryptor: Option>, ) -> Self { if partitioner.is_some() { info!("Partitioner is enabled."); diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index be6289192..33ac20f0f 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -7,7 +7,7 @@ use crate::locking::{IggySharedMut, IggySharedMutFn}; use crate::messages::poll_messages::{PollingKind, PollingStrategy}; use crate::models::messages::{PolledMessage, PolledMessages}; use crate::utils::byte_size::IggyByteSize; -use crate::utils::crypto::Encryptor; +use crate::utils::crypto::EncryptorKind; use crate::utils::duration::IggyDuration; use crate::utils::timestamp::IggyTimestamp; use bytes::Bytes; @@ -81,7 +81,7 @@ pub struct IggyConsumer { current_offsets: Arc>, poll_future: Option, buffered_messages: VecDeque, - encryptor: Option>, + encryptor: Option>, store_offset_sender: flume::Sender<(u32, u64)>, store_offset_after_each_message: bool, store_offset_after_all_messages: bool, @@ -107,7 +107,7 @@ impl IggyConsumer { auto_commit: AutoCommit, auto_join_consumer_group: bool, create_consumer_group_if_not_exists: bool, - encryptor: Option>, + encryptor: Option>, retry_interval: IggyDuration, allow_replay: bool, ) -> Self { @@ -866,7 +866,7 @@ pub struct IggyConsumerBuilder { auto_commit: AutoCommit, auto_join_consumer_group: bool, create_consumer_group_if_not_exists: bool, - encryptor: Option>, + encryptor: Option>, retry_interval: IggyDuration, allow_replay: bool, } @@ -880,7 +880,7 @@ impl IggyConsumerBuilder { stream_id: Identifier, topic_id: Identifier, partition_id: Option, - encryptor: Option>, + encryptor: Option>, polling_interval: Option, ) -> Self { Self { @@ -990,7 +990,7 @@ impl IggyConsumerBuilder { } /// Sets the encryptor for decrypting the messages' payloads. - pub fn encryptor(self, encryptor: Arc) -> Self { + pub fn encryptor(self, encryptor: Arc) -> Self { Self { encryptor: Some(encryptor), ..self diff --git a/sdk/src/clients/producer.rs b/sdk/src/clients/producer.rs index b6bbdedbb..cf3961739 100644 --- a/sdk/src/clients/producer.rs +++ b/sdk/src/clients/producer.rs @@ -6,7 +6,7 @@ use crate::identifier::{IdKind, Identifier}; use crate::locking::{IggySharedMut, IggySharedMutFn}; use crate::messages::send_messages::{Message, Partitioning}; use crate::partitioner::Partitioner; -use crate::utils::crypto::Encryptor; +use crate::utils::crypto::EncryptorKind; use crate::utils::duration::IggyDuration; use crate::utils::expiry::IggyExpiry; use crate::utils::timestamp::IggyTimestamp; @@ -35,7 +35,7 @@ pub struct IggyProducer { topic_name: String, batch_size: Option, partitioning: Option>, - encryptor: Option>, + encryptor: Option>, partitioner: Option>, send_interval_micros: u64, create_stream_if_not_exists: bool, @@ -60,7 +60,7 @@ impl IggyProducer { topic_name: String, batch_size: Option, partitioning: Option, - encryptor: Option>, + encryptor: Option>, partitioner: Option>, interval: Option, create_stream_if_not_exists: bool, @@ -423,7 +423,7 @@ pub struct IggyProducerBuilder { topic_name: String, batch_size: Option, partitioning: Option, - encryptor: Option>, + encryptor: Option>, partitioner: Option>, send_interval: Option, create_stream_if_not_exists: bool, @@ -443,7 +443,7 @@ impl IggyProducerBuilder { stream_name: String, topic: Identifier, topic_name: String, - encryptor: Option>, + encryptor: Option>, partitioner: Option>, ) -> Self { Self { @@ -514,7 +514,7 @@ impl IggyProducerBuilder { } /// Sets the encryptor for encrypting the messages' payloads. - pub fn encryptor(self, encryptor: Arc) -> Self { + pub fn encryptor(self, encryptor: Arc) -> Self { Self { encryptor: Some(encryptor), ..self diff --git a/sdk/src/utils/crypto.rs b/sdk/src/utils/crypto.rs index 9acf35a8f..a6926c26c 100644 --- a/sdk/src/utils/crypto.rs +++ b/sdk/src/utils/crypto.rs @@ -5,7 +5,25 @@ use aes_gcm::aead::{Aead, OsRng}; use aes_gcm::{AeadCore, Aes256Gcm, KeyInit}; use std::fmt::Debug; -pub trait Encryptor: Send + Sync + Debug { +#[derive(Debug)] +pub enum EncryptorKind { + Aes256Gcm(Aes256GcmEncryptor), +} + +impl EncryptorKind { + pub fn encrypt(&self, data: &[u8]) -> Result, IggyError> { + match self { + EncryptorKind::Aes256Gcm(e) => e.encrypt(data), + } + } + pub fn decrypt(&self, data: &[u8]) -> Result, IggyError> { + match self { + EncryptorKind::Aes256Gcm(e) => e.decrypt(data), + } + } +} + +pub trait Encryptor { fn encrypt(&self, data: &[u8]) -> Result, IggyError>; fn decrypt(&self, data: &[u8]) -> Result, IggyError>; } @@ -14,9 +32,6 @@ pub struct Aes256GcmEncryptor { cipher: Aes256Gcm, } -unsafe impl Send for Aes256GcmEncryptor {} -unsafe impl Sync for Aes256GcmEncryptor {} - impl Debug for Aes256GcmEncryptor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Encryptor").finish() diff --git a/server/Cargo.toml b/server/Cargo.toml index abd555adb..6bb970d33 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.112" +version = "0.4.120" edition = "2021" build = "src/build.rs" license = "Apache-2.0" @@ -90,6 +90,9 @@ uuid = { version = "1.11.0", features = ["v7", "fast-rng", "zerocopy"] } xxhash-rust = { version = "0.8.15", features = ["xxh32"] } zip = "2.2.2" +[dev-dependencies] +mockall = "0.13.1" + [target.'cfg(not(target_env = "msvc"))'.dependencies] mimalloc = { version = "0.1", optional = true } diff --git a/server/src/http/jwt/jwt_manager.rs b/server/src/http/jwt/jwt_manager.rs index 0b609a09e..382bbb5f1 100644 --- a/server/src/http/jwt/jwt_manager.rs +++ b/server/src/http/jwt/jwt_manager.rs @@ -2,7 +2,7 @@ use crate::configs::http::HttpJwtConfig; use crate::http::jwt::json_web_token::{GeneratedToken, JwtClaims, RevokedAccessToken}; use crate::http::jwt::storage::TokenStorage; use crate::http::jwt::COMPONENT; -use crate::streaming::persistence::persister::Persister; +use crate::streaming::persistence::persister::PersisterKind; use error_set::ErrContext; use iggy::error::IggyError; use iggy::locking::IggySharedMut; @@ -42,7 +42,7 @@ pub struct JwtManager { impl JwtManager { pub fn new( - persister: Arc, + persister: Arc, path: &str, issuer: IssuerOptions, validator: ValidatorOptions, @@ -64,7 +64,7 @@ impl JwtManager { } pub fn from_config( - persister: Arc, + persister: Arc, path: &str, config: &HttpJwtConfig, ) -> Result { diff --git a/server/src/http/jwt/storage.rs b/server/src/http/jwt/storage.rs index 15b0d43fd..6e4d88285 100644 --- a/server/src/http/jwt/storage.rs +++ b/server/src/http/jwt/storage.rs @@ -1,7 +1,8 @@ -use crate::http::jwt::json_web_token::RevokedAccessToken; use crate::http::jwt::COMPONENT; -use crate::streaming::persistence::persister::Persister; use crate::streaming::utils::file; +use crate::{ + http::jwt::json_web_token::RevokedAccessToken, streaming::persistence::persister::PersisterKind, +}; use anyhow::Context; use bytes::{BufMut, BytesMut}; use error_set::ErrContext; @@ -13,12 +14,12 @@ use tracing::{error, info}; #[derive(Debug)] pub struct TokenStorage { - persister: Arc, + persister: Arc, path: String, } impl TokenStorage { - pub fn new(persister: Arc, path: &str) -> Self { + pub fn new(persister: Arc, path: &str) -> Self { Self { persister, path: path.to_owned(), diff --git a/server/src/state/file.rs b/server/src/state/file.rs index 18c1a7b12..2c10f0108 100644 --- a/server/src/state/file.rs +++ b/server/src/state/file.rs @@ -1,6 +1,6 @@ use crate::state::command::EntryCommand; use crate::state::{State, StateEntry, COMPONENT}; -use crate::streaming::persistence::persister::Persister; +use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::utils::file; use crate::versioning::SemanticVersion; use async_trait::async_trait; @@ -9,7 +9,7 @@ use error_set::ErrContext; use iggy::bytes_serializable::BytesSerializable; use iggy::error::IggyError; use iggy::utils::byte_size::IggyByteSize; -use iggy::utils::crypto::Encryptor; +use iggy::utils::crypto::EncryptorKind; use iggy::utils::timestamp::IggyTimestamp; use std::fmt::Debug; use std::path::Path; @@ -29,16 +29,16 @@ pub struct FileState { term: AtomicU64, version: u32, path: String, - persister: Arc, - encryptor: Option>, + persister: Arc, + encryptor: Option>, } impl FileState { pub fn new( path: &str, version: &SemanticVersion, - persister: Arc, - encryptor: Option>, + persister: Arc, + encryptor: Option>, ) -> Self { Self { current_index: AtomicU64::new(0), diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 2e59e6fc3..30152c84d 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -3,7 +3,7 @@ use crate::state::system::PartitionState; use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::partitions::COMPONENT; -use crate::streaming::persistence::persister::Persister; +use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::segments::segment::{Segment, INDEX_EXTENSION, LOG_EXTENSION}; use crate::streaming::storage::PartitionStorage; use crate::streaming::utils::file; @@ -22,11 +22,11 @@ use tracing::{error, info, trace, warn}; #[derive(Debug)] pub struct FilePartitionStorage { - persister: Arc, + persister: Arc, } impl FilePartitionStorage { - pub fn new(persister: Arc) -> Self { + pub fn new(persister: Arc) -> Self { Self { persister } } } diff --git a/server/src/streaming/persistence/persister.rs b/server/src/streaming/persistence/persister.rs index 28ea01638..be94ef97f 100644 --- a/server/src/streaming/persistence/persister.rs +++ b/server/src/streaming/persistence/persister.rs @@ -7,33 +7,60 @@ use std::fmt::Debug; use tokio::fs; use tokio::io::AsyncWriteExt; +#[cfg(test)] +use mockall::automock; + +#[derive(Debug)] +pub enum PersisterKind { + File(FilePersister), + FileWithSync(FileWithSyncPersister), + #[cfg(test)] + Mock(MockPersister), +} + +impl PersisterKind { + pub async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { + match self { + PersisterKind::File(p) => p.append(path, bytes).await, + PersisterKind::FileWithSync(p) => p.append(path, bytes).await, + #[cfg(test)] + PersisterKind::Mock(p) => p.append(path, bytes).await, + } + } + + pub async fn overwrite(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { + match self { + PersisterKind::File(p) => p.overwrite(path, bytes).await, + PersisterKind::FileWithSync(p) => p.overwrite(path, bytes).await, + #[cfg(test)] + PersisterKind::Mock(p) => p.overwrite(path, bytes).await, + } + } + + pub async fn delete(&self, path: &str) -> Result<(), IggyError> { + match self { + PersisterKind::File(p) => p.delete(path).await, + PersisterKind::FileWithSync(p) => p.delete(path).await, + #[cfg(test)] + PersisterKind::Mock(p) => p.delete(path).await, + } + } +} + #[async_trait] -pub trait Persister: Sync + Send { +#[cfg_attr(test, automock)] +pub trait Persister { async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError>; async fn overwrite(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError>; async fn delete(&self, path: &str) -> Result<(), IggyError>; } -impl Debug for dyn Persister { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Persister") - .field("type", &"Persister") - .finish() - } -} - #[derive(Debug)] pub struct FilePersister; #[derive(Debug)] pub struct FileWithSyncPersister; -unsafe impl Send for FilePersister {} -unsafe impl Sync for FilePersister {} - -unsafe impl Send for FileWithSyncPersister {} -unsafe impl Sync for FileWithSyncPersister {} - #[async_trait] impl Persister for FilePersister { async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { diff --git a/server/src/streaming/persistence/task.rs b/server/src/streaming/persistence/task.rs index 6acc4c8a9..7bddb9e77 100644 --- a/server/src/streaming/persistence/task.rs +++ b/server/src/streaming/persistence/task.rs @@ -1,4 +1,3 @@ -use crate::streaming::persistence::persister::Persister; use crate::streaming::persistence::COMPONENT; use bytes::Bytes; use error_set::ErrContext; @@ -8,6 +7,8 @@ use std::{sync::Arc, time::Duration}; use tokio::task; use tracing::error; +use super::persister::PersisterKind; + #[derive(Debug)] pub struct LogPersisterTask { _sender: Option>, @@ -17,7 +18,7 @@ pub struct LogPersisterTask { impl LogPersisterTask { pub fn new( path: String, - persister: Arc, + persister: Arc, max_retries: u32, retry_sleep: Duration, ) -> Self { @@ -55,7 +56,7 @@ impl LogPersisterTask { async fn persist_with_retries( path: &str, - persister: &Arc, + persister: &Arc, data: Bytes, max_retries: u32, retry_sleep: Duration, diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 79ab7fdb0..56feb02d3 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -1,7 +1,7 @@ use crate::streaming::batching::iterator::IntoMessagesIterator; use crate::streaming::batching::message_batch::RetainedMessageBatch; use crate::streaming::models::messages::RetainedMessage; -use crate::streaming::persistence::persister::Persister; +use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::segments::COMPONENT; @@ -29,11 +29,11 @@ const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000; #[derive(Debug)] pub struct FileSegmentStorage { - persister: Arc, + persister: Arc, } impl FileSegmentStorage { - pub fn new(persister: Arc) -> Self { + pub fn new(persister: Arc) -> Self { Self { persister } } } @@ -594,7 +594,7 @@ impl SegmentStorage for FileSegmentStorage { } } - fn persister(&self) -> Arc { + fn persister(&self) -> Arc { self.persister.clone() } } diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index 80674cf03..af6593136 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -1,9 +1,9 @@ use super::batching::message_batch::RetainedMessageBatch; +use super::persistence::persister::PersisterKind; use crate::configs::system::SystemConfig; use crate::state::system::{PartitionState, StreamState, TopicState}; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::partitions::storage::FilePartitionStorage; -use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::segments::storage::FileSegmentStorage; @@ -93,7 +93,7 @@ pub trait SegmentStorage: Send + Sync { segment: &Segment, timestamp: u64, ) -> Result, IggyError>; - fn persister(&self) -> Arc; + fn persister(&self) -> Arc; } #[derive(Debug)] @@ -103,11 +103,11 @@ pub struct SystemStorage { pub topic: Arc, pub partition: Arc, pub segment: Arc, - pub persister: Arc, + pub persister: Arc, } impl SystemStorage { - pub fn new(config: Arc, persister: Arc) -> Self { + pub fn new(config: Arc, persister: Arc) -> Self { Self { info: Arc::new(FileSystemInfoStorage::new( config.get_state_info_path(), @@ -155,6 +155,7 @@ impl Debug for dyn SegmentStorage { #[cfg(test)] pub(crate) mod tests { use crate::streaming::partitions::partition::Partition; + use crate::streaming::persistence::persister::MockPersister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::storage::*; @@ -163,28 +164,12 @@ pub(crate) mod tests { use async_trait::async_trait; use std::sync::Arc; - struct TestPersister {} struct TestSystemInfoStorage {} struct TestStreamStorage {} struct TestTopicStorage {} struct TestPartitionStorage {} struct TestSegmentStorage { - persister: Arc, - } - - #[async_trait] - impl Persister for TestPersister { - async fn append(&self, _path: &str, _bytes: &[u8]) -> Result<(), IggyError> { - Ok(()) - } - - async fn overwrite(&self, _path: &str, _bytes: &[u8]) -> Result<(), IggyError> { - Ok(()) - } - - async fn delete(&self, _path: &str) -> Result<(), IggyError> { - Ok(()) - } + persister: Arc, } #[async_trait] @@ -339,13 +324,13 @@ pub(crate) mod tests { Ok(None) } - fn persister(&self) -> Arc { + fn persister(&self) -> Arc { self.persister.clone() } } pub fn get_test_system_storage() -> SystemStorage { - let persister = Arc::new(TestPersister {}); + let persister = Arc::new(PersisterKind::Mock(MockPersister::new())); SystemStorage { info: Arc::new(TestSystemInfoStorage {}), stream: Arc::new(TestStreamStorage {}), diff --git a/server/src/streaming/systems/storage.rs b/server/src/streaming/systems/storage.rs index 0efc1863e..75e3705ac 100644 --- a/server/src/streaming/systems/storage.rs +++ b/server/src/streaming/systems/storage.rs @@ -1,4 +1,4 @@ -use crate::streaming::persistence::persister::Persister; +use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::storage::SystemInfoStorage; use crate::streaming::systems::info::SystemInfo; use crate::streaming::systems::COMPONENT; @@ -14,12 +14,12 @@ use tracing::info; #[derive(Debug)] pub struct FileSystemInfoStorage { - persister: Arc, + persister: Arc, path: String, } impl FileSystemInfoStorage { - pub fn new(path: String, persister: Arc) -> Self { + pub fn new(path: String, persister: Arc) -> Self { Self { path, persister } } } diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index 4f4086fb7..3e6543c38 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -12,7 +12,7 @@ use crate::streaming::users::permissioner::Permissioner; use error_set::ErrContext; use iggy::error::IggyError; use iggy::utils::byte_size::IggyByteSize; -use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor}; +use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind}; use std::collections::HashMap; use std::path::Path; use std::sync::Arc; @@ -72,7 +72,7 @@ pub struct System { pub(crate) users: HashMap, pub(crate) config: Arc, pub(crate) client_manager: IggySharedMut, - pub(crate) encryptor: Option>, + pub(crate) encryptor: Option>, pub(crate) metrics: Metrics, pub(crate) state: Arc, pub(crate) archiver: Option>, @@ -95,10 +95,10 @@ impl System { map_toggle_str(config.encryption.enabled) ); - let encryptor: Option> = match config.encryption.enabled { - true => Some(Arc::new( + let encryptor: Option> = match config.encryption.enabled { + true => Some(Arc::new(EncryptorKind::Aes256Gcm( Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(), - )), + ))), false => None, }; @@ -121,10 +121,10 @@ impl System { ) } - fn resolve_persister(enforce_fsync: bool) -> Arc { + fn resolve_persister(enforce_fsync: bool) -> Arc { match enforce_fsync { - true => Arc::new(FileWithSyncPersister), - false => Arc::new(FilePersister), + true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)), + false => Arc::new(PersisterKind::File(FilePersister)), } } @@ -132,7 +132,7 @@ impl System { system_config: Arc, storage: SystemStorage, state: Arc, - encryptor: Option>, + encryptor: Option>, data_maintenance_config: DataMaintenanceConfig, pat_config: PersonalAccessTokenConfig, ) -> System { diff --git a/tools/src/data-seeder/main.rs b/tools/src/data-seeder/main.rs index 777922032..fba6b03fa 100644 --- a/tools/src/data-seeder/main.rs +++ b/tools/src/data-seeder/main.rs @@ -7,7 +7,7 @@ use iggy::client::{Client, UserClient}; use iggy::client_provider; use iggy::client_provider::ClientProviderConfig; use iggy::clients::client::IggyClient; -use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor}; +use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind}; use std::error::Error; use std::sync::Arc; use tracing::info; @@ -37,11 +37,11 @@ async fn main() -> Result<(), Box> { .with(tracing_subscriber::fmt::layer()) .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) .init(); - let encryptor: Option> = match iggy_args.encryption_key.is_empty() { + let encryptor: Option> = match iggy_args.encryption_key.is_empty() { true => None, - false => Some(Arc::new( + false => Some(Arc::new(EncryptorKind::Aes256Gcm( Aes256GcmEncryptor::from_base64_key(&iggy_args.encryption_key).unwrap(), - )), + ))), }; info!("Selected transport: {}", iggy_args.transport); let username = args.username.clone();