Skip to content

Commit

Permalink
Refactor: Replace dyn Trait with Enum for Encryptor and Persister Types
Browse files Browse the repository at this point in the history
  • Loading branch information
haze518 committed Jan 16, 2025
1 parent 71928b5 commit 6b1341d
Show file tree
Hide file tree
Showing 21 changed files with 183 additions and 111 deletions.
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use iggy::cli::{
use iggy::cli_command::{CliCommand, PRINT_TARGET};
use iggy::client_provider::{self, ClientProviderConfig};
use iggy::clients::client::IggyClient;
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind};
use iggy::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
use std::sync::Arc;
use tracing::{event, Level};
Expand Down Expand Up @@ -339,11 +339,11 @@ async fn main() -> Result<(), IggyCmdError> {
// Create credentials based on command line arguments and command
let mut credentials = IggyCredentials::new(&cli_options, &iggy_args, command.login_required())?;

let encryptor: Option<Arc<dyn Encryptor>> = match iggy_args.encryption_key.is_empty() {
let encryptor = match iggy_args.encryption_key.is_empty() {
true => None,
false => Some(Arc::new(
false => Some(Arc::new(EncryptorKind::Aes(
Aes256GcmEncryptor::from_base64_key(&iggy_args.encryption_key).unwrap(),
)),
))),
};
let client_provider_config = Arc::new(ClientProviderConfig::from_args_set_autologin(
iggy_args.clone(),
Expand Down
12 changes: 5 additions & 7 deletions integration/tests/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind};
use server::state::file::FileState;
use server::streaming::persistence::persister::FilePersister;
use server::streaming::persistence::persister::{FilePersister, PersisterKind};
use server::versioning::SemanticVersion;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -31,11 +31,9 @@ impl StateSetup {
create_dir(&directory_path).await.unwrap();

let version = SemanticVersion::from_str("1.2.3").unwrap();
let persister = FilePersister {};
let encryptor: Option<Arc<dyn Encryptor>> = match encryption_key {
Some(key) => Some(Arc::new(Aes256GcmEncryptor::new(key).unwrap())),
None => None,
};
let persister = PersisterKind::File(FilePersister {});
let encryptor = encryption_key
.map(|key| Arc::new(EncryptorKind::Aes(Aes256GcmEncryptor::new(key).unwrap())));
let state = FileState::new(&log_path, &version, Arc::new(persister), encryptor);

Self {
Expand Down
4 changes: 2 additions & 2 deletions integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::FilePersister;
use server::streaming::persistence::persister::{FilePersister, PersisterKind};
use server::streaming::storage::SystemStorage;
use std::sync::Arc;
use tokio::fs;
Expand All @@ -20,7 +20,7 @@ impl TestSetup {

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = FilePersister {};
let persister = PersisterKind::File(FilePersister {});
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
TestSetup { config, storage }
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/clients/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::quic::client::QuicClient;
use crate::quic::config::QuicClientConfigBuilder;
use crate::tcp::client::TcpClient;
use crate::tcp::config::TcpClientConfigBuilder;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use std::sync::Arc;
use tracing::error;
Expand All @@ -18,7 +18,7 @@ use tracing::error;
pub struct IggyClientBuilder {
client: Option<Box<dyn Client>>,
partitioner: Option<Arc<dyn Partitioner>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
}

impl IggyClientBuilder {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl IggyClientBuilder {
}

/// Use the custom encryptor implementation.
pub fn with_encryptor(mut self, encryptor: Arc<dyn Encryptor>) -> Self {
pub fn with_encryptor(mut self, encryptor: Arc<EncryptorKind>) -> Self {
self.encryptor = Some(encryptor);
self
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::partitioner::Partitioner;
use crate::snapshot::{SnapshotCompression, SystemSnapshotType};
use crate::tcp::client::TcpClient;
use crate::utils::byte_size::IggyByteSize;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
Expand All @@ -55,7 +55,7 @@ use tracing::{debug, error, info};
pub struct IggyClient {
client: IggySharedMut<Box<dyn Client>>,
partitioner: Option<Arc<dyn Partitioner>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
}

impl Default for IggyClient {
Expand Down Expand Up @@ -96,7 +96,7 @@ impl IggyClient {
pub fn create(
client: Box<dyn Client>,
partitioner: Option<Arc<dyn Partitioner>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
) -> Self {
if partitioner.is_some() {
info!("Partitioner is enabled.");
Expand Down
12 changes: 6 additions & 6 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::locking::{IggySharedMut, IggySharedMutFn};
use crate::messages::poll_messages::{PollingKind, PollingStrategy};
use crate::models::messages::{PolledMessage, PolledMessages};
use crate::utils::byte_size::IggyByteSize;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use crate::utils::timestamp::IggyTimestamp;
use bytes::Bytes;
Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct IggyConsumer {
current_offsets: Arc<DashMap<u32, AtomicU64>>,
poll_future: Option<PollMessagesFuture>,
buffered_messages: VecDeque<PolledMessage>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
store_offset_sender: flume::Sender<(u32, u64)>,
store_offset_after_each_message: bool,
store_offset_after_all_messages: bool,
Expand All @@ -106,7 +106,7 @@ impl IggyConsumer {
auto_commit: AutoCommit,
auto_join_consumer_group: bool,
create_consumer_group_if_not_exists: bool,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
retry_interval: IggyDuration,
) -> Self {
let (store_offset_sender, _) = flume::unbounded();
Expand Down Expand Up @@ -842,7 +842,7 @@ pub struct IggyConsumerBuilder {
auto_commit: AutoCommit,
auto_join_consumer_group: bool,
create_consumer_group_if_not_exists: bool,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
retry_interval: IggyDuration,
}

Expand All @@ -855,7 +855,7 @@ impl IggyConsumerBuilder {
stream_id: Identifier,
topic_id: Identifier,
partition_id: Option<u32>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
polling_interval: Option<IggyDuration>,
) -> Self {
Self {
Expand Down Expand Up @@ -964,7 +964,7 @@ impl IggyConsumerBuilder {
}

/// Sets the encryptor for decrypting the messages' payloads.
pub fn encryptor(self, encryptor: Arc<dyn Encryptor>) -> Self {
pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
Self {
encryptor: Some(encryptor),
..self
Expand Down
12 changes: 6 additions & 6 deletions sdk/src/clients/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::identifier::{IdKind, Identifier};
use crate::locking::{IggySharedMut, IggySharedMutFn};
use crate::messages::send_messages::{Message, Partitioning};
use crate::partitioner::Partitioner;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::timestamp::IggyTimestamp;
Expand Down Expand Up @@ -35,7 +35,7 @@ pub struct IggyProducer {
topic_name: String,
batch_size: Option<usize>,
partitioning: Option<Arc<Partitioning>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
send_interval_micros: u64,
create_stream_if_not_exists: bool,
Expand All @@ -60,7 +60,7 @@ impl IggyProducer {
topic_name: String,
batch_size: Option<usize>,
partitioning: Option<Partitioning>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
interval: Option<IggyDuration>,
create_stream_if_not_exists: bool,
Expand Down Expand Up @@ -423,7 +423,7 @@ pub struct IggyProducerBuilder {
topic_name: String,
batch_size: Option<usize>,
partitioning: Option<Partitioning>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
send_interval: Option<IggyDuration>,
create_stream_if_not_exists: bool,
Expand All @@ -443,7 +443,7 @@ impl IggyProducerBuilder {
stream_name: String,
topic: Identifier,
topic_name: String,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
) -> Self {
Self {
Expand Down Expand Up @@ -514,7 +514,7 @@ impl IggyProducerBuilder {
}

/// Sets the encryptor for encrypting the messages' payloads.
pub fn encryptor(self, encryptor: Arc<dyn Encryptor>) -> Self {
pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
Self {
encryptor: Some(encryptor),
..self
Expand Down
24 changes: 21 additions & 3 deletions sdk/src/utils/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,25 @@ use aes_gcm::aead::{Aead, OsRng};
use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
use std::fmt::Debug;

pub trait Encryptor: Send + Sync + Debug {
#[derive(Debug)]
pub enum EncryptorKind {
Aes(Aes256GcmEncryptor),
}

impl EncryptorKind {
pub fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
match self {
EncryptorKind::Aes(e) => e.encrypt(data),
}
}
pub fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
match self {
EncryptorKind::Aes(e) => e.decrypt(data),
}
}
}

pub trait Encryptor {
fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError>;
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError>;
}
Expand All @@ -14,8 +32,8 @@ pub struct Aes256GcmEncryptor {
cipher: Aes256Gcm,
}

unsafe impl Send for Aes256GcmEncryptor {}
unsafe impl Sync for Aes256GcmEncryptor {}
// unsafe impl Send for Aes256GcmEncryptor {}
// unsafe impl Sync for Aes256GcmEncryptor {}

impl Debug for Aes256GcmEncryptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
3 changes: 3 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ uuid = { version = "1.11.0", features = ["v7", "fast-rng", "zerocopy"] }
xxhash-rust = { version = "0.8.15", features = ["xxh32"] }
zip = "2.2.2"

[dev-dependencies]
mockall = "0.13.1"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
mimalloc = { version = "0.1", optional = true }

Expand Down
6 changes: 3 additions & 3 deletions server/src/http/jwt/jwt_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::configs::http::HttpJwtConfig;
use crate::http::jwt::json_web_token::{GeneratedToken, JwtClaims, RevokedAccessToken};
use crate::http::jwt::storage::TokenStorage;
use crate::http::jwt::COMPONENT;
use crate::streaming::persistence::persister::Persister;
use crate::streaming::persistence::persister::PersisterKind;
use error_set::ErrContext;
use iggy::error::IggyError;
use iggy::locking::IggySharedMut;
Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct JwtManager {

impl JwtManager {
pub fn new(
persister: Arc<dyn Persister>,
persister: Arc<PersisterKind>,
path: &str,
issuer: IssuerOptions,
validator: ValidatorOptions,
Expand All @@ -64,7 +64,7 @@ impl JwtManager {
}

pub fn from_config(
persister: Arc<dyn Persister>,
persister: Arc<PersisterKind>,
path: &str,
config: &HttpJwtConfig,
) -> Result<Self, IggyError> {
Expand Down
9 changes: 5 additions & 4 deletions server/src/http/jwt/storage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::http::jwt::json_web_token::RevokedAccessToken;
use crate::http::jwt::COMPONENT;
use crate::streaming::persistence::persister::Persister;
use crate::streaming::utils::file;
use crate::{
http::jwt::json_web_token::RevokedAccessToken, streaming::persistence::persister::PersisterKind,
};
use anyhow::Context;
use bytes::{BufMut, BytesMut};
use error_set::ErrContext;
Expand All @@ -13,12 +14,12 @@ use tracing::{error, info};

#[derive(Debug)]
pub struct TokenStorage {
persister: Arc<dyn Persister>,
persister: Arc<PersisterKind>,
path: String,
}

impl TokenStorage {
pub fn new(persister: Arc<dyn Persister>, path: &str) -> Self {
pub fn new(persister: Arc<PersisterKind>, path: &str) -> Self {
Self {
persister,
path: path.to_owned(),
Expand Down
Loading

0 comments on commit 6b1341d

Please sign in to comment.