diff --git a/Cargo.lock b/Cargo.lock index 85c7294ed..f460461c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1856,7 +1856,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.1.8" +version = "0.2.0" dependencies = [ "aes-gcm", "anyhow", @@ -1895,7 +1895,7 @@ dependencies = [ [[package]] name = "iggy_examples" -version = "0.0.1" +version = "0.0.2" dependencies = [ "anyhow", "bytes", @@ -3538,7 +3538,7 @@ dependencies = [ [[package]] name = "server" -version = "0.1.9" +version = "0.2.0" dependencies = [ "aes-gcm", "anyhow", diff --git a/bench/src/benchmarks/benchmark.rs b/bench/src/benchmarks/benchmark.rs index 12f10f252..93f9bb0e0 100644 --- a/bench/src/benchmarks/benchmark.rs +++ b/bench/src/benchmarks/benchmark.rs @@ -70,7 +70,10 @@ pub trait Benchmarkable { info!("Creating the test stream {}", stream_id); let name = format!("stream {}", stream_id); client - .create_stream(&CreateStream { stream_id, name }) + .create_stream(&CreateStream { + stream_id: Some(stream_id), + name, + }) .await?; info!( @@ -81,7 +84,7 @@ pub trait Benchmarkable { client .create_topic(&CreateTopic { stream_id: Identifier::numeric(stream_id)?, - topic_id, + topic_id: Some(topic_id), partitions_count, name, message_expiry: None, diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7164d995b..14e2fe74e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy_examples" -version = "0.0.1" +version = "0.0.2" edition = "2021" [[example]] diff --git a/examples/src/getting-started/producer/main.rs b/examples/src/getting-started/producer/main.rs index cbe6b097b..64204b40f 100644 --- a/examples/src/getting-started/producer/main.rs +++ b/examples/src/getting-started/producer/main.rs @@ -50,7 +50,7 @@ async fn main() -> Result<(), Box> { async fn init_system(client: &IggyClient) { match client .create_stream(&CreateStream { - stream_id: STREAM_ID, + stream_id: Some(STREAM_ID), name: "sample-stream".to_string(), }) .await @@ -62,7 +62,7 @@ async fn init_system(client: &IggyClient) { match client .create_topic(&CreateTopic { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - topic_id: TOPIC_ID, + topic_id: Some(TOPIC_ID), partitions_count: 1, name: "sample-topic".to_string(), message_expiry: None, diff --git a/examples/src/shared/system.rs b/examples/src/shared/system.rs index 5414c0353..ed7db7d19 100644 --- a/examples/src/shared/system.rs +++ b/examples/src/shared/system.rs @@ -79,14 +79,14 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Er info!("Stream does not exist, creating..."); client .create_stream(&CreateStream { - stream_id: args.stream_id, + stream_id: Some(args.stream_id), name: "sample".to_string(), }) .await?; client .create_topic(&CreateTopic { stream_id: Identifier::numeric(args.stream_id).unwrap(), - topic_id: args.topic_id, + topic_id: Some(args.topic_id), partitions_count: args.partitions_count, name: "orders".to_string(), message_expiry: None, diff --git a/integration/tests/cli/consumer_group/test_consumer_group_create_command.rs b/integration/tests/cli/consumer_group/test_consumer_group_create_command.rs index ef18aecbd..5fd5d4666 100644 --- a/integration/tests/cli/consumer_group/test_consumer_group_create_command.rs +++ b/integration/tests/cli/consumer_group/test_consumer_group_create_command.rs @@ -71,7 +71,7 @@ impl IggyCmdTestCase for TestConsumerGroupCreateCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -80,7 +80,7 @@ impl IggyCmdTestCase for TestConsumerGroupCreateCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 0, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/consumer_group/test_consumer_group_delete_command.rs b/integration/tests/cli/consumer_group/test_consumer_group_delete_command.rs index 89896026a..f136dc583 100644 --- a/integration/tests/cli/consumer_group/test_consumer_group_delete_command.rs +++ b/integration/tests/cli/consumer_group/test_consumer_group_delete_command.rs @@ -77,7 +77,7 @@ impl IggyCmdTestCase for TestConsumerGroupDeleteCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -86,7 +86,7 @@ impl IggyCmdTestCase for TestConsumerGroupDeleteCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 0, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/consumer_group/test_consumer_group_get_command.rs b/integration/tests/cli/consumer_group/test_consumer_group_get_command.rs index 84ba94d3d..d37c9517c 100644 --- a/integration/tests/cli/consumer_group/test_consumer_group_get_command.rs +++ b/integration/tests/cli/consumer_group/test_consumer_group_get_command.rs @@ -77,7 +77,7 @@ impl IggyCmdTestCase for TestConsumerGroupGetCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -86,7 +86,7 @@ impl IggyCmdTestCase for TestConsumerGroupGetCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 0, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/consumer_group/test_consumer_group_list_command.rs b/integration/tests/cli/consumer_group/test_consumer_group_list_command.rs index 444777c32..e8d6a73f3 100644 --- a/integration/tests/cli/consumer_group/test_consumer_group_list_command.rs +++ b/integration/tests/cli/consumer_group/test_consumer_group_list_command.rs @@ -74,7 +74,7 @@ impl IggyCmdTestCase for TestConsumerGroupListCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -83,7 +83,7 @@ impl IggyCmdTestCase for TestConsumerGroupListCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 0, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/message/test_message_poll_command.rs b/integration/tests/cli/message/test_message_poll_command.rs index 9a595d002..b9bb0b982 100644 --- a/integration/tests/cli/message/test_message_poll_command.rs +++ b/integration/tests/cli/message/test_message_poll_command.rs @@ -99,7 +99,7 @@ impl IggyCmdTestCase for TestMessagePollCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -108,7 +108,7 @@ impl IggyCmdTestCase for TestMessagePollCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: self.partitions_count, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/message/test_message_send_command.rs b/integration/tests/cli/message/test_message_send_command.rs index 05ac58567..21525b8ef 100644 --- a/integration/tests/cli/message/test_message_send_command.rs +++ b/integration/tests/cli/message/test_message_send_command.rs @@ -128,7 +128,7 @@ impl IggyCmdTestCase for TestMessageSendCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -137,7 +137,7 @@ impl IggyCmdTestCase for TestMessageSendCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: self.partitions_count, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/partition/test_partition_create_command.rs b/integration/tests/cli/partition/test_partition_create_command.rs index 1ffd8fa10..4f63d1fc5 100644 --- a/integration/tests/cli/partition/test_partition_create_command.rs +++ b/integration/tests/cli/partition/test_partition_create_command.rs @@ -70,7 +70,7 @@ impl IggyCmdTestCase for TestPartitionCreateCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -79,7 +79,7 @@ impl IggyCmdTestCase for TestPartitionCreateCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: self.partitions_count, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/partition/test_partition_delete_command.rs b/integration/tests/cli/partition/test_partition_delete_command.rs index 29d2e4a76..72cded8d5 100644 --- a/integration/tests/cli/partition/test_partition_delete_command.rs +++ b/integration/tests/cli/partition/test_partition_delete_command.rs @@ -70,7 +70,7 @@ impl IggyCmdTestCase for TestPartitionDeleteCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -79,7 +79,7 @@ impl IggyCmdTestCase for TestPartitionDeleteCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: self.partitions_count, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/stream/test_stream_delete_command.rs b/integration/tests/cli/stream/test_stream_delete_command.rs index c61511f73..cdaccd7c4 100644 --- a/integration/tests/cli/stream/test_stream_delete_command.rs +++ b/integration/tests/cli/stream/test_stream_delete_command.rs @@ -38,7 +38,7 @@ impl IggyCmdTestCase for TestStreamDeleteCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.name.clone(), }) .await; diff --git a/integration/tests/cli/stream/test_stream_get_command.rs b/integration/tests/cli/stream/test_stream_get_command.rs index 5d3228f82..ac24c92f4 100644 --- a/integration/tests/cli/stream/test_stream_get_command.rs +++ b/integration/tests/cli/stream/test_stream_get_command.rs @@ -37,7 +37,7 @@ impl IggyCmdTestCase for TestStreamGetCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.name.clone(), }) .await; diff --git a/integration/tests/cli/stream/test_stream_list_command.rs b/integration/tests/cli/stream/test_stream_list_command.rs index 53d4f4e07..30c98a1b1 100644 --- a/integration/tests/cli/stream/test_stream_list_command.rs +++ b/integration/tests/cli/stream/test_stream_list_command.rs @@ -34,7 +34,7 @@ impl IggyCmdTestCase for TestStreamListCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.name.clone(), }) .await; diff --git a/integration/tests/cli/stream/test_stream_update_command.rs b/integration/tests/cli/stream/test_stream_update_command.rs index 145183d7c..5ed1e37b3 100644 --- a/integration/tests/cli/stream/test_stream_update_command.rs +++ b/integration/tests/cli/stream/test_stream_update_command.rs @@ -42,7 +42,7 @@ impl IggyCmdTestCase for TestStreamUpdateCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.name.clone(), }) .await; diff --git a/integration/tests/cli/system/test_stats_command.rs b/integration/tests/cli/system/test_stats_command.rs index 4bea2ef36..b379309b2 100644 --- a/integration/tests/cli/system/test_stats_command.rs +++ b/integration/tests/cli/system/test_stats_command.rs @@ -15,7 +15,7 @@ impl IggyCmdTestCase for TestStatsCmd { let stream_id = Identifier::from_str_value("logs").unwrap(); let stream = client .create_stream(&CreateStream { - stream_id: 1, + stream_id: Some(1), name: stream_id.as_string(), }) .await; @@ -23,7 +23,7 @@ impl IggyCmdTestCase for TestStatsCmd { let topic = client .create_topic(&CreateTopic { - topic_id: 1, + topic_id: Some(1), stream_id, partitions_count: 5, message_expiry: None, diff --git a/integration/tests/cli/topic/test_topic_create_command.rs b/integration/tests/cli/topic/test_topic_create_command.rs index f9ff448ae..e4f16734d 100644 --- a/integration/tests/cli/topic/test_topic_create_command.rs +++ b/integration/tests/cli/topic/test_topic_create_command.rs @@ -72,7 +72,7 @@ impl IggyCmdTestCase for TestTopicCreateCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; diff --git a/integration/tests/cli/topic/test_topic_delete_command.rs b/integration/tests/cli/topic/test_topic_delete_command.rs index a1dadfd0c..0e911e5a1 100644 --- a/integration/tests/cli/topic/test_topic_delete_command.rs +++ b/integration/tests/cli/topic/test_topic_delete_command.rs @@ -59,7 +59,7 @@ impl IggyCmdTestCase for TestTopicDeleteCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -68,7 +68,7 @@ impl IggyCmdTestCase for TestTopicDeleteCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 1, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/topic/test_topic_get_command.rs b/integration/tests/cli/topic/test_topic_get_command.rs index 315489d13..ce9ac05f4 100644 --- a/integration/tests/cli/topic/test_topic_get_command.rs +++ b/integration/tests/cli/topic/test_topic_get_command.rs @@ -60,7 +60,7 @@ impl IggyCmdTestCase for TestTopicGetCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -69,7 +69,7 @@ impl IggyCmdTestCase for TestTopicGetCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 1, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/topic/test_topic_list_command.rs b/integration/tests/cli/topic/test_topic_list_command.rs index 28ef66569..67d5a12b9 100644 --- a/integration/tests/cli/topic/test_topic_list_command.rs +++ b/integration/tests/cli/topic/test_topic_list_command.rs @@ -57,7 +57,7 @@ impl IggyCmdTestCase for TestTopicListCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -66,7 +66,7 @@ impl IggyCmdTestCase for TestTopicListCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 1, name: self.topic_name.clone(), message_expiry: None, diff --git a/integration/tests/cli/topic/test_topic_update_command.rs b/integration/tests/cli/topic/test_topic_update_command.rs index e16fd00d3..d15f4b627 100644 --- a/integration/tests/cli/topic/test_topic_update_command.rs +++ b/integration/tests/cli/topic/test_topic_update_command.rs @@ -104,7 +104,7 @@ impl IggyCmdTestCase for TestTopicUpdateCmd { async fn prepare_server_state(&mut self, client: &dyn Client) { let stream = client .create_stream(&CreateStream { - stream_id: self.stream_id, + stream_id: Some(self.stream_id), name: self.stream_name.clone(), }) .await; @@ -125,7 +125,7 @@ impl IggyCmdTestCase for TestTopicUpdateCmd { let topic = client .create_topic(&CreateTopic { stream_id: Identifier::numeric(self.stream_id).unwrap(), - topic_id: self.topic_id, + topic_id: Some(self.topic_id), partitions_count: 1, name: self.topic_name.clone(), message_expiry, diff --git a/integration/tests/examples/mod.rs b/integration/tests/examples/mod.rs index df14327b1..07bc32f5e 100644 --- a/integration/tests/examples/mod.rs +++ b/integration/tests/examples/mod.rs @@ -122,7 +122,7 @@ impl<'a> IggyExampleTest<'a> { if existing_stream_and_topic { self.client .create_stream(&CreateStream { - stream_id: 1, + stream_id: Some(1), name: "sample-stream".to_string(), }) .await @@ -130,7 +130,7 @@ impl<'a> IggyExampleTest<'a> { self.client .create_topic(&CreateTopic { stream_id: Identifier::numeric(1).unwrap(), - topic_id: 1, + topic_id: Some(1), partitions_count: 1, name: "sample-topic".to_string(), message_expiry: None, diff --git a/integration/tests/examples/test_getting_started.rs b/integration/tests/examples/test_getting_started.rs index 10413cdab..083766ccc 100644 --- a/integration/tests/examples/test_getting_started.rs +++ b/integration/tests/examples/test_getting_started.rs @@ -57,9 +57,9 @@ async fn should_succeed_with_preexisting_stream_and_topic() { iggy_example_test .execute_test(TestGettingStarted { expected_producer_output: vec![ - "Received an invalid response with status: 1011 (stream_id_already_exists).", + "Received an invalid response with status: 1012 (stream_name_already_exists).", "Stream already exists and will not be created again.", - "Received an invalid response with status: 2012 (topic_id_already_exists).", + "Received an invalid response with status: 2013 (topic_name_already_exists).", "Topic already exists and will not be created again.", "Sent 10 message(s).", ], diff --git a/integration/tests/server/scenarios/consumer_group_join_scenario.rs b/integration/tests/server/scenarios/consumer_group_join_scenario.rs index 7f78c5f6f..dd099ae5f 100644 --- a/integration/tests/server/scenarios/consumer_group_join_scenario.rs +++ b/integration/tests/server/scenarios/consumer_group_join_scenario.rs @@ -34,7 +34,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { // 1. Create the stream let create_stream = CreateStream { - stream_id: STREAM_ID, + stream_id: Some(STREAM_ID), name: STREAM_NAME.to_string(), }; system_client.create_stream(&create_stream).await.unwrap(); @@ -42,7 +42,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { // 2. Create the topic let create_topic = CreateTopic { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - topic_id: TOPIC_ID, + topic_id: Some(TOPIC_ID), partitions_count: PARTITIONS_COUNT, name: TOPIC_NAME.to_string(), message_expiry: None, diff --git a/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs b/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs index e8571f03a..e1f90fecd 100644 --- a/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs +++ b/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs @@ -58,7 +58,7 @@ async fn init_system( ) { // 1. Create the stream let create_stream = CreateStream { - stream_id: STREAM_ID, + stream_id: Some(STREAM_ID), name: STREAM_NAME.to_string(), }; system_client.create_stream(&create_stream).await.unwrap(); @@ -66,7 +66,7 @@ async fn init_system( // 2. Create the topic let create_topic = CreateTopic { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - topic_id: TOPIC_ID, + topic_id: Some(TOPIC_ID), partitions_count: PARTITIONS_COUNT, name: TOPIC_NAME.to_string(), message_expiry: None, diff --git a/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs b/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs index fed4cd863..247bf302c 100644 --- a/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs +++ b/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs @@ -45,7 +45,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { async fn init_system(client: &IggyClient) { // 1. Create the stream let create_stream = CreateStream { - stream_id: STREAM_ID, + stream_id: Some(STREAM_ID), name: STREAM_NAME.to_string(), }; client.create_stream(&create_stream).await.unwrap(); @@ -53,7 +53,7 @@ async fn init_system(client: &IggyClient) { // 2. Create the topic let create_topic = CreateTopic { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - topic_id: TOPIC_ID, + topic_id: Some(TOPIC_ID), partitions_count: PARTITIONS_COUNT, name: TOPIC_NAME.to_string(), message_expiry: None, diff --git a/integration/tests/server/scenarios/message_headers_scenario.rs b/integration/tests/server/scenarios/message_headers_scenario.rs index f27d52f29..306ef1c2d 100644 --- a/integration/tests/server/scenarios/message_headers_scenario.rs +++ b/integration/tests/server/scenarios/message_headers_scenario.rs @@ -97,7 +97,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { async fn init_system(client: &IggyClient) { // 1. Create the stream let create_stream = CreateStream { - stream_id: STREAM_ID, + stream_id: Some(STREAM_ID), name: STREAM_NAME.to_string(), }; client.create_stream(&create_stream).await.unwrap(); @@ -105,7 +105,7 @@ async fn init_system(client: &IggyClient) { // 2. Create the topic let create_topic = CreateTopic { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - topic_id: TOPIC_ID, + topic_id: Some(TOPIC_ID), partitions_count: PARTITIONS_COUNT, name: TOPIC_NAME.to_string(), message_expiry: None, diff --git a/integration/tests/server/scenarios/system_scenario.rs b/integration/tests/server/scenarios/system_scenario.rs index 4a9d60414..e4cb4437f 100644 --- a/integration/tests/server/scenarios/system_scenario.rs +++ b/integration/tests/server/scenarios/system_scenario.rs @@ -75,7 +75,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { // 3. Create the stream let mut create_stream = CreateStream { - stream_id: STREAM_ID, + stream_id: Some(STREAM_ID), name: STREAM_NAME.to_string(), }; client.create_stream(&create_stream).await.unwrap(); @@ -120,7 +120,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert!(create_stream_result.is_err()); // 8. Try to create the stream with the same name but the different ID and validate that it fails - create_stream.stream_id = STREAM_ID + 1; + create_stream.stream_id = Some(STREAM_ID + 1); create_stream.name = STREAM_NAME.to_string(); let create_stream_result = client.create_stream(&create_stream).await; assert!(create_stream_result.is_err()); @@ -128,7 +128,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { // 9. Create the topic let mut create_topic = CreateTopic { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - topic_id: TOPIC_ID, + topic_id: Some(TOPIC_ID), partitions_count: PARTITIONS_COUNT, name: TOPIC_NAME.to_string(), message_expiry: None, @@ -215,7 +215,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert!(create_topic_result.is_err()); // 16. Try to create the topic with the different ID but the same name and validate that it fails - create_topic.topic_id = TOPIC_ID + 1; + create_topic.topic_id = Some(TOPIC_ID + 1); create_topic.name = TOPIC_NAME.to_string(); let create_topic_result = client.create_topic(&create_topic).await; assert!(create_topic_result.is_err()); @@ -684,17 +684,68 @@ pub async fn run(client_factory: &dyn ClientFactory) { .unwrap(); assert!(topics.is_empty()); - // 41. Delete the existing stream and ensure it doesn't exist anymore - client - .delete_stream(&DeleteStream { - stream_id: Identifier::numeric(STREAM_ID).unwrap(), + // 41. Create the stream with automatically generated ID on the server + let stream_name = format!("{}-auto", STREAM_NAME); + let stream_id = STREAM_ID + 1; + let create_stream = CreateStream { + stream_id: None, + name: stream_name.clone(), + }; + client.create_stream(&create_stream).await.unwrap(); + + let stream = client + .get_stream(&GetStream { + stream_id: Identifier::numeric(stream_id).unwrap(), }) .await .unwrap(); + + assert_eq!(stream.id, stream_id); + assert_eq!(stream.name, stream_name); + + // 42. Create the topic with automatically generated ID on the server + let topic_name = format!("{}-auto", TOPIC_NAME); + let topic_id = 1; + let create_topic = CreateTopic { + stream_id: Identifier::numeric(stream_id).unwrap(), + topic_id: None, + partitions_count: PARTITIONS_COUNT, + name: topic_name.clone(), + message_expiry: None, + max_topic_size: None, + replication_factor: 1, + }; + + client.create_topic(&create_topic).await.unwrap(); + + let topic = client + .get_topic(&GetTopic { + stream_id: Identifier::numeric(stream_id).unwrap(), + topic_id: Identifier::numeric(topic_id).unwrap(), + }) + .await + .unwrap(); + + assert_eq!(topic.id, topic_id); + assert_eq!(topic.name, topic_name); + + // 43. Delete the existing streams and ensure there's no streams left + let streams = client.get_streams(&GetStreams {}).await.unwrap(); + assert_eq!(streams.len(), 2); + + for stream in streams { + client + .delete_stream(&DeleteStream { + stream_id: Identifier::numeric(stream.id).unwrap(), + }) + .await + .unwrap(); + } + let streams = client.get_streams(&GetStreams {}).await.unwrap(); assert!(streams.is_empty()); - // 42. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client + // 44. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client let clients = client.get_clients(&GetClients {}).await.unwrap(); assert!(clients.len() <= 1); diff --git a/integration/tests/streaming/stream.rs b/integration/tests/streaming/stream.rs index 94ea94be1..e69cbe994 100644 --- a/integration/tests/streaming/stream.rs +++ b/integration/tests/streaming/stream.rs @@ -94,7 +94,7 @@ async fn should_purge_existing_stream_on_disk() { let topic_id = 1; stream - .create_topic(topic_id, "test", 1, None, None, 1) + .create_topic(Some(topic_id), "test", 1, None, None, 1) .await .unwrap(); diff --git a/integration/tests/streaming/system.rs b/integration/tests/streaming/system.rs index ceade35cd..0e8e6d969 100644 --- a/integration/tests/streaming/system.rs +++ b/integration/tests/streaming/system.rs @@ -44,7 +44,28 @@ async fn should_create_and_persist_stream() { system.init().await.unwrap(); system - .create_stream(&session, stream_id, stream_name) + .create_stream(&session, Some(stream_id), stream_name) + .await + .unwrap(); + + assert_persisted_stream(&setup.config.get_streams_path(), stream_id).await; +} + +#[tokio::test] +async fn should_create_and_persist_stream_with_automatically_generated_id() { + let setup = TestSetup::init().await; + let mut system = System::new( + setup.config.clone(), + Some(setup.db.clone()), + PersonalAccessTokenConfig::default(), + ); + let stream_id = 1; + let stream_name = "test"; + let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); + system.init().await.unwrap(); + + system + .create_stream(&session, None, stream_name) .await .unwrap(); @@ -64,7 +85,7 @@ async fn should_delete_persisted_stream() { let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234)); system.init().await.unwrap(); system - .create_stream(&session, stream_id, stream_name) + .create_stream(&session, Some(stream_id), stream_name) .await .unwrap(); assert_persisted_stream(&setup.config.get_streams_path(), stream_id).await; diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 0066859f8..adf3036da 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.1.8" +version = "0.2.0" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/cli/streams/create_stream.rs b/sdk/src/cli/streams/create_stream.rs index cdc970850..e7feacca3 100644 --- a/sdk/src/cli/streams/create_stream.rs +++ b/sdk/src/cli/streams/create_stream.rs @@ -12,7 +12,10 @@ pub struct CreateStreamCmd { impl CreateStreamCmd { pub fn new(stream_id: u32, name: String) -> Self { Self { - create_stream: CreateStream { stream_id, name }, + create_stream: CreateStream { + stream_id: Some(stream_id), + name, + }, } } } @@ -22,7 +25,8 @@ impl CliCommand for CreateStreamCmd { fn explain(&self) -> String { format!( "create stream with ID: {} and name: {}", - self.create_stream.stream_id, self.create_stream.name + self.create_stream.stream_id.unwrap_or(0), + self.create_stream.name ) } @@ -33,13 +37,14 @@ impl CliCommand for CreateStreamCmd { .with_context(|| { format!( "Problem creating stream (ID: {} and name: {})", - self.create_stream.stream_id, self.create_stream.name + self.create_stream.stream_id.unwrap_or(0), + self.create_stream.name ) })?; event!(target: PRINT_TARGET, Level::INFO, "Stream with ID: {} and name: {} created", - self.create_stream.stream_id, self.create_stream.name + self.create_stream.stream_id.unwrap_or(0), self.create_stream.name ); Ok(()) diff --git a/sdk/src/cli/topics/create_topic.rs b/sdk/src/cli/topics/create_topic.rs index 74da3834e..bb8af2955 100644 --- a/sdk/src/cli/topics/create_topic.rs +++ b/sdk/src/cli/topics/create_topic.rs @@ -29,7 +29,7 @@ impl CreateTopicCmd { Self { create_topic: CreateTopic { stream_id, - topic_id, + topic_id: Some(topic_id), partitions_count, name, message_expiry: message_expiry.clone().into(), @@ -56,13 +56,13 @@ impl CliCommand for CreateTopicCmd { .with_context(|| { format!( "Problem creating topic (ID: {}, name: {}, partitions count: {}) in stream with ID: {}", - self.create_topic.topic_id, self.create_topic.name, self.create_topic.partitions_count, self.create_topic.stream_id + self.create_topic.topic_id.unwrap_or(0), self.create_topic.name, self.create_topic.partitions_count, self.create_topic.stream_id ) })?; event!(target: PRINT_TARGET, Level::INFO, "Topic with ID: {}, name: {}, partitions count: {}, message expiry: {}, max topic size: {}, replication factor: {} created in stream with ID: {}", - self.create_topic.topic_id, + self.create_topic.topic_id.unwrap_or(0), self.create_topic.name, self.create_topic.partitions_count, self.message_expiry, @@ -77,7 +77,7 @@ impl CliCommand for CreateTopicCmd { impl fmt::Display for CreateTopicCmd { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { - let topic_id = &self.create_topic.topic_id; + let topic_id = &self.create_topic.topic_id.unwrap_or(0); let topic_name = &self.create_topic.name; let message_expiry = &self.message_expiry; let max_topic_size = &self.max_topic_size.as_human_string_with_zero_as_unlimited(); diff --git a/sdk/src/streams/create_stream.rs b/sdk/src/streams/create_stream.rs index 9d5a02954..f0896af79 100644 --- a/sdk/src/streams/create_stream.rs +++ b/sdk/src/streams/create_stream.rs @@ -15,8 +15,8 @@ use std::str::from_utf8; /// - `name` - unique stream name (string), max length is 255 characters. #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct CreateStream { - /// Unique stream ID (numeric). - pub stream_id: u32, + /// Unique stream ID (numeric), if None is provided then the server will automatically assign it. + pub stream_id: Option, /// Unique stream name (string), max length is 255 characters. pub name: String, } @@ -26,7 +26,7 @@ impl CommandPayload for CreateStream {} impl Default for CreateStream { fn default() -> Self { CreateStream { - stream_id: 1, + stream_id: Some(1), name: "stream".to_string(), } } @@ -34,8 +34,10 @@ impl Default for CreateStream { impl Validatable for CreateStream { fn validate(&self) -> Result<(), Error> { - if self.stream_id == 0 { - return Err(Error::InvalidStreamId); + if let Some(stream_id) = self.stream_id { + if stream_id == 0 { + return Err(Error::InvalidStreamId); + } } if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH { @@ -53,7 +55,7 @@ impl Validatable for CreateStream { impl BytesSerializable for CreateStream { fn as_bytes(&self) -> Vec { let mut bytes = Vec::with_capacity(5 + self.name.len()); - bytes.put_u32_le(self.stream_id); + bytes.put_u32_le(self.stream_id.unwrap_or(0)); #[allow(clippy::cast_possible_truncation)] bytes.put_u8(self.name.len() as u8); bytes.extend(self.name.as_bytes()); @@ -66,6 +68,11 @@ impl BytesSerializable for CreateStream { } let stream_id = u32::from_le_bytes(bytes[..4].try_into()?); + let stream_id = if stream_id == 0 { + None + } else { + Some(stream_id) + }; let name_length = bytes[4]; let name = from_utf8(&bytes[5..5 + name_length as usize])?.to_string(); if name.len() != name_length as usize { @@ -80,7 +87,7 @@ impl BytesSerializable for CreateStream { impl Display for CreateStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}|{}", self.stream_id, self.name) + write!(f, "{}|{}", self.stream_id.unwrap_or(0), self.name) } } @@ -91,7 +98,7 @@ mod tests { #[test] fn should_be_serialized_as_bytes() { let command = CreateStream { - stream_id: 1, + stream_id: Some(1), name: "test".to_string(), }; @@ -101,7 +108,7 @@ mod tests { let name = from_utf8(&bytes[5..5 + name_length as usize]).unwrap(); assert!(!bytes.is_empty()); - assert_eq!(stream_id, command.stream_id); + assert_eq!(stream_id, command.stream_id.unwrap()); assert_eq!(name, command.name); } @@ -118,7 +125,7 @@ mod tests { assert!(command.is_ok()); let command = command.unwrap(); - assert_eq!(command.stream_id, stream_id); + assert_eq!(command.stream_id.unwrap(), stream_id); assert_eq!(command.name, name); } } diff --git a/sdk/src/topics/create_topic.rs b/sdk/src/topics/create_topic.rs index 2c35af03c..ae508f98b 100644 --- a/sdk/src/topics/create_topic.rs +++ b/sdk/src/topics/create_topic.rs @@ -26,8 +26,8 @@ pub struct CreateTopic { /// Unique stream ID (numeric or name). #[serde(skip)] pub stream_id: Identifier, - /// Unique topic ID (numeric). - pub topic_id: u32, + /// Unique topic ID (numeric), if None is provided then the server will automatically assign it. + pub topic_id: Option, /// Number of partitions in the topic, max value is 1000. pub partitions_count: u32, /// Optional message expiry in seconds, if `None` then messages will never expire. @@ -46,7 +46,7 @@ impl Default for CreateTopic { fn default() -> Self { CreateTopic { stream_id: Identifier::default(), - topic_id: 1, + topic_id: Some(1), partitions_count: 1, message_expiry: None, max_topic_size: None, @@ -58,8 +58,10 @@ impl Default for CreateTopic { impl Validatable for CreateTopic { fn validate(&self) -> Result<(), Error> { - if self.topic_id == 0 { - return Err(Error::InvalidTopicId); + if let Some(topic_id) = self.topic_id { + if topic_id == 0 { + return Err(Error::InvalidTopicId); + } } if self.name.is_empty() || self.name.len() > MAX_NAME_LENGTH { @@ -87,7 +89,7 @@ impl BytesSerializable for CreateTopic { let stream_id_bytes = self.stream_id.as_bytes(); let mut bytes = Vec::with_capacity(22 + stream_id_bytes.len() + self.name.len()); bytes.extend(stream_id_bytes); - bytes.put_u32_le(self.topic_id); + bytes.put_u32_le(self.topic_id.unwrap_or(0)); bytes.put_u32_le(self.partitions_count); match self.message_expiry { Some(message_expiry) => bytes.put_u32_le(message_expiry), @@ -112,6 +114,7 @@ impl BytesSerializable for CreateTopic { let stream_id = Identifier::from_bytes(bytes)?; position += stream_id.get_size_bytes() as usize; let topic_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?); + let topic_id = if topic_id == 0 { None } else { Some(topic_id) }; let partitions_count = u32::from_le_bytes(bytes[position + 4..position + 8].try_into()?); let message_expiry = match u32::from_le_bytes(bytes[position + 8..position + 12].try_into()?) { @@ -154,7 +157,7 @@ impl Display for CreateTopic { f, "{}|{}|{}|{}|{}|{}|{}", self.stream_id, - self.topic_id, + self.topic_id.unwrap_or(0), self.partitions_count, self.message_expiry.unwrap_or(0), max_topic_size, @@ -173,7 +176,7 @@ mod tests { fn should_be_serialized_as_bytes() { let command = CreateTopic { stream_id: Identifier::numeric(1).unwrap(), - topic_id: 2, + topic_id: Some(2), partitions_count: 3, message_expiry: Some(10), max_topic_size: Some(IggyByteSize::from(100)), @@ -205,7 +208,7 @@ mod tests { assert!(!bytes.is_empty()); assert_eq!(stream_id, command.stream_id); - assert_eq!(topic_id, command.topic_id); + assert_eq!(topic_id, command.topic_id.unwrap()); assert_eq!(partitions_count, command.partitions_count); assert_eq!(message_expiry, command.message_expiry); assert_eq!(max_topic_size, command.max_topic_size); @@ -240,10 +243,12 @@ mod tests { let command = command.unwrap(); assert_eq!(command.stream_id, stream_id); - assert_eq!(command.topic_id, topic_id); + assert_eq!(command.topic_id.unwrap(), topic_id); + assert_eq!(command.name, name); assert_eq!(command.partitions_count, partitions_count); assert_eq!(command.message_expiry, Some(message_expiry)); - assert_eq!(command.topic_id, topic_id); + assert_eq!(command.max_topic_size, Some(max_topic_size)); + assert_eq!(command.replication_factor, replication_factor); assert_eq!(command.partitions_count, partitions_count); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index a3ea84e33..3f47901c8 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.1.9" +version = "0.2.0" edition = "2021" build = "src/build.rs" diff --git a/server/src/streaming/streams/stream.rs b/server/src/streaming/streams/stream.rs index 0c624e1da..148dcc2e1 100644 --- a/server/src/streaming/streams/stream.rs +++ b/server/src/streaming/streams/stream.rs @@ -4,6 +4,7 @@ use crate::streaming::topics::topic::Topic; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::timestamp::IggyTimestamp; use std::collections::HashMap; +use std::sync::atomic::AtomicU32; use std::sync::Arc; #[derive(Debug)] @@ -13,6 +14,7 @@ pub struct Stream { pub path: String, pub topics_path: String, pub created_at: u64, + pub current_topic_id: AtomicU32, pub(crate) topics: HashMap, pub(crate) topics_ids: HashMap, pub(crate) config: Arc, @@ -39,6 +41,7 @@ impl Stream { path, topics_path, config, + current_topic_id: AtomicU32::new(1), topics: HashMap::new(), topics_ids: HashMap::new(), storage, diff --git a/server/src/streaming/streams/topics.rs b/server/src/streaming/streams/topics.rs index fd4ef4988..8950b5662 100644 --- a/server/src/streaming/streams/topics.rs +++ b/server/src/streaming/streams/topics.rs @@ -4,6 +4,7 @@ use iggy::error::Error; use iggy::identifier::{IdKind, Identifier}; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::text; +use std::sync::atomic::Ordering; use tracing::{debug, info}; impl Stream { @@ -13,24 +14,40 @@ impl Stream { pub async fn create_topic( &mut self, - id: u32, + topic_id: Option, name: &str, partitions_count: u32, message_expiry: Option, max_topic_size: Option, replication_factor: u8, ) -> Result<(), Error> { - if self.topics.contains_key(&id) { - return Err(Error::TopicIdAlreadyExists(id, self.stream_id)); - } - let name = text::to_lowercase_non_whitespace(name); if self.topics_ids.contains_key(&name) { return Err(Error::TopicNameAlreadyExists(name, self.stream_id)); } - // TODO: check if max_topic_size is not lower than system.segment.size + let mut id; + if topic_id.is_none() { + id = self.current_topic_id.fetch_add(1, Ordering::SeqCst); + loop { + if self.topics.contains_key(&id) { + if id == u32::MAX { + return Err(Error::TopicIdAlreadyExists(id, self.stream_id)); + } + id = self.current_topic_id.fetch_add(1, Ordering::SeqCst); + } else { + break; + } + } + } else { + id = topic_id.unwrap(); + } + + if self.topics.contains_key(&id) { + return Err(Error::TopicIdAlreadyExists(id, self.stream_id)); + } + // TODO: check if max_topic_size is not lower than system.segment.size let topic = Topic::create( self.stream_id, id, @@ -208,7 +225,14 @@ mod tests { let storage = Arc::new(get_test_system_storage()); let mut stream = Stream::create(stream_id, stream_name, config, storage); stream - .create_topic(topic_id, topic_name, 1, message_expiry, max_topic_size, 1) + .create_topic( + Some(topic_id), + topic_name, + 1, + message_expiry, + max_topic_size, + 1, + ) .await .unwrap(); diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index 85c883916..a2be3ee26 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -6,9 +6,12 @@ use iggy::error::Error; use iggy::identifier::{IdKind, Identifier}; use iggy::utils::text; use std::cell::RefCell; +use std::sync::atomic::{AtomicU32, Ordering}; use tokio::fs::read_dir; use tracing::{error, info}; +static CURRENT_STREAM_ID: AtomicU32 = AtomicU32::new(1); + impl System { pub(crate) async fn load_streams(&mut self) -> Result<(), Error> { info!("Loading streams from disk..."); @@ -153,23 +156,40 @@ impl System { pub async fn create_stream( &mut self, session: &Session, - stream_id: u32, + stream_id: Option, name: &str, ) -> Result<(), Error> { self.ensure_authenticated(session)?; self.permissioner.create_stream(session.get_user_id())?; - if self.streams.contains_key(&stream_id) { - return Err(Error::StreamIdAlreadyExists(stream_id)); - } - let name = text::to_lowercase_non_whitespace(name); if self.streams_ids.contains_key(&name) { return Err(Error::StreamNameAlreadyExists(name.to_string())); } - let stream = Stream::create(stream_id, &name, self.config.clone(), self.storage.clone()); + let mut id; + if stream_id.is_none() { + id = CURRENT_STREAM_ID.fetch_add(1, Ordering::SeqCst); + loop { + if self.streams.contains_key(&id) { + if id == u32::MAX { + return Err(Error::StreamIdAlreadyExists(id)); + } + id = CURRENT_STREAM_ID.fetch_add(1, Ordering::SeqCst); + } else { + break; + } + } + } else { + id = stream_id.unwrap(); + } + + if self.streams.contains_key(&id) { + return Err(Error::StreamIdAlreadyExists(id)); + } + + let stream = Stream::create(id, &name, self.config.clone(), self.storage.clone()); stream.persist().await?; - info!("Created stream with ID: {}, name: '{}'.", stream_id, name); + info!("Created stream with ID: {id}, name: '{name}'."); self.streams_ids.insert(name, stream.stream_id); self.streams.insert(stream.stream_id, stream); self.metrics.increment_streams(1); @@ -294,7 +314,7 @@ mod tests { ); system.permissioner.init_permissions_for_user(root); system - .create_stream(&session, stream_id, stream_name) + .create_stream(&session, Some(stream_id), stream_name) .await .unwrap(); diff --git a/server/src/streaming/systems/topics.rs b/server/src/streaming/systems/topics.rs index ae3b8ba1b..319b05c83 100644 --- a/server/src/streaming/systems/topics.rs +++ b/server/src/streaming/systems/topics.rs @@ -37,7 +37,7 @@ impl System { &mut self, session: &Session, stream_id: &Identifier, - topic_id: u32, + topic_id: Option, name: &str, partitions_count: u32, message_expiry: Option, diff --git a/tools/src/data-seeder/seeder.rs b/tools/src/data-seeder/seeder.rs index 6930ad9ea..a20ae4c01 100644 --- a/tools/src/data-seeder/seeder.rs +++ b/tools/src/data-seeder/seeder.rs @@ -25,19 +25,19 @@ pub async fn seed(client: &IggyClient) -> Result<(), Error> { async fn create_streams(client: &IggyClient) -> Result<(), Error> { client .create_stream(&CreateStream { - stream_id: PROD_STREAM_ID, + stream_id: Some(PROD_STREAM_ID), name: "prod".to_string(), }) .await?; client .create_stream(&CreateStream { - stream_id: TEST_STREAM_ID, + stream_id: Some(TEST_STREAM_ID), name: "test".to_string(), }) .await?; client .create_stream(&CreateStream { - stream_id: DEV_STREAM_ID, + stream_id: Some(DEV_STREAM_ID), name: "dev".to_string(), }) .await?; @@ -50,7 +50,7 @@ async fn create_topics(client: &IggyClient) -> Result<(), Error> { client .create_topic(&CreateTopic { stream_id: Identifier::numeric(stream_id)?, - topic_id: 1, + topic_id: Some(1), name: "orders".to_string(), partitions_count: 1, message_expiry: None, @@ -62,7 +62,7 @@ async fn create_topics(client: &IggyClient) -> Result<(), Error> { client .create_topic(&CreateTopic { stream_id: Identifier::numeric(stream_id)?, - topic_id: 2, + topic_id: Some(2), name: "users".to_string(), partitions_count: 2, message_expiry: None, @@ -74,7 +74,7 @@ async fn create_topics(client: &IggyClient) -> Result<(), Error> { client .create_topic(&CreateTopic { stream_id: Identifier::numeric(stream_id)?, - topic_id: 3, + topic_id: Some(3), name: "notifications".to_string(), partitions_count: 3, message_expiry: None, @@ -86,7 +86,7 @@ async fn create_topics(client: &IggyClient) -> Result<(), Error> { client .create_topic(&CreateTopic { stream_id: Identifier::numeric(stream_id)?, - topic_id: 4, + topic_id: Some(4), name: "payments".to_string(), partitions_count: 2, message_expiry: None, @@ -98,7 +98,7 @@ async fn create_topics(client: &IggyClient) -> Result<(), Error> { client .create_topic(&CreateTopic { stream_id: Identifier::numeric(stream_id)?, - topic_id: 5, + topic_id: Some(5), name: "deliveries".to_string(), partitions_count: 1, message_expiry: None,