Skip to content

Commit

Permalink
Allow replaying messages for high-level consumer, add delete consumer…
Browse files Browse the repository at this point in the history
… offset (#1436)
  • Loading branch information
spetz authored Jan 14, 2025
1 parent 71928b5 commit 7155d50
Show file tree
Hide file tree
Showing 33 changed files with 611 additions and 82 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 57 additions & 25 deletions integration/tests/server/scenarios/system_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap();
assert!(polled_messages.messages.is_empty());

// 22. Get the existing customer offset and ensure it's 0
// 22. Get the customer offset and ensure it's none
let offset = client
.get_consumer_offset(
&consumer,
Expand All @@ -325,11 +325,8 @@ pub async fn run(client_factory: &dyn ClientFactory) {
Some(PARTITION_ID),
)
.await
.unwrap()
.expect("Failed to get consumer offset");
assert_eq!(offset.partition_id, PARTITION_ID);
assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64);
assert_eq!(offset.stored_offset, 0);
assert!(offset.is_none());

// 23. Store the consumer offset
let stored_offset = 10;
Expand Down Expand Up @@ -359,7 +356,42 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64);
assert_eq!(offset.stored_offset, stored_offset);

// 25. Poll messages from the specific partition in topic using next with auto commit
// 25. Delete the consumer offset
client
.delete_consumer_offset(
&consumer,
&Identifier::numeric(STREAM_ID).unwrap(),
&Identifier::numeric(TOPIC_ID).unwrap(),
Some(PARTITION_ID),
)
.await
.unwrap();

// 26. Get the customer offset and ensure it's none
let offset = client
.get_consumer_offset(
&consumer,
&Identifier::numeric(STREAM_ID).unwrap(),
&Identifier::numeric(TOPIC_ID).unwrap(),
Some(PARTITION_ID),
)
.await
.expect("Failed to get consumer offset");

assert!(offset.is_none());

client
.store_consumer_offset(
&consumer,
&Identifier::numeric(STREAM_ID).unwrap(),
&Identifier::numeric(TOPIC_ID).unwrap(),
Some(PARTITION_ID),
stored_offset,
)
.await
.unwrap();

// 27. Poll messages from the specific partition in topic using next with auto commit
let messages_count = 10;
let polled_messages = client
.poll_messages(
Expand All @@ -380,7 +412,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(first_offset, stored_offset + 1);
assert_eq!(last_offset, expected_last_offset);

// 26. Get the existing customer offset and ensure that auto commit during poll has worked
// 28. Get the existing customer offset and ensure that auto commit during poll has worked
let offset = client
.get_consumer_offset(
&consumer,
Expand All @@ -395,7 +427,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64);
assert_eq!(offset.stored_offset, expected_last_offset);

// 27. Get the consumer groups and validate that there are no groups
// 29. Get the consumer groups and validate that there are no groups
let consumer_groups = client
.get_consumer_groups(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -406,7 +438,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert!(consumer_groups.is_empty());

// 28. Create the consumer group
// 30. Create the consumer group
let consumer_group = client
.create_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -420,7 +452,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(consumer_group.id, CONSUMER_GROUP_ID);
assert_eq!(consumer_group.name, CONSUMER_GROUP_NAME);

// 29. Get the consumer groups and validate that there is one group
// 31. Get the consumer groups and validate that there is one group
let consumer_groups = client
.get_consumer_groups(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -435,7 +467,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT);
assert_eq!(consumer_group.members_count, 0);

// 30. Get the consumer group details
// 32. Get the consumer group details
let consumer_group = client
.get_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -451,7 +483,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(consumer_group.members_count, 0);
assert!(consumer_group.members.is_empty());

// 31. Join the consumer group and then leave it if the feature is available
// 33. Join the consumer group and then leave it if the feature is available
let result = client
.join_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand Down Expand Up @@ -493,7 +525,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
Err(e) => assert_eq!(e.as_code(), IggyError::FeatureUnavailable.as_code()),
}

// 32. Get the stats and validate that there is one stream
// 34. Get the stats and validate that there is one stream
let stats = client.get_stats().await.unwrap();
assert!(!stats.hostname.is_empty());
assert!(!stats.os_name.is_empty());
Expand All @@ -505,7 +537,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(stats.segments_count, PARTITIONS_COUNT);
assert_eq!(stats.messages_count, MESSAGES_COUNT as u64);

// 33. Delete the consumer group
// 35. Delete the consumer group
client
.delete_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -515,7 +547,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.await
.unwrap();

// 34. Create new partitions and validate that the number of partitions is increased
// 36. Create new partitions and validate that the number of partitions is increased
client
.create_partitions(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -536,7 +568,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert_eq!(topic.partitions_count, 2 * PARTITIONS_COUNT);

// 35. Delete the partitions and validate that the number of partitions is decreased
// 37. Delete the partitions and validate that the number of partitions is decreased
client
.delete_partitions(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -557,7 +589,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert_eq!(topic.partitions_count, PARTITIONS_COUNT);

// 36. Update the existing topic and ensure it's updated
// 38. Update the existing topic and ensure it's updated
let updated_topic_name = format!("{}-updated", TOPIC_NAME);
let updated_message_expiry = 1000;
let message_expiry_duration = updated_message_expiry.into();
Expand Down Expand Up @@ -598,7 +630,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(updated_topic.max_topic_size, updated_max_topic_size);
assert_eq!(updated_topic.replication_factor, updated_replication_factor);

// 37. Purge the existing topic and ensure it has no messages
// 39. Purge the existing topic and ensure it has no messages
client
.purge_topic(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -622,7 +654,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(polled_messages.current_offset, 0);
assert!(polled_messages.messages.is_empty());

// 38. Update the existing stream and ensure it's updated
// 40. Update the existing stream and ensure it's updated
let updated_stream_name = format!("{}-updated", STREAM_NAME);

client
Expand All @@ -641,7 +673,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert_eq!(updated_stream.name, updated_stream_name);

// 39. Purge the existing stream and ensure it has no messages
// 41. Purge the existing stream and ensure it has no messages
let mut messages = create_messages();
client
.send_messages(
Expand Down Expand Up @@ -673,7 +705,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(polled_messages.current_offset, 0);
assert!(polled_messages.messages.is_empty());

// 40. Delete the existing topic and ensure it doesn't exist anymore
// 42. Delete the existing topic and ensure it doesn't exist anymore
client
.delete_topic(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -687,7 +719,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap();
assert!(topics.is_empty());

// 41. Create the stream with automatically generated ID on the server
// 43. Create the stream with automatically generated ID on the server
let stream_name = format!("{}-auto", STREAM_NAME);
let stream_id = STREAM_ID + 1;
client.create_stream(&stream_name, None).await.unwrap();
Expand All @@ -701,7 +733,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(stream.id, stream_id);
assert_eq!(stream.name, stream_name);

// 42. Create the topic with automatically generated ID on the server
// 44. Create the topic with automatically generated ID on the server
let topic_name = format!("{}-auto", TOPIC_NAME);
let topic_id = 1;
client
Expand Down Expand Up @@ -730,7 +762,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.id, topic_id);
assert_eq!(topic.name, topic_name);

// 43. Delete the existing streams and ensure there's no streams left
// 45. Delete the existing streams and ensure there's no streams left
let streams = client.get_streams().await.unwrap();
assert_eq!(streams.len(), 2);

Expand All @@ -744,7 +776,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let streams = client.get_streams().await.unwrap();
assert!(streams.is_empty());

// 44. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client
// 46. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client
let clients = client.get_clients().await.unwrap();

assert!(clients.len() <= 1);
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.63"
version = "0.6.70"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down
19 changes: 19 additions & 0 deletions sdk/src/binary/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper};
use crate::client::ConsumerOffsetClient;
use crate::consumer::Consumer;
use crate::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset;
use crate::consumer_offsets::get_consumer_offset::GetConsumerOffset;
use crate::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
use crate::error::IggyError;
Expand Down Expand Up @@ -52,4 +53,22 @@ impl<B: BinaryClient> ConsumerOffsetClient for B {

mapper::map_consumer_offset(response).map(Some)
}

async fn delete_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(&DeleteConsumerOffset {
consumer: consumer.clone(),
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partition_id,
})
.await?;
Ok(())
}
}
10 changes: 10 additions & 0 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,16 @@ pub trait ConsumerOffsetClient {
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<Option<ConsumerOffsetInfo>, IggyError>;
/// Delete the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to poll the messages.
async fn delete_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(), IggyError>;
}

/// This trait defines the methods to interact with the consumer group module.
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,20 @@ impl ConsumerOffsetClient for IggyClient {
.get_consumer_offset(consumer, stream_id, topic_id, partition_id)
.await
}

async fn delete_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(), IggyError> {
self.client
.read()
.await
.delete_consumer_offset(consumer, stream_id, topic_id, partition_id)
.await
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit 7155d50

Please sign in to comment.