Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Replace dyn Trait with Enum for Encryptor and Persister Types #1442

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use iggy::cli::{
use iggy::cli_command::{CliCommand, PRINT_TARGET};
use iggy::client_provider::{self, ClientProviderConfig};
use iggy::clients::client::IggyClient;
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind};
use iggy::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
use std::sync::Arc;
use tracing::{event, Level};
Expand Down Expand Up @@ -339,11 +339,11 @@ async fn main() -> Result<(), IggyCmdError> {
// Create credentials based on command line arguments and command
let mut credentials = IggyCredentials::new(&cli_options, &iggy_args, command.login_required())?;

let encryptor: Option<Arc<dyn Encryptor>> = match iggy_args.encryption_key.is_empty() {
let encryptor = match iggy_args.encryption_key.is_empty() {
true => None,
false => Some(Arc::new(
false => Some(Arc::new(EncryptorKind::Aes256Gcm(
Aes256GcmEncryptor::from_base64_key(&iggy_args.encryption_key).unwrap(),
)),
))),
};
let client_provider_config = Arc::new(ClientProviderConfig::from_args_set_autologin(
iggy_args.clone(),
Expand Down
15 changes: 8 additions & 7 deletions integration/tests/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind};
use server::state::file::FileState;
use server::streaming::persistence::persister::FilePersister;
use server::streaming::persistence::persister::{FilePersister, PersisterKind};
use server::versioning::SemanticVersion;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -31,11 +31,12 @@ impl StateSetup {
create_dir(&directory_path).await.unwrap();

let version = SemanticVersion::from_str("1.2.3").unwrap();
let persister = FilePersister {};
let encryptor: Option<Arc<dyn Encryptor>> = match encryption_key {
Some(key) => Some(Arc::new(Aes256GcmEncryptor::new(key).unwrap())),
None => None,
};
let persister = PersisterKind::File(FilePersister {});
let encryptor = encryption_key.map(|key| {
Arc::new(EncryptorKind::Aes256Gcm(
Aes256GcmEncryptor::new(key).unwrap(),
))
});
let state = FileState::new(&log_path, &version, Arc::new(persister), encryptor);

Self {
Expand Down
4 changes: 2 additions & 2 deletions integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::FilePersister;
use server::streaming::persistence::persister::{FilePersister, PersisterKind};
use server::streaming::storage::SystemStorage;
use std::sync::Arc;
use tokio::fs;
Expand All @@ -20,7 +20,7 @@ impl TestSetup {

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = FilePersister {};
let persister = PersisterKind::File(FilePersister {});
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
TestSetup { config, storage }
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.71"
version = "0.6.80"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/clients/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::quic::client::QuicClient;
use crate::quic::config::QuicClientConfigBuilder;
use crate::tcp::client::TcpClient;
use crate::tcp::config::TcpClientConfigBuilder;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use std::sync::Arc;
use tracing::error;
Expand All @@ -18,7 +18,7 @@ use tracing::error;
pub struct IggyClientBuilder {
client: Option<Box<dyn Client>>,
partitioner: Option<Arc<dyn Partitioner>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
}

impl IggyClientBuilder {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl IggyClientBuilder {
}

/// Use the custom encryptor implementation.
pub fn with_encryptor(mut self, encryptor: Arc<dyn Encryptor>) -> Self {
pub fn with_encryptor(mut self, encryptor: Arc<EncryptorKind>) -> Self {
self.encryptor = Some(encryptor);
self
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::partitioner::Partitioner;
use crate::snapshot::{SnapshotCompression, SystemSnapshotType};
use crate::tcp::client::TcpClient;
use crate::utils::byte_size::IggyByteSize;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
Expand All @@ -55,7 +55,7 @@ use tracing::{debug, error, info};
pub struct IggyClient {
client: IggySharedMut<Box<dyn Client>>,
partitioner: Option<Arc<dyn Partitioner>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
}

impl Default for IggyClient {
Expand Down Expand Up @@ -96,7 +96,7 @@ impl IggyClient {
pub fn create(
client: Box<dyn Client>,
partitioner: Option<Arc<dyn Partitioner>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
) -> Self {
if partitioner.is_some() {
info!("Partitioner is enabled.");
Expand Down
12 changes: 6 additions & 6 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::locking::{IggySharedMut, IggySharedMutFn};
use crate::messages::poll_messages::{PollingKind, PollingStrategy};
use crate::models::messages::{PolledMessage, PolledMessages};
use crate::utils::byte_size::IggyByteSize;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use crate::utils::timestamp::IggyTimestamp;
use bytes::Bytes;
Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct IggyConsumer {
current_offsets: Arc<DashMap<u32, AtomicU64>>,
poll_future: Option<PollMessagesFuture>,
buffered_messages: VecDeque<PolledMessage>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
store_offset_sender: flume::Sender<(u32, u64)>,
store_offset_after_each_message: bool,
store_offset_after_all_messages: bool,
Expand All @@ -107,7 +107,7 @@ impl IggyConsumer {
auto_commit: AutoCommit,
auto_join_consumer_group: bool,
create_consumer_group_if_not_exists: bool,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
retry_interval: IggyDuration,
allow_replay: bool,
) -> Self {
Expand Down Expand Up @@ -866,7 +866,7 @@ pub struct IggyConsumerBuilder {
auto_commit: AutoCommit,
auto_join_consumer_group: bool,
create_consumer_group_if_not_exists: bool,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
retry_interval: IggyDuration,
allow_replay: bool,
}
Expand All @@ -880,7 +880,7 @@ impl IggyConsumerBuilder {
stream_id: Identifier,
topic_id: Identifier,
partition_id: Option<u32>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
polling_interval: Option<IggyDuration>,
) -> Self {
Self {
Expand Down Expand Up @@ -990,7 +990,7 @@ impl IggyConsumerBuilder {
}

/// Sets the encryptor for decrypting the messages' payloads.
pub fn encryptor(self, encryptor: Arc<dyn Encryptor>) -> Self {
pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
Self {
encryptor: Some(encryptor),
..self
Expand Down
12 changes: 6 additions & 6 deletions sdk/src/clients/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::identifier::{IdKind, Identifier};
use crate::locking::{IggySharedMut, IggySharedMutFn};
use crate::messages::send_messages::{Message, Partitioning};
use crate::partitioner::Partitioner;
use crate::utils::crypto::Encryptor;
use crate::utils::crypto::EncryptorKind;
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::timestamp::IggyTimestamp;
Expand Down Expand Up @@ -35,7 +35,7 @@ pub struct IggyProducer {
topic_name: String,
batch_size: Option<usize>,
partitioning: Option<Arc<Partitioning>>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
send_interval_micros: u64,
create_stream_if_not_exists: bool,
Expand All @@ -60,7 +60,7 @@ impl IggyProducer {
topic_name: String,
batch_size: Option<usize>,
partitioning: Option<Partitioning>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
interval: Option<IggyDuration>,
create_stream_if_not_exists: bool,
Expand Down Expand Up @@ -423,7 +423,7 @@ pub struct IggyProducerBuilder {
topic_name: String,
batch_size: Option<usize>,
partitioning: Option<Partitioning>,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
send_interval: Option<IggyDuration>,
create_stream_if_not_exists: bool,
Expand All @@ -443,7 +443,7 @@ impl IggyProducerBuilder {
stream_name: String,
topic: Identifier,
topic_name: String,
encryptor: Option<Arc<dyn Encryptor>>,
encryptor: Option<Arc<EncryptorKind>>,
partitioner: Option<Arc<dyn Partitioner>>,
) -> Self {
Self {
Expand Down Expand Up @@ -514,7 +514,7 @@ impl IggyProducerBuilder {
}

/// Sets the encryptor for encrypting the messages' payloads.
pub fn encryptor(self, encryptor: Arc<dyn Encryptor>) -> Self {
pub fn encryptor(self, encryptor: Arc<EncryptorKind>) -> Self {
Self {
encryptor: Some(encryptor),
..self
Expand Down
23 changes: 19 additions & 4 deletions sdk/src/utils/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,25 @@ use aes_gcm::aead::{Aead, OsRng};
use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
use std::fmt::Debug;

pub trait Encryptor: Send + Sync + Debug {
#[derive(Debug)]
pub enum EncryptorKind {
Aes256Gcm(Aes256GcmEncryptor),
}

impl EncryptorKind {
pub fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
match self {
EncryptorKind::Aes256Gcm(e) => e.encrypt(data),
}
}
pub fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
match self {
EncryptorKind::Aes256Gcm(e) => e.decrypt(data),
}
}
}

pub trait Encryptor {
fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError>;
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError>;
}
Expand All @@ -14,9 +32,6 @@ pub struct Aes256GcmEncryptor {
cipher: Aes256Gcm,
}

unsafe impl Send for Aes256GcmEncryptor {}
unsafe impl Sync for Aes256GcmEncryptor {}

impl Debug for Aes256GcmEncryptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Encryptor").finish()
Expand Down
5 changes: 4 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.112"
version = "0.4.120"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down Expand Up @@ -90,6 +90,9 @@ uuid = { version = "1.11.0", features = ["v7", "fast-rng", "zerocopy"] }
xxhash-rust = { version = "0.8.15", features = ["xxh32"] }
zip = "2.2.2"

[dev-dependencies]
mockall = "0.13.1"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
mimalloc = { version = "0.1", optional = true }

Expand Down
Loading
Loading