Skip to content

Commit

Permalink
Add purge topic messages feature (#437)
Browse files Browse the repository at this point in the history
Close #431
  • Loading branch information
spetz authored Jan 2, 2024
1 parent c1be028 commit b3ab65f
Show file tree
Hide file tree
Showing 34 changed files with 522 additions and 51 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ For the detailed information about current progress, please refer to the [projec
- [Node](https://github.com/iggy-rs/iggy-node-client)
- [Python](https://github.com/iggy-rs/iggy-python-client)
- [Java](https://github.com/iggy-rs/iggy-java-client)
- [C++](https://github.com/iggy-rs/iggy-cpp-client)

---

Expand Down
1 change: 1 addition & 0 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub async fn handle(input: &str, client: &IggyClient) -> Result<(), ClientError>
Command::CreateTopic(payload) => topics::create_topic(&payload, client).await,
Command::DeleteTopic(payload) => topics::delete_topic(&payload, client).await,
Command::UpdateTopic(payload) => topics::update_topic(&payload, client).await,
Command::PurgeTopic(payload) => topics::purge_topic(&payload, client).await,
Command::CreatePartitions(payload) => partitions::create_partitions(&payload, client).await,
Command::DeletePartitions(payload) => partitions::delete_partitions(&payload, client).await,
Command::GetConsumerGroup(payload) => {
Expand Down
6 changes: 6 additions & 0 deletions cli/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use iggy::topics::create_topic::CreateTopic;
use iggy::topics::delete_topic::DeleteTopic;
use iggy::topics::get_topic::GetTopic;
use iggy::topics::get_topics::GetTopics;
use iggy::topics::purge_topic::PurgeTopic;
use iggy::topics::update_topic::UpdateTopic;
use tracing::info;

Expand Down Expand Up @@ -38,3 +39,8 @@ pub async fn update_topic(command: &UpdateTopic, client: &dyn Client) -> Result<
client.update_topic(command).await?;
Ok(())
}

pub async fn purge_topic(command: &PurgeTopic, client: &dyn Client) -> Result<(), ClientError> {
client.purge_topic(command).await?;
Ok(())
}
2 changes: 1 addition & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.1.2"
version = "0.1.3"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
12 changes: 11 additions & 1 deletion iggy/src/binary/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper};
use crate::bytes_serializable::BytesSerializable;
use crate::command::{
CREATE_TOPIC_CODE, DELETE_TOPIC_CODE, GET_TOPICS_CODE, GET_TOPIC_CODE, UPDATE_TOPIC_CODE,
CREATE_TOPIC_CODE, DELETE_TOPIC_CODE, GET_TOPICS_CODE, GET_TOPIC_CODE, PURGE_TOPIC_CODE,
UPDATE_TOPIC_CODE,
};
use crate::error::Error;
use crate::models::topic::{Topic, TopicDetails};
use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use crate::topics::purge_topic::PurgeTopic;
use crate::topics::update_topic::UpdateTopic;

pub async fn get_topic(
Expand Down Expand Up @@ -56,3 +58,11 @@ pub async fn update_topic(client: &dyn BinaryClient, command: &UpdateTopic) -> R
.await?;
Ok(())
}

pub async fn purge_topic(client: &dyn BinaryClient, command: &PurgeTopic) -> Result<(), Error> {
fail_if_not_authenticated(client).await?;
client
.send_with_response(PURGE_TOPIC_CODE, &command.as_bytes())
.await?;
Ok(())
}
5 changes: 5 additions & 0 deletions iggy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use crate::topics::purge_topic::PurgeTopic;
use crate::topics::update_topic::UpdateTopic;
use crate::users::change_password::ChangePassword;
use crate::users::create_user::CreateUser;
Expand Down Expand Up @@ -211,6 +212,10 @@ pub trait TopicClient {
///
/// Authentication is required, and the permission to manage the topics.
async fn delete_topic(&self, command: &DeleteTopic) -> Result<(), Error>;
/// Purge a topic by unique ID or name.
///
/// Authentication is required, and the permission to manage the topics.
async fn purge_topic(&self, command: &PurgeTopic) -> Result<(), Error>;
}

/// This trait defines the methods to interact with the partition module.
Expand Down
5 changes: 5 additions & 0 deletions iggy/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use crate::topics::purge_topic::PurgeTopic;
use crate::topics::update_topic::UpdateTopic;
use crate::users::change_password::ChangePassword;
use crate::users::create_user::CreateUser;
Expand Down Expand Up @@ -654,6 +655,10 @@ impl TopicClient for IggyClient {
async fn delete_topic(&self, command: &DeleteTopic) -> Result<(), Error> {
self.client.read().await.delete_topic(command).await
}

async fn purge_topic(&self, command: &PurgeTopic) -> Result<(), Error> {
self.client.read().await.purge_topic(command).await
}
}

#[async_trait]
Expand Down
18 changes: 18 additions & 0 deletions iggy/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use crate::topics::purge_topic::PurgeTopic;
use crate::topics::update_topic::UpdateTopic;
use crate::users::change_password::ChangePassword;
use crate::users::create_user::CreateUser;
Expand Down Expand Up @@ -108,6 +109,8 @@ pub const DELETE_TOPIC: &str = "topic.delete";
pub const DELETE_TOPIC_CODE: u32 = 303;
pub const UPDATE_TOPIC: &str = "topic.update";
pub const UPDATE_TOPIC_CODE: u32 = 304;
pub const PURGE_TOPIC: &str = "topic.purge";
pub const PURGE_TOPIC_CODE: u32 = 305;
pub const CREATE_PARTITIONS: &str = "partition.create";
pub const CREATE_PARTITIONS_CODE: u32 = 402;
pub const DELETE_PARTITIONS: &str = "partition.delete";
Expand Down Expand Up @@ -159,6 +162,7 @@ pub enum Command {
CreateTopic(CreateTopic),
DeleteTopic(DeleteTopic),
UpdateTopic(UpdateTopic),
PurgeTopic(PurgeTopic),
CreatePartitions(CreatePartitions),
DeletePartitions(DeletePartitions),
GetConsumerGroup(GetConsumerGroup),
Expand Down Expand Up @@ -221,6 +225,7 @@ impl BytesSerializable for Command {
Command::CreateTopic(payload) => as_bytes(CREATE_TOPIC_CODE, &payload.as_bytes()),
Command::DeleteTopic(payload) => as_bytes(DELETE_TOPIC_CODE, &payload.as_bytes()),
Command::UpdateTopic(payload) => as_bytes(UPDATE_TOPIC_CODE, &payload.as_bytes()),
Command::PurgeTopic(payload) => as_bytes(PURGE_TOPIC_CODE, &payload.as_bytes()),
Command::CreatePartitions(payload) => {
as_bytes(CREATE_PARTITIONS_CODE, &payload.as_bytes())
}
Expand Down Expand Up @@ -300,6 +305,7 @@ impl BytesSerializable for Command {
CREATE_TOPIC_CODE => Ok(Command::CreateTopic(CreateTopic::from_bytes(payload)?)),
DELETE_TOPIC_CODE => Ok(Command::DeleteTopic(DeleteTopic::from_bytes(payload)?)),
UPDATE_TOPIC_CODE => Ok(Command::UpdateTopic(UpdateTopic::from_bytes(payload)?)),
PURGE_TOPIC_CODE => Ok(Command::PurgeTopic(PurgeTopic::from_bytes(payload)?)),
CREATE_PARTITIONS_CODE => Ok(Command::CreatePartitions(CreatePartitions::from_bytes(
payload,
)?)),
Expand Down Expand Up @@ -387,6 +393,7 @@ impl FromStr for Command {
CREATE_TOPIC => Ok(Command::CreateTopic(CreateTopic::from_str(payload)?)),
DELETE_TOPIC => Ok(Command::DeleteTopic(DeleteTopic::from_str(payload)?)),
UPDATE_TOPIC => Ok(Command::UpdateTopic(UpdateTopic::from_str(payload)?)),
PURGE_TOPIC => Ok(Command::PurgeTopic(PurgeTopic::from_str(payload)?)),
CREATE_PARTITIONS => Ok(Command::CreatePartitions(CreatePartitions::from_str(
payload,
)?)),
Expand Down Expand Up @@ -459,6 +466,7 @@ impl Display for Command {
Command::CreateTopic(payload) => write!(formatter, "{CREATE_TOPIC}|{payload}"),
Command::DeleteTopic(payload) => write!(formatter, "{DELETE_TOPIC}|{payload}"),
Command::UpdateTopic(payload) => write!(formatter, "{UPDATE_TOPIC}|{payload}"),
Command::PurgeTopic(payload) => write!(formatter, "{PURGE_TOPIC}|{payload}"),
Command::CreatePartitions(payload) => {
write!(formatter, "{CREATE_PARTITIONS}|{payload}")
}
Expand Down Expand Up @@ -661,6 +669,11 @@ mod tests {
UPDATE_TOPIC_CODE,
&UpdateTopic::default(),
);
assert_serialized_as_bytes_and_deserialized_from_bytes(
&Command::PurgeTopic(PurgeTopic::default()),
PURGE_TOPIC_CODE,
&PurgeTopic::default(),
);
assert_serialized_as_bytes_and_deserialized_from_bytes(
&Command::CreatePartitions(CreatePartitions::default()),
CREATE_PARTITIONS_CODE,
Expand Down Expand Up @@ -857,6 +870,11 @@ mod tests {
UPDATE_TOPIC,
&UpdateTopic::default(),
);
assert_read_from_string(
&Command::PurgeTopic(PurgeTopic::default()),
PURGE_TOPIC,
&PurgeTopic::default(),
);
assert_read_from_string(
&Command::CreatePartitions(CreatePartitions::default()),
CREATE_PARTITIONS,
Expand Down
13 changes: 13 additions & 0 deletions iggy/src/http/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use crate::topics::purge_topic::PurgeTopic;
use crate::topics::update_topic::UpdateTopic;
use async_trait::async_trait;

Expand Down Expand Up @@ -55,6 +56,18 @@ impl TopicClient for HttpClient {
.await?;
Ok(())
}

async fn purge_topic(&self, command: &PurgeTopic) -> Result<(), Error> {
self.delete(&format!(
"{}/purge",
&get_details_path(
&command.stream_id.as_string(),
&command.topic_id.as_string(),
)
))
.await?;
Ok(())
}
}

fn get_path(stream_id: &str) -> String {
Expand Down
5 changes: 5 additions & 0 deletions iggy/src/quic/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use crate::topics::purge_topic::PurgeTopic;
use crate::topics::update_topic::UpdateTopic;
use async_trait::async_trait;

Expand All @@ -31,4 +32,8 @@ impl TopicClient for QuicClient {
async fn delete_topic(&self, command: &DeleteTopic) -> Result<(), Error> {
binary::topics::delete_topic(self, command).await
}

async fn purge_topic(&self, _command: &PurgeTopic) -> Result<(), Error> {
binary::topics::purge_topic(self, _command).await
}
}
5 changes: 5 additions & 0 deletions iggy/src/tcp/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use crate::topics::purge_topic::PurgeTopic;
use crate::topics::update_topic::UpdateTopic;
use async_trait::async_trait;

Expand All @@ -31,4 +32,8 @@ impl TopicClient for TcpClient {
async fn delete_topic(&self, command: &DeleteTopic) -> Result<(), Error> {
binary::topics::delete_topic(self, command).await
}

async fn purge_topic(&self, _command: &PurgeTopic) -> Result<(), Error> {
binary::topics::purge_topic(self, _command).await
}
}
1 change: 1 addition & 0 deletions iggy/src/topics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod create_topic;
pub mod delete_topic;
pub mod get_topic;
pub mod get_topics;
pub mod purge_topic;
pub mod update_topic;

const MAX_NAME_LENGTH: usize = 255;
Expand Down
Loading

0 comments on commit b3ab65f

Please sign in to comment.