Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow replaying messages for high-level consumer, add delete consumer offset #1436

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 57 additions & 25 deletions integration/tests/server/scenarios/system_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap();
assert!(polled_messages.messages.is_empty());

// 22. Get the existing customer offset and ensure it's 0
// 22. Get the customer offset and ensure it's none
let offset = client
.get_consumer_offset(
&consumer,
Expand All @@ -325,11 +325,8 @@ pub async fn run(client_factory: &dyn ClientFactory) {
Some(PARTITION_ID),
)
.await
.unwrap()
.expect("Failed to get consumer offset");
assert_eq!(offset.partition_id, PARTITION_ID);
assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64);
assert_eq!(offset.stored_offset, 0);
assert!(offset.is_none());

// 23. Store the consumer offset
let stored_offset = 10;
Expand Down Expand Up @@ -359,7 +356,42 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64);
assert_eq!(offset.stored_offset, stored_offset);

// 25. Poll messages from the specific partition in topic using next with auto commit
// 25. Delete the consumer offset
client
.delete_consumer_offset(
&consumer,
&Identifier::numeric(STREAM_ID).unwrap(),
&Identifier::numeric(TOPIC_ID).unwrap(),
Some(PARTITION_ID),
)
.await
.unwrap();

// 26. Get the customer offset and ensure it's none
let offset = client
.get_consumer_offset(
&consumer,
&Identifier::numeric(STREAM_ID).unwrap(),
&Identifier::numeric(TOPIC_ID).unwrap(),
Some(PARTITION_ID),
)
.await
.expect("Failed to get consumer offset");

assert!(offset.is_none());

client
.store_consumer_offset(
&consumer,
&Identifier::numeric(STREAM_ID).unwrap(),
&Identifier::numeric(TOPIC_ID).unwrap(),
Some(PARTITION_ID),
stored_offset,
)
.await
.unwrap();

// 27. Poll messages from the specific partition in topic using next with auto commit
let messages_count = 10;
let polled_messages = client
.poll_messages(
Expand All @@ -380,7 +412,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(first_offset, stored_offset + 1);
assert_eq!(last_offset, expected_last_offset);

// 26. Get the existing customer offset and ensure that auto commit during poll has worked
// 28. Get the existing customer offset and ensure that auto commit during poll has worked
let offset = client
.get_consumer_offset(
&consumer,
Expand All @@ -395,7 +427,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(offset.current_offset, (MESSAGES_COUNT - 1) as u64);
assert_eq!(offset.stored_offset, expected_last_offset);

// 27. Get the consumer groups and validate that there are no groups
// 29. Get the consumer groups and validate that there are no groups
let consumer_groups = client
.get_consumer_groups(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -406,7 +438,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert!(consumer_groups.is_empty());

// 28. Create the consumer group
// 30. Create the consumer group
let consumer_group = client
.create_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -420,7 +452,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(consumer_group.id, CONSUMER_GROUP_ID);
assert_eq!(consumer_group.name, CONSUMER_GROUP_NAME);

// 29. Get the consumer groups and validate that there is one group
// 31. Get the consumer groups and validate that there is one group
let consumer_groups = client
.get_consumer_groups(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -435,7 +467,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT);
assert_eq!(consumer_group.members_count, 0);

// 30. Get the consumer group details
// 32. Get the consumer group details
let consumer_group = client
.get_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -451,7 +483,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(consumer_group.members_count, 0);
assert!(consumer_group.members.is_empty());

// 31. Join the consumer group and then leave it if the feature is available
// 33. Join the consumer group and then leave it if the feature is available
let result = client
.join_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand Down Expand Up @@ -493,7 +525,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
Err(e) => assert_eq!(e.as_code(), IggyError::FeatureUnavailable.as_code()),
}

// 32. Get the stats and validate that there is one stream
// 34. Get the stats and validate that there is one stream
let stats = client.get_stats().await.unwrap();
assert!(!stats.hostname.is_empty());
assert!(!stats.os_name.is_empty());
Expand All @@ -505,7 +537,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(stats.segments_count, PARTITIONS_COUNT);
assert_eq!(stats.messages_count, MESSAGES_COUNT as u64);

// 33. Delete the consumer group
// 35. Delete the consumer group
client
.delete_consumer_group(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -515,7 +547,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.await
.unwrap();

// 34. Create new partitions and validate that the number of partitions is increased
// 36. Create new partitions and validate that the number of partitions is increased
client
.create_partitions(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -536,7 +568,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert_eq!(topic.partitions_count, 2 * PARTITIONS_COUNT);

// 35. Delete the partitions and validate that the number of partitions is decreased
// 37. Delete the partitions and validate that the number of partitions is decreased
client
.delete_partitions(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -557,7 +589,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert_eq!(topic.partitions_count, PARTITIONS_COUNT);

// 36. Update the existing topic and ensure it's updated
// 38. Update the existing topic and ensure it's updated
let updated_topic_name = format!("{}-updated", TOPIC_NAME);
let updated_message_expiry = 1000;
let message_expiry_duration = updated_message_expiry.into();
Expand Down Expand Up @@ -598,7 +630,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(updated_topic.max_topic_size, updated_max_topic_size);
assert_eq!(updated_topic.replication_factor, updated_replication_factor);

// 37. Purge the existing topic and ensure it has no messages
// 39. Purge the existing topic and ensure it has no messages
client
.purge_topic(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -622,7 +654,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(polled_messages.current_offset, 0);
assert!(polled_messages.messages.is_empty());

// 38. Update the existing stream and ensure it's updated
// 40. Update the existing stream and ensure it's updated
let updated_stream_name = format!("{}-updated", STREAM_NAME);

client
Expand All @@ -641,7 +673,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {

assert_eq!(updated_stream.name, updated_stream_name);

// 39. Purge the existing stream and ensure it has no messages
// 41. Purge the existing stream and ensure it has no messages
let mut messages = create_messages();
client
.send_messages(
Expand Down Expand Up @@ -673,7 +705,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(polled_messages.current_offset, 0);
assert!(polled_messages.messages.is_empty());

// 40. Delete the existing topic and ensure it doesn't exist anymore
// 42. Delete the existing topic and ensure it doesn't exist anymore
client
.delete_topic(
&Identifier::numeric(STREAM_ID).unwrap(),
Expand All @@ -687,7 +719,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap();
assert!(topics.is_empty());

// 41. Create the stream with automatically generated ID on the server
// 43. Create the stream with automatically generated ID on the server
let stream_name = format!("{}-auto", STREAM_NAME);
let stream_id = STREAM_ID + 1;
client.create_stream(&stream_name, None).await.unwrap();
Expand All @@ -701,7 +733,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(stream.id, stream_id);
assert_eq!(stream.name, stream_name);

// 42. Create the topic with automatically generated ID on the server
// 44. Create the topic with automatically generated ID on the server
let topic_name = format!("{}-auto", TOPIC_NAME);
let topic_id = 1;
client
Expand Down Expand Up @@ -730,7 +762,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.id, topic_id);
assert_eq!(topic.name, topic_name);

// 43. Delete the existing streams and ensure there's no streams left
// 45. Delete the existing streams and ensure there's no streams left
let streams = client.get_streams().await.unwrap();
assert_eq!(streams.len(), 2);

Expand All @@ -744,7 +776,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let streams = client.get_streams().await.unwrap();
assert!(streams.is_empty());

// 44. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client
// 46. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client
let clients = client.get_clients().await.unwrap();

assert!(clients.len() <= 1);
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.63"
version = "0.6.70"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down
19 changes: 19 additions & 0 deletions sdk/src/binary/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper};
use crate::client::ConsumerOffsetClient;
use crate::consumer::Consumer;
use crate::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset;
use crate::consumer_offsets::get_consumer_offset::GetConsumerOffset;
use crate::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
use crate::error::IggyError;
Expand Down Expand Up @@ -52,4 +53,22 @@ impl<B: BinaryClient> ConsumerOffsetClient for B {

mapper::map_consumer_offset(response).map(Some)
}

async fn delete_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(&DeleteConsumerOffset {
consumer: consumer.clone(),
stream_id: stream_id.clone(),
topic_id: topic_id.clone(),
partition_id,
})
.await?;
Ok(())
}
}
10 changes: 10 additions & 0 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,16 @@ pub trait ConsumerOffsetClient {
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<Option<ConsumerOffsetInfo>, IggyError>;
/// Delete the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
///
/// Authentication is required, and the permission to poll the messages.
async fn delete_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(), IggyError>;
}

/// This trait defines the methods to interact with the consumer group module.
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,20 @@ impl ConsumerOffsetClient for IggyClient {
.get_consumer_offset(consumer, stream_id, topic_id, partition_id)
.await
}

async fn delete_consumer_offset(
&self,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
) -> Result<(), IggyError> {
self.client
.read()
.await
.delete_consumer_offset(consumer, stream_id, topic_id, partition_id)
.await
}
}

#[async_trait]
Expand Down
Loading
Loading