diff --git a/Cargo.lock b/Cargo.lock index 8a7594332..61abb3e8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2319,7 +2319,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.63" +version = "0.6.70" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -4355,7 +4355,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.101" +version = "0.4.110" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/integration/tests/server/scenarios/system_scenario.rs b/integration/tests/server/scenarios/system_scenario.rs index ad64b0c60..8c13904a6 100644 --- a/integration/tests/server/scenarios/system_scenario.rs +++ b/integration/tests/server/scenarios/system_scenario.rs @@ -316,7 +316,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { .unwrap(); assert!(polled_messages.messages.is_empty()); - // 22. Get the existing customer offset and ensure it's 0 + // 22. Get the customer offset and ensure it's none let offset = client .get_consumer_offset( &consumer, @@ -325,11 +325,8 @@ pub async fn run(client_factory: &dyn ClientFactory) { Some(PARTITION_ID), ) .await - .unwrap() .expect("Failed to get consumer offset"); - assert_eq!(offset.partition_id, PARTITION_ID); - assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64); - assert_eq!(offset.stored_offset, 0); + assert!(offset.is_none()); // 23. Store the consumer offset let stored_offset = 10; @@ -359,7 +356,42 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64); assert_eq!(offset.stored_offset, stored_offset); - // 25. Poll messages from the specific partition in topic using next with auto commit + // 25. Delete the consumer offset + client + .delete_consumer_offset( + &consumer, + &Identifier::numeric(STREAM_ID).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + Some(PARTITION_ID), + ) + .await + .unwrap(); + + // 26. Get the customer offset and ensure it's none + let offset = client + .get_consumer_offset( + &consumer, + &Identifier::numeric(STREAM_ID).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + Some(PARTITION_ID), + ) + .await + .expect("Failed to get consumer offset"); + + assert!(offset.is_none()); + + client + .store_consumer_offset( + &consumer, + &Identifier::numeric(STREAM_ID).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + Some(PARTITION_ID), + stored_offset, + ) + .await + .unwrap(); + + // 27. Poll messages from the specific partition in topic using next with auto commit let messages_count = 10; let polled_messages = client .poll_messages( @@ -380,7 +412,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(first_offset, stored_offset + 1); assert_eq!(last_offset, expected_last_offset); - // 26. Get the existing customer offset and ensure that auto commit during poll has worked + // 28. Get the existing customer offset and ensure that auto commit during poll has worked let offset = client .get_consumer_offset( &consumer, @@ -395,7 +427,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64); assert_eq!(offset.stored_offset, expected_last_offset); - // 27. Get the consumer groups and validate that there are no groups + // 29. Get the consumer groups and validate that there are no groups let consumer_groups = client .get_consumer_groups( &Identifier::numeric(STREAM_ID).unwrap(), @@ -406,7 +438,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert!(consumer_groups.is_empty()); - // 28. Create the consumer group + // 30. Create the consumer group let consumer_group = client .create_consumer_group( &Identifier::numeric(STREAM_ID).unwrap(), @@ -420,7 +452,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(consumer_group.id, CONSUMER_GROUP_ID); assert_eq!(consumer_group.name, CONSUMER_GROUP_NAME); - // 29. Get the consumer groups and validate that there is one group + // 31. Get the consumer groups and validate that there is one group let consumer_groups = client .get_consumer_groups( &Identifier::numeric(STREAM_ID).unwrap(), @@ -435,7 +467,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT); assert_eq!(consumer_group.members_count, 0); - // 30. Get the consumer group details + // 32. Get the consumer group details let consumer_group = client .get_consumer_group( &Identifier::numeric(STREAM_ID).unwrap(), @@ -451,7 +483,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(consumer_group.members_count, 0); assert!(consumer_group.members.is_empty()); - // 31. Join the consumer group and then leave it if the feature is available + // 33. Join the consumer group and then leave it if the feature is available let result = client .join_consumer_group( &Identifier::numeric(STREAM_ID).unwrap(), @@ -493,7 +525,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { Err(e) => assert_eq!(e.as_code(), IggyError::FeatureUnavailable.as_code()), } - // 32. Get the stats and validate that there is one stream + // 34. Get the stats and validate that there is one stream let stats = client.get_stats().await.unwrap(); assert!(!stats.hostname.is_empty()); assert!(!stats.os_name.is_empty()); @@ -505,7 +537,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(stats.segments_count, PARTITIONS_COUNT); assert_eq!(stats.messages_count, MESSAGES_COUNT as u64); - // 33. Delete the consumer group + // 35. Delete the consumer group client .delete_consumer_group( &Identifier::numeric(STREAM_ID).unwrap(), @@ -515,7 +547,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { .await .unwrap(); - // 34. Create new partitions and validate that the number of partitions is increased + // 36. Create new partitions and validate that the number of partitions is increased client .create_partitions( &Identifier::numeric(STREAM_ID).unwrap(), @@ -536,7 +568,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(topic.partitions_count, 2 * PARTITIONS_COUNT); - // 35. Delete the partitions and validate that the number of partitions is decreased + // 37. Delete the partitions and validate that the number of partitions is decreased client .delete_partitions( &Identifier::numeric(STREAM_ID).unwrap(), @@ -557,7 +589,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(topic.partitions_count, PARTITIONS_COUNT); - // 36. Update the existing topic and ensure it's updated + // 38. Update the existing topic and ensure it's updated let updated_topic_name = format!("{}-updated", TOPIC_NAME); let updated_message_expiry = 1000; let message_expiry_duration = updated_message_expiry.into(); @@ -598,7 +630,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(updated_topic.max_topic_size, updated_max_topic_size); assert_eq!(updated_topic.replication_factor, updated_replication_factor); - // 37. Purge the existing topic and ensure it has no messages + // 39. Purge the existing topic and ensure it has no messages client .purge_topic( &Identifier::numeric(STREAM_ID).unwrap(), @@ -622,7 +654,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(polled_messages.current_offset, 0); assert!(polled_messages.messages.is_empty()); - // 38. Update the existing stream and ensure it's updated + // 40. Update the existing stream and ensure it's updated let updated_stream_name = format!("{}-updated", STREAM_NAME); client @@ -641,7 +673,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(updated_stream.name, updated_stream_name); - // 39. Purge the existing stream and ensure it has no messages + // 41. Purge the existing stream and ensure it has no messages let mut messages = create_messages(); client .send_messages( @@ -673,7 +705,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(polled_messages.current_offset, 0); assert!(polled_messages.messages.is_empty()); - // 40. Delete the existing topic and ensure it doesn't exist anymore + // 42. Delete the existing topic and ensure it doesn't exist anymore client .delete_topic( &Identifier::numeric(STREAM_ID).unwrap(), @@ -687,7 +719,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { .unwrap(); assert!(topics.is_empty()); - // 41. Create the stream with automatically generated ID on the server + // 43. Create the stream with automatically generated ID on the server let stream_name = format!("{}-auto", STREAM_NAME); let stream_id = STREAM_ID + 1; client.create_stream(&stream_name, None).await.unwrap(); @@ -701,7 +733,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(stream.id, stream_id); assert_eq!(stream.name, stream_name); - // 42. Create the topic with automatically generated ID on the server + // 44. Create the topic with automatically generated ID on the server let topic_name = format!("{}-auto", TOPIC_NAME); let topic_id = 1; client @@ -730,7 +762,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(topic.id, topic_id); assert_eq!(topic.name, topic_name); - // 43. Delete the existing streams and ensure there's no streams left + // 45. Delete the existing streams and ensure there's no streams left let streams = client.get_streams().await.unwrap(); assert_eq!(streams.len(), 2); @@ -744,7 +776,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { let streams = client.get_streams().await.unwrap(); assert!(streams.is_empty()); - // 44. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client + // 46. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client let clients = client.get_clients().await.unwrap(); assert!(clients.len() <= 1); diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index a01484f16..07332ffe4 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.63" +version = "0.6.70" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "Apache-2.0" diff --git a/sdk/src/binary/consumer_offsets.rs b/sdk/src/binary/consumer_offsets.rs index c10ac14a8..d14f41808 100644 --- a/sdk/src/binary/consumer_offsets.rs +++ b/sdk/src/binary/consumer_offsets.rs @@ -2,6 +2,7 @@ use crate::binary::binary_client::BinaryClient; use crate::binary::{fail_if_not_authenticated, mapper}; use crate::client::ConsumerOffsetClient; use crate::consumer::Consumer; +use crate::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset; use crate::consumer_offsets::get_consumer_offset::GetConsumerOffset; use crate::consumer_offsets::store_consumer_offset::StoreConsumerOffset; use crate::error::IggyError; @@ -52,4 +53,22 @@ impl ConsumerOffsetClient for B { mapper::map_consumer_offset(response).map(Some) } + + async fn delete_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ) -> Result<(), IggyError> { + fail_if_not_authenticated(self).await?; + self.send_with_response(&DeleteConsumerOffset { + consumer: consumer.clone(), + stream_id: stream_id.clone(), + topic_id: topic_id.clone(), + partition_id, + }) + .await?; + Ok(()) + } } diff --git a/sdk/src/client.rs b/sdk/src/client.rs index bd866e796..dab88fa4e 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -375,6 +375,16 @@ pub trait ConsumerOffsetClient { topic_id: &Identifier, partition_id: Option, ) -> Result, IggyError>; + /// Delete the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names. + /// + /// Authentication is required, and the permission to poll the messages. + async fn delete_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ) -> Result<(), IggyError>; } /// This trait defines the methods to interact with the consumer group module. diff --git a/sdk/src/clients/client.rs b/sdk/src/clients/client.rs index 06481a7a0..66a237eb2 100644 --- a/sdk/src/clients/client.rs +++ b/sdk/src/clients/client.rs @@ -644,6 +644,20 @@ impl ConsumerOffsetClient for IggyClient { .get_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + + async fn delete_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ) -> Result<(), IggyError> { + self.client + .read() + .await + .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } } #[async_trait] diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index 2deca0b16..be6289192 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -89,6 +89,7 @@ pub struct IggyConsumer { last_polled_at: Arc, current_partition_id: Arc, retry_interval: IggyDuration, + allow_replay: bool, } impl IggyConsumer { @@ -108,6 +109,7 @@ impl IggyConsumer { create_consumer_group_if_not_exists: bool, encryptor: Option>, retry_interval: IggyDuration, + allow_replay: bool, ) -> Self { let (store_offset_sender, _) = flume::unbounded(); Self { @@ -159,6 +161,7 @@ impl IggyConsumer { last_polled_at: Arc::new(AtomicU64::new(0)), current_partition_id: Arc::new(AtomicU32::new(0)), retry_interval, + allow_replay, } } @@ -201,10 +204,24 @@ impl IggyConsumer { partition_id, offset, &self.last_stored_offsets, + self.allow_replay, ) .await } + /// Deletes the consumer offset on the server either for the current partition or the provided partition ID. + pub async fn delete_offset(&self, partition_id: Option) -> Result<(), IggyError> { + let client = self.client.read().await; + client + .delete_consumer_offset( + &self.consumer, + &self.stream_id, + &self.topic_id, + partition_id, + ) + .await + } + /// Initializes the consumer by subscribing to diagnostic events, initializing the consumer group if needed, storing the offsets in the background etc. /// /// Note: This method must be called before polling messages. @@ -261,6 +278,7 @@ impl IggyConsumer { partition_id, offset, &last_stored_offsets, + false, ) .await } @@ -270,6 +288,7 @@ impl IggyConsumer { Ok(()) } + #[allow(clippy::too_many_arguments)] async fn store_consumer_offset( client: &IggySharedMut>, consumer: &Consumer, @@ -278,6 +297,7 @@ impl IggyConsumer { partition_id: u32, offset: u64, last_stored_offsets: &DashMap, + allow_replay: bool, ) -> Result<(), IggyError> { trace!("Storing offset: {offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}..."); let stored_offset; @@ -288,7 +308,7 @@ impl IggyConsumer { last_stored_offsets.insert(partition_id, AtomicU64::new(0)); } - if offset <= stored_offset && offset >= 1 { + if !allow_replay && (offset <= stored_offset && offset >= 1) { trace!("Offset: {offset} is less than or equal to the last stored offset: {stored_offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}. Skipping storing the offset."); return Ok(()); } @@ -331,6 +351,7 @@ impl IggyConsumer { partition_id, consumed_offset, &last_stored_offsets, + false, ) .await; } @@ -480,6 +501,7 @@ impl IggyConsumer { let retry_interval = self.retry_interval; let last_stored_offset = self.last_stored_offsets.clone(); let last_consumed_offset = self.last_consumed_offsets.clone(); + let allow_replay = self.allow_replay; async move { if interval > 0 { @@ -524,7 +546,7 @@ impl IggyConsumer { last_consumed_offset.insert(partition_id, AtomicU64::new(0)); } - if has_consumed_offset { + if !allow_replay && has_consumed_offset { polled_messages .messages .retain(|message| message.offset > consumed_offset); @@ -559,7 +581,9 @@ impl IggyConsumer { polled_messages.current_offset ); - if has_consumed_offset && polled_messages.current_offset == consumed_offset { + if !allow_replay + && (has_consumed_offset && polled_messages.current_offset == consumed_offset) + { trace!("No new messages to consume in partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}, consumer: {consumer}"); if auto_commit_enabled && stored_offset < consumed_offset { trace!("Auto-committing the offset: {consumed_offset} in partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}, consumer: {consumer}"); @@ -844,6 +868,7 @@ pub struct IggyConsumerBuilder { create_consumer_group_if_not_exists: bool, encryptor: Option>, retry_interval: IggyDuration, + allow_replay: bool, } impl IggyConsumerBuilder { @@ -876,6 +901,7 @@ impl IggyConsumerBuilder { encryptor, polling_interval, retry_interval: IggyDuration::ONE_SECOND, + allow_replay: false, } } @@ -987,6 +1013,14 @@ impl IggyConsumerBuilder { } } + /// Allows replaying the messages, `false` by default. + pub fn allow_replay(self) -> Self { + Self { + allow_replay: true, + ..self + } + } + /// Builds the consumer. /// /// Note: After building the consumer, `init()` must be invoked before producing messages. @@ -1006,6 +1040,7 @@ impl IggyConsumerBuilder { self.create_consumer_group_if_not_exists, self.encryptor, self.retry_interval, + self.allow_replay, ) } } diff --git a/sdk/src/command.rs b/sdk/src/command.rs index b402e654f..09acf18de 100644 --- a/sdk/src/command.rs +++ b/sdk/src/command.rs @@ -55,6 +55,8 @@ pub const GET_CONSUMER_OFFSET: &str = "consumer_offset.get"; pub const GET_CONSUMER_OFFSET_CODE: u32 = 120; pub const STORE_CONSUMER_OFFSET: &str = "consumer_offset.store"; pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121; +pub const DELETE_CONSUMER_OFFSET: &str = "consumer_offset.delete"; +pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122; pub const GET_STREAM: &str = "stream.get"; pub const GET_STREAM_CODE: u32 = 200; pub const GET_STREAMS: &str = "stream.list"; diff --git a/sdk/src/consumer_offsets/delete_consumer_offset.rs b/sdk/src/consumer_offsets/delete_consumer_offset.rs new file mode 100644 index 000000000..97c7d563b --- /dev/null +++ b/sdk/src/consumer_offsets/delete_consumer_offset.rs @@ -0,0 +1,187 @@ +use crate::bytes_serializable::BytesSerializable; +use crate::command::{Command, DELETE_CONSUMER_OFFSET_CODE}; +use crate::consumer::{Consumer, ConsumerKind}; +use crate::error::IggyError; +use crate::identifier::Identifier; +use crate::utils::sizeable::Sizeable; +use crate::validatable::Validatable; +use bytes::{BufMut, Bytes, BytesMut}; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; + +/// `DeleteConsumerOffset` command deletes the offset of a consumer from a given partition on the server. +/// It has additional payload: +/// - `consumer` - the consumer that is deleting the offset, either the regular consumer or the consumer group. +/// - `stream_id` - unique stream ID (numeric or name). +/// - `topic_id` - unique topic ID (numeric or name). +/// - `partition_id` - partition ID on which the offset is stored. Has to be specified for the regular consumer. For consumer group it is ignored (use `None`). +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct DeleteConsumerOffset { + /// The consumer that is storing the offset, either the regular consumer or the consumer group. + #[serde(skip)] + pub consumer: Consumer, + /// Unique stream ID (numeric or name). + #[serde(skip)] + pub stream_id: Identifier, + /// Unique topic ID (numeric or name). + #[serde(skip)] + pub topic_id: Identifier, + /// Partition ID on which the offset is stored. Has to be specified for the regular consumer. For consumer group it is ignored (use `None`). + pub partition_id: Option, +} + +impl Default for DeleteConsumerOffset { + fn default() -> Self { + DeleteConsumerOffset { + consumer: Consumer::default(), + stream_id: Identifier::default(), + topic_id: Identifier::default(), + partition_id: Some(1), + } + } +} + +impl Command for DeleteConsumerOffset { + fn code(&self) -> u32 { + DELETE_CONSUMER_OFFSET_CODE + } +} + +impl Validatable for DeleteConsumerOffset { + fn validate(&self) -> Result<(), IggyError> { + Ok(()) + } +} + +impl BytesSerializable for DeleteConsumerOffset { + fn to_bytes(&self) -> Bytes { + let consumer_bytes = self.consumer.to_bytes(); + let stream_id_bytes = self.stream_id.to_bytes(); + let topic_id_bytes = self.topic_id.to_bytes(); + let mut bytes = BytesMut::with_capacity( + 12 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), + ); + bytes.put_slice(&consumer_bytes); + bytes.put_slice(&stream_id_bytes); + bytes.put_slice(&topic_id_bytes); + if let Some(partition_id) = self.partition_id { + bytes.put_u32_le(partition_id); + } else { + bytes.put_u32_le(0); + } + bytes.freeze() + } + + fn from_bytes(bytes: Bytes) -> Result { + if bytes.len() < 15 { + return Err(IggyError::InvalidCommand); + } + + let mut position = 0; + let consumer_kind = ConsumerKind::from_code(bytes[0])?; + let consumer_id = Identifier::from_bytes(bytes.slice(1..))?; + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); + let consumer = Consumer { + kind: consumer_kind, + id: consumer_id, + }; + let stream_id = Identifier::from_bytes(bytes.slice(position..))?; + position += stream_id.get_size_bytes().as_bytes_usize(); + let topic_id = Identifier::from_bytes(bytes.slice(position..))?; + position += topic_id.get_size_bytes().as_bytes_usize(); + let partition_id = u32::from_le_bytes( + bytes[position..position + 4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + let partition_id = if partition_id == 0 { + None + } else { + Some(partition_id) + }; + let command = DeleteConsumerOffset { + consumer, + stream_id, + topic_id, + partition_id, + }; + Ok(command) + } +} + +impl Display for DeleteConsumerOffset { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}|{}|{}|{}", + self.consumer, + self.stream_id, + self.topic_id, + self.partition_id.unwrap_or(0), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_be_serialized_as_bytes() { + let command = DeleteConsumerOffset { + consumer: Consumer::new(Identifier::numeric(1).unwrap()), + stream_id: Identifier::numeric(2).unwrap(), + topic_id: Identifier::numeric(3).unwrap(), + partition_id: Some(4), + }; + + let bytes = command.to_bytes(); + let mut position = 0; + let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap(); + let consumer_id = Identifier::from_bytes(bytes.slice(1..)).unwrap(); + position += 1 + consumer_id.get_size_bytes().as_bytes_usize(); + let consumer = Consumer { + kind: consumer_kind, + id: consumer_id, + }; + let stream_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); + position += stream_id.get_size_bytes().as_bytes_usize(); + let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); + position += topic_id.get_size_bytes().as_bytes_usize(); + let partition_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()); + + assert!(!bytes.is_empty()); + assert_eq!(consumer, command.consumer); + assert_eq!(stream_id, command.stream_id); + assert_eq!(topic_id, command.topic_id); + assert_eq!(Some(partition_id), command.partition_id); + } + + #[test] + fn should_be_deserialized_from_bytes() { + let consumer = Consumer::new(Identifier::numeric(1).unwrap()); + let stream_id = Identifier::numeric(2).unwrap(); + let topic_id = Identifier::numeric(3).unwrap(); + let partition_id = 4u32; + + let consumer_bytes = consumer.to_bytes(); + let stream_id_bytes = stream_id.to_bytes(); + let topic_id_bytes = topic_id.to_bytes(); + let mut bytes = BytesMut::with_capacity( + 12 + consumer_bytes.len() + stream_id_bytes.len() + topic_id_bytes.len(), + ); + bytes.put_slice(&consumer_bytes); + bytes.put_slice(&stream_id_bytes); + bytes.put_slice(&topic_id_bytes); + bytes.put_u32_le(partition_id); + + let command = DeleteConsumerOffset::from_bytes(bytes.freeze()); + assert!(command.is_ok()); + + let command = command.unwrap(); + assert_eq!(command.consumer, consumer); + assert_eq!(command.stream_id, stream_id); + assert_eq!(command.topic_id, topic_id); + assert_eq!(command.partition_id, Some(partition_id)); + } +} diff --git a/sdk/src/consumer_offsets/mod.rs b/sdk/src/consumer_offsets/mod.rs index d2ca7b2e2..4a37bc615 100644 --- a/sdk/src/consumer_offsets/mod.rs +++ b/sdk/src/consumer_offsets/mod.rs @@ -1,2 +1,3 @@ +pub mod delete_consumer_offset; pub mod get_consumer_offset; pub mod store_consumer_offset; diff --git a/sdk/src/error.rs b/sdk/src/error.rs index 0fc56abab..e4c01741f 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -266,6 +266,8 @@ pub enum IggyError { CannotCreateConsumerOffsetsDirectory(String) = 3012, #[error("Failed to read consumers offsets from path: {0}")] CannotReadConsumerOffsets(String) = 3020, + #[error("Consumer offset for consumer with ID: {0} was not found.")] + ConsumerOffsetNotFound(u32) = 3021, #[error("Segment not found")] SegmentNotFound = 4000, #[error("Segment with start offset: {0} and partition with ID: {1} is closed")] diff --git a/sdk/src/http/client.rs b/sdk/src/http/client.rs index c62354d64..69f02eb2c 100644 --- a/sdk/src/http/client.rs +++ b/sdk/src/http/client.rs @@ -8,7 +8,7 @@ use crate::models::identity_info::IdentityInfo; use crate::utils::duration::IggyDuration; use async_broadcast::{broadcast, Receiver, Sender}; use async_trait::async_trait; -use reqwest::{Response, Url}; +use reqwest::{Response, StatusCode, Url}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use serde::Serialize; @@ -268,12 +268,18 @@ impl HttpClient { } async fn handle_response(response: Response) -> Result { - match response.status().is_success() { + let status = response.status(); + match status.is_success() { true => Ok(response), - false => Err(IggyError::HttpResponseError( - response.status().as_u16(), - response.text().await.unwrap_or("error".to_string()), - )), + false => { + let reason = response.text().await.unwrap_or("error".to_string()); + match status { + StatusCode::UNAUTHORIZED => Err(IggyError::Unauthenticated), + StatusCode::FORBIDDEN => Err(IggyError::Unauthorized), + StatusCode::NOT_FOUND => Err(IggyError::ResourceNotFound(reason)), + _ => Err(IggyError::HttpResponseError(status.as_u16(), reason)), + } + } } } diff --git a/sdk/src/http/consumer_groups.rs b/sdk/src/http/consumer_groups.rs index 462522d1f..1cd97f97d 100644 --- a/sdk/src/http/consumer_groups.rs +++ b/sdk/src/http/consumer_groups.rs @@ -21,12 +21,16 @@ impl ConsumerGroupClient for HttpClient { get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), group_id )) - .await?; - if response.status() == 404 { - return Ok(None); + .await; + if let Err(error) = response { + if matches!(error, IggyError::ResourceNotFound(_)) { + return Ok(None); + } + + return Err(error); } - let consumer_group = response + let consumer_group = response? .json() .await .map_err(|_| IggyError::InvalidJsonResponse)?; diff --git a/sdk/src/http/consumer_offsets.rs b/sdk/src/http/consumer_offsets.rs index f4fa4afd7..5c522fa47 100644 --- a/sdk/src/http/consumer_offsets.rs +++ b/sdk/src/http/consumer_offsets.rs @@ -50,17 +50,40 @@ impl ConsumerOffsetClient for HttpClient { partition_id, }, ) - .await?; - if response.status() == 404 { - return Ok(None); + .await; + if let Err(error) = response { + if matches!(error, IggyError::ResourceNotFound(_)) { + return Ok(None); + } + + return Err(error); } - let offset = response + let offset = response? .json() .await .map_err(|_| IggyError::InvalidJsonResponse)?; Ok(Some(offset)) } + + async fn delete_consumer_offset( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ) -> Result<(), IggyError> { + let partition_id = partition_id + .map(|id| format!("?partition_id={id}")) + .unwrap_or_default(); + let path = format!( + "{}/{}{partition_id}", + get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), + consumer.id + ); + self.delete(&path).await?; + Ok(()) + } } fn get_path(stream_id: &str, topic_id: &str) -> String { diff --git a/sdk/src/http/streams.rs b/sdk/src/http/streams.rs index 38f413fd8..24ff0ea91 100644 --- a/sdk/src/http/streams.rs +++ b/sdk/src/http/streams.rs @@ -13,12 +13,16 @@ const PATH: &str = "/streams"; #[async_trait] impl StreamClient for HttpClient { async fn get_stream(&self, stream_id: &Identifier) -> Result, IggyError> { - let response = self.get(&get_details_path(&stream_id.as_cow_str())).await?; - if response.status() == 404 { - return Ok(None); + let response = self.get(&get_details_path(&stream_id.as_cow_str())).await; + if let Err(error) = response { + if matches!(error, IggyError::ResourceNotFound(_)) { + return Ok(None); + } + + return Err(error); } - let stream = response + let stream = response? .json() .await .map_err(|_| IggyError::InvalidJsonResponse)?; diff --git a/sdk/src/http/system.rs b/sdk/src/http/system.rs index 56763fa52..061b5c73c 100644 --- a/sdk/src/http/system.rs +++ b/sdk/src/http/system.rs @@ -31,12 +31,16 @@ impl SystemClient for HttpClient { } async fn get_client(&self, client_id: u32) -> Result, IggyError> { - let response = self.get(&format!("{}/{}", CLIENTS, client_id)).await?; - if response.status() == 404 { - return Ok(None); + let response = self.get(&format!("{}/{}", CLIENTS, client_id)).await; + if let Err(error) = response { + if matches!(error, IggyError::ResourceNotFound(_)) { + return Ok(None); + } + + return Err(error); } - let client = response + let client = response? .json() .await .map_err(|_| IggyError::InvalidJsonResponse)?; diff --git a/sdk/src/http/topics.rs b/sdk/src/http/topics.rs index 62247d87a..4f75c6775 100644 --- a/sdk/src/http/topics.rs +++ b/sdk/src/http/topics.rs @@ -23,12 +23,16 @@ impl TopicClient for HttpClient { &stream_id.as_cow_str(), &topic_id.as_cow_str(), )) - .await?; - if response.status() == 404 { - return Ok(None); + .await; + if let Err(error) = response { + if matches!(error, IggyError::ResourceNotFound(_)) { + return Ok(None); + } + + return Err(error); } - let topic = response + let topic = response? .json() .await .map_err(|_| IggyError::InvalidJsonResponse)?; diff --git a/sdk/src/http/users.rs b/sdk/src/http/users.rs index 3307919de..b7bf11a78 100644 --- a/sdk/src/http/users.rs +++ b/sdk/src/http/users.rs @@ -19,12 +19,16 @@ const PATH: &str = "/users"; #[async_trait] impl UserClient for HttpClient { async fn get_user(&self, user_id: &Identifier) -> Result, IggyError> { - let response = self.get(&format!("{PATH}/{}", user_id)).await?; - if response.status() == 404 { - return Ok(None); + let response = self.get(&format!("{PATH}/{}", user_id)).await; + if let Err(error) = response { + if matches!(error, IggyError::ResourceNotFound(_)) { + return Ok(None); + } + + return Err(error); } - let user = response + let user = response? .json() .await .map_err(|_| IggyError::InvalidJsonResponse)?; diff --git a/server/Cargo.toml b/server/Cargo.toml index dbb850252..437cc929c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.101" +version = "0.4.110" edition = "2021" build = "src/build.rs" license = "Apache-2.0" diff --git a/server/server.http b/server/server.http index 45dd45702..067bd254f 100644 --- a/server/server.http +++ b/server/server.http @@ -307,6 +307,10 @@ Content-Type: application/json GET {{url}}/streams/{{stream_id}}/topics/{{topic_id}}/consumer-offsets?consumer_id={{consumer_id}}&partition_id={{partition_id}} Authorization: Bearer {{access_token}} +### +DELETE {{url}}/streams/{{stream_id}}/topics/{{topic_id}}/consumer-offsets/{{consumer_id}}?partition_id={{partition_id}} +Authorization: Bearer {{access_token}} + ### GET {{url}}/streams/{{stream_id}}/topics/{{topic_id}}/consumer-groups Authorization: Bearer {{access_token}} diff --git a/server/src/binary/command.rs b/server/src/binary/command.rs index 6da26bf62..f4ffaa1ef 100644 --- a/server/src/binary/command.rs +++ b/server/src/binary/command.rs @@ -137,6 +137,9 @@ async fn try_handle( ServerCommand::StoreConsumerOffset(command) => { store_consumer_offset_handler::handle(command, sender, session, system).await } + ServerCommand::DeleteConsumerOffset(command) => { + delete_consumer_offset_handler::handle(command, sender, session, system).await + } ServerCommand::GetStream(command) => { get_stream_handler::handle(command, sender, session, system).await } diff --git a/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs new file mode 100644 index 000000000..ba36f9eaf --- /dev/null +++ b/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -0,0 +1,33 @@ +use crate::binary::handlers::consumer_offsets::COMPONENT; +use crate::binary::sender::Sender; +use crate::streaming::session::Session; +use crate::streaming::systems::system::SharedSystem; +use anyhow::Result; +use error_set::ErrContext; +use iggy::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset; +use iggy::error::IggyError; +use tracing::debug; + +pub async fn handle( + command: DeleteConsumerOffset, + sender: &mut dyn Sender, + session: &Session, + system: &SharedSystem, +) -> Result<(), IggyError> { + debug!("session: {session}, command: {command}"); + let system = system.read().await; + system + .delete_consumer_offset( + session, + command.consumer, + &command.stream_id, + &command.topic_id, + command.partition_id, + ) + .await + .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, session: {}", + command.stream_id, command.topic_id, command.partition_id, session + ))?; + sender.send_empty_ok_response().await?; + Ok(()) +} diff --git a/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs b/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs index 6e4ff375d..68d17b44f 100644 --- a/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs +++ b/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs @@ -29,7 +29,12 @@ pub async fn handle( return Ok(()); } - let offset = mapper::map_consumer_offset(&offset?); + let Some(offset) = offset? else { + sender.send_empty_ok_response().await?; + return Ok(()); + }; + + let offset = mapper::map_consumer_offset(&offset); sender.send_ok_response(&offset).await?; Ok(()) } diff --git a/server/src/binary/handlers/consumer_offsets/mod.rs b/server/src/binary/handlers/consumer_offsets/mod.rs index 1028f57fc..fbffb2c0b 100644 --- a/server/src/binary/handlers/consumer_offsets/mod.rs +++ b/server/src/binary/handlers/consumer_offsets/mod.rs @@ -1,3 +1,4 @@ +pub mod delete_consumer_offset_handler; pub mod get_consumer_offset_handler; pub mod store_consumer_offset_handler; diff --git a/server/src/command.rs b/server/src/command.rs index b34c20888..e419ba1d4 100644 --- a/server/src/command.rs +++ b/server/src/command.rs @@ -6,6 +6,7 @@ use iggy::consumer_groups::get_consumer_group::GetConsumerGroup; use iggy::consumer_groups::get_consumer_groups::GetConsumerGroups; use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup; use iggy::consumer_groups::leave_consumer_group::LeaveConsumerGroup; +use iggy::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset; use iggy::consumer_offsets::get_consumer_offset::GetConsumerOffset; use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset; use iggy::error::IggyError; @@ -76,6 +77,7 @@ pub enum ServerCommand { FlushUnsavedBuffer(FlushUnsavedBuffer), GetConsumerOffset(GetConsumerOffset), StoreConsumerOffset(StoreConsumerOffset), + DeleteConsumerOffset(DeleteConsumerOffset), GetStream(GetStream), GetStreams(GetStreams), CreateStream(CreateStream), @@ -123,6 +125,7 @@ impl BytesSerializable for ServerCommand { ServerCommand::SendMessages(payload) => as_bytes(payload), ServerCommand::PollMessages(payload) => as_bytes(payload), ServerCommand::StoreConsumerOffset(payload) => as_bytes(payload), + ServerCommand::DeleteConsumerOffset(payload) => as_bytes(payload), ServerCommand::GetConsumerOffset(payload) => as_bytes(payload), ServerCommand::GetStream(payload) => as_bytes(payload), ServerCommand::GetStreams(payload) => as_bytes(payload), @@ -201,6 +204,9 @@ impl BytesSerializable for ServerCommand { STORE_CONSUMER_OFFSET_CODE => Ok(ServerCommand::StoreConsumerOffset( StoreConsumerOffset::from_bytes(payload)?, )), + DELETE_CONSUMER_OFFSET_CODE => Ok(ServerCommand::DeleteConsumerOffset( + DeleteConsumerOffset::from_bytes(payload)?, + )), GET_CONSUMER_OFFSET_CODE => Ok(ServerCommand::GetConsumerOffset( GetConsumerOffset::from_bytes(payload)?, )), @@ -294,6 +300,7 @@ impl Validatable for ServerCommand { ServerCommand::SendMessages(command) => command.validate(), ServerCommand::PollMessages(command) => command.validate(), ServerCommand::StoreConsumerOffset(command) => command.validate(), + ServerCommand::DeleteConsumerOffset(command) => command.validate(), ServerCommand::GetConsumerOffset(command) => command.validate(), ServerCommand::GetStream(command) => command.validate(), ServerCommand::GetStreams(command) => command.validate(), @@ -377,6 +384,9 @@ impl Display for ServerCommand { ServerCommand::StoreConsumerOffset(payload) => { write!(formatter, "{STORE_CONSUMER_OFFSET}|{payload}") } + ServerCommand::DeleteConsumerOffset(payload) => { + write!(formatter, "{DELETE_CONSUMER_OFFSET}|{payload}") + } ServerCommand::GetConsumerOffset(payload) => { write!(formatter, "{GET_CONSUMER_OFFSET}|{payload}") } diff --git a/server/src/http/consumer_offsets.rs b/server/src/http/consumer_offsets.rs index 2f91e7007..72f9a9d5e 100644 --- a/server/src/http/consumer_offsets.rs +++ b/server/src/http/consumer_offsets.rs @@ -5,10 +5,11 @@ use crate::http::COMPONENT; use crate::streaming::session::Session; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; -use axum::routing::get; +use axum::routing::{delete, get}; use axum::{Extension, Json, Router}; use error_set::ErrContext; use iggy::consumer::Consumer; +use iggy::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset; use iggy::consumer_offsets::get_consumer_offset::GetConsumerOffset; use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset; use iggy::identifier::Identifier; @@ -22,6 +23,10 @@ pub fn router(state: Arc) -> Router { "/streams/{stream_id}/topics/{topic_id}/consumer-offsets", get(get_consumer_offset).put(store_consumer_offset), ) + .route( + "/streams/{stream_id}/topics/{topic_id}/consumer-offsets/{consumer_id}", + delete(delete_consumer_offset), + ) .with_state(state) } @@ -50,7 +55,11 @@ async fn get_consumer_offset( return Err(CustomError::ResourceNotFound); } - Ok(Json(offset?)) + let Some(offset) = offset? else { + return Err(CustomError::ResourceNotFound); + }; + + Ok(Json(offset)) } async fn store_consumer_offset( @@ -77,3 +86,24 @@ async fn store_consumer_offset( .with_error_context(|_| format!("{COMPONENT} - failed to store consumer offset, stream ID: {}, topic ID: {}, partition ID: {:?}", stream_id, topic_id, command.0.partition_id))?; Ok(StatusCode::NO_CONTENT) } + +async fn delete_consumer_offset( + State(state): State>, + Extension(identity): Extension, + Path((stream_id, topic_id, consumer_id)): Path<(String, String, String)>, + query: Query, +) -> Result { + let consumer = Consumer::new(consumer_id.try_into()?); + let system = state.system.read().await; + system + .delete_consumer_offset( + &Session::stateless(identity.user_id, identity.ip_address), + consumer, + &query.stream_id, + &query.topic_id, + query.partition_id, + ) + .await + .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer offset, stream ID: {}, topic ID: {}, partition ID: {:?}", stream_id, topic_id, query.partition_id))?; + Ok(StatusCode::NO_CONTENT) +} diff --git a/server/src/http/error.rs b/server/src/http/error.rs index 216280baa..2a6bdc4b4 100644 --- a/server/src/http/error.rs +++ b/server/src/http/error.rs @@ -36,6 +36,7 @@ impl IntoResponse for CustomError { IggyError::ConsumerGroupIdNotFound(_, _) => StatusCode::NOT_FOUND, IggyError::ConsumerGroupNameNotFound(_, _) => StatusCode::NOT_FOUND, IggyError::ConsumerGroupMemberNotFound(_, _, _) => StatusCode::NOT_FOUND, + IggyError::ConsumerOffsetNotFound(_) => StatusCode::NOT_FOUND, IggyError::ResourceNotFound(_) => StatusCode::NOT_FOUND, IggyError::Unauthenticated => StatusCode::UNAUTHORIZED, IggyError::AccessTokenMissing => StatusCode::UNAUTHORIZED, diff --git a/server/src/streaming/partitions/consumer_offsets.rs b/server/src/streaming/partitions/consumer_offsets.rs index eccc0b254..be8dbb5c4 100644 --- a/server/src/streaming/partitions/consumer_offsets.rs +++ b/server/src/streaming/partitions/consumer_offsets.rs @@ -8,7 +8,10 @@ use iggy::error::IggyError; use tracing::trace; impl Partition { - pub async fn get_consumer_offset(&self, consumer: PollingConsumer) -> Result { + pub async fn get_consumer_offset( + &self, + consumer: PollingConsumer, + ) -> Result, IggyError> { trace!( "Getting consumer offset for {}, partition: {}, current: {}...", consumer, @@ -20,18 +23,18 @@ impl Partition { PollingConsumer::Consumer(consumer_id, _) => { let consumer_offset = self.consumer_offsets.get(&consumer_id); if let Some(consumer_offset) = consumer_offset { - return Ok(consumer_offset.offset); + return Ok(Some(consumer_offset.offset)); } } PollingConsumer::ConsumerGroup(consumer_group_id, _) => { let consumer_offset = self.consumer_offsets.get(&consumer_group_id); if let Some(consumer_offset) = consumer_offset { - return Ok(consumer_offset.offset); + return Ok(Some(consumer_offset.offset)); } } } - Ok(0) + Ok(None) } pub async fn store_consumer_offset( @@ -109,11 +112,11 @@ impl Partition { pub async fn load_consumer_offsets(&mut self) -> Result<(), IggyError> { trace!( - "Loading consumer offsets for partition with ID: {} for topic with ID: {} and stream with ID: {}...", - self.partition_id, - self.topic_id, - self.stream_id - ); + "Loading consumer offsets for partition with ID: {} for topic with ID: {} and stream with ID: {}...", + self.partition_id, + self.topic_id, + self.stream_id + ); self.load_consumer_offsets_from_storage(ConsumerKind::Consumer) .await .with_error_context(|_| { @@ -164,4 +167,34 @@ impl Partition { self.stream_id ); } + + pub async fn delete_consumer_offset( + &mut self, + consumer: PollingConsumer, + ) -> Result<(), IggyError> { + let partition_id = self.partition_id; + trace!( + "Deleting consumer offset for consumer: {consumer}, partition ID: {partition_id}..." + ); + match consumer { + PollingConsumer::Consumer(consumer_id, _) => { + let (_, offset) = self + .consumer_offsets + .remove(&consumer_id) + .ok_or(IggyError::ConsumerOffsetNotFound(consumer_id))?; + self.storage.partition.delete_consumer_offset(&offset.path).await + .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer offset, consumer ID: {consumer_id}, partition ID: {partition_id}"))?; + } + PollingConsumer::ConsumerGroup(consumer_id, _) => { + let (_, offset) = self + .consumer_group_offsets + .remove(&consumer_id) + .ok_or(IggyError::ConsumerOffsetNotFound(consumer_id))?; + self.storage.partition.delete_consumer_offset(&offset.path).await + .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer group offset, consumer ID: {consumer_id}, partition ID: {partition_id}"))?; + } + }; + trace!("Deleted consumer offset for consumer: {consumer}, partition ID: {partition_id}."); + Ok(()) + } } diff --git a/server/src/streaming/systems/consumer_offsets.rs b/server/src/streaming/systems/consumer_offsets.rs index 76b38b5cf..38f84c847 100644 --- a/server/src/streaming/systems/consumer_offsets.rs +++ b/server/src/streaming/systems/consumer_offsets.rs @@ -37,7 +37,7 @@ impl System { stream_id: &Identifier, topic_id: &Identifier, partition_id: Option, - ) -> Result { + ) -> Result, IggyError> { self.ensure_authenticated(session)?; let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|_| format!("{COMPONENT} - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; self.permissioner.get_consumer_offset( @@ -57,4 +57,25 @@ impl System { .get_consumer_offset(consumer, partition_id, session.client_id) .await } + + pub async fn delete_consumer_offset( + &self, + session: &Session, + consumer: Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ) -> Result<(), IggyError> { + self.ensure_authenticated(session)?; + let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|_| format!("{COMPONENT} - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; + self.permissioner.delete_consumer_offset( + session.get_user_id(), + topic.stream_id, + topic.topic_id, + )?; + + topic + .delete_consumer_offset(consumer, partition_id, session.client_id) + .await + } } diff --git a/server/src/streaming/topics/consumer_offsets.rs b/server/src/streaming/topics/consumer_offsets.rs index 825bb64db..460b34de3 100644 --- a/server/src/streaming/topics/consumer_offsets.rs +++ b/server/src/streaming/topics/consumer_offsets.rs @@ -20,7 +20,7 @@ impl Topic { .await .with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))?; let partition = self.get_partition(partition_id).with_error_context(|_| { - format!("{COMPONENT} - failed to get partiton with id: {partition_id}") + format!("{COMPONENT} - failed to get partition with id: {partition_id}") })?; let partition = partition.read().await; partition @@ -47,7 +47,7 @@ impl Topic { consumer: &Consumer, partition_id: Option, client_id: u32, - ) -> Result { + ) -> Result, IggyError> { let (polling_consumer, partition_id) = self .resolve_consumer_with_partition_id(consumer, client_id, partition_id, false) .await @@ -62,10 +62,38 @@ impl Topic { .with_error_context(|_| { format!("{COMPONENT} - failed to get consumer offset, consumer: {polling_consumer}") })?; - Ok(ConsumerOffsetInfo { + let Some(offset) = offset else { + return Ok(None); + }; + + Ok(Some(ConsumerOffsetInfo { partition_id: partition.partition_id, current_offset: partition.current_offset, stored_offset: offset, - }) + })) + } + + pub async fn delete_consumer_offset( + &self, + consumer: Consumer, + partition_id: Option, + client_id: u32, + ) -> Result<(), IggyError> { + let (polling_consumer, partition_id) = self + .resolve_consumer_with_partition_id(&consumer, client_id, partition_id, false) + .await + .with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))?; + let partition = self.get_partition(partition_id).with_error_context(|_| { + format!("{COMPONENT} - failed to get partition with id: {partition_id}") + })?; + let mut partition = partition.write().await; + partition + .delete_consumer_offset(polling_consumer) + .await + .with_error_context(|_| { + format!( + "{COMPONENT} - failed to delete consumer offset, consumer: {polling_consumer}" + ) + }) } } diff --git a/server/src/streaming/topics/partitions.rs b/server/src/streaming/topics/partitions.rs index 610b90cd4..3820b4e39 100644 --- a/server/src/streaming/topics/partitions.rs +++ b/server/src/streaming/topics/partitions.rs @@ -63,7 +63,7 @@ impl Topic { let partition = partition.read().await; partition.persist().await.with_error_context(|_| { format!( - "{COMPONENT} - failed to persist partiton with id: {}", + "{COMPONENT} - failed to persist partition with id: {}", partition.partition_id ) })?; @@ -94,7 +94,7 @@ impl Topic { messages_count += partition_messages_count; partition.delete().await.with_error_context(|_| { format!( - "{COMPONENT} - failed to delete partiton with id: {}", + "{COMPONENT} - failed to delete partition with id: {}", partition.partition_id ) })?; diff --git a/server/src/streaming/topics/storage.rs b/server/src/streaming/topics/storage.rs index b6966f996..ae08e1127 100644 --- a/server/src/streaming/topics/storage.rs +++ b/server/src/streaming/topics/storage.rs @@ -153,7 +153,7 @@ impl TopicStorage for FileTopicStorage { ) .await; partition.persist().await.with_error_context(|_| { - format!("{COMPONENT} - failed to persist partiton: {partition}") + format!("{COMPONENT} - failed to persist partition: {partition}") })?; partition.segments.clear(); unloaded_partitions.push(partition); diff --git a/server/src/streaming/users/permissioner_rules/consumer_offsets.rs b/server/src/streaming/users/permissioner_rules/consumer_offsets.rs index d504796a2..c6fa00a19 100644 --- a/server/src/streaming/users/permissioner_rules/consumer_offsets.rs +++ b/server/src/streaming/users/permissioner_rules/consumer_offsets.rs @@ -19,4 +19,13 @@ impl Permissioner { ) -> Result<(), IggyError> { self.poll_messages(user_id, stream_id, topic_id) } + + pub fn delete_consumer_offset( + &self, + user_id: u32, + stream_id: u32, + topic_id: u32, + ) -> Result<(), IggyError> { + self.poll_messages(user_id, stream_id, topic_id) + } }