From c04acf6cb873de27651850feb2fff0d59e994123 Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Wed, 1 Jan 2025 16:35:39 +0100 Subject: [PATCH] Update Axum, refactor HTTP endpoints (#1413) --- Cargo.lock | 70 ++++++++++++++++++++--- server/Cargo.toml | 6 +- server/src/http/consumer_groups.rs | 4 +- server/src/http/consumer_offsets.rs | 2 +- server/src/http/messages.rs | 4 +- server/src/http/partitions.rs | 2 +- server/src/http/personal_access_tokens.rs | 2 +- server/src/http/streams.rs | 4 +- server/src/http/system.rs | 2 +- server/src/http/topics.rs | 6 +- server/src/http/users.rs | 6 +- 11 files changed, 80 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 952008122..dcb9ce378 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -404,16 +404,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", "http 1.2.0", "http-body 1.0.1", "http-body-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8" +dependencies = [ + "axum-core 0.5.0", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", "hyper 1.5.2", "hyper-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -449,6 +476,25 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733" +dependencies = [ + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", "tracing", ] @@ -2662,6 +2708,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "maybe-async" version = "0.2.10" @@ -3714,9 +3766,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.11" +version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe060fe50f524be480214aba758c71f99f90ee8c83c5a36b5e9e1d568eb4eb3" +checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", @@ -4269,13 +4321,13 @@ dependencies = [ [[package]] name = "server" -version = "0.4.95" +version = "0.4.96" dependencies = [ "ahash 0.8.11", "anyhow", "async-trait", "atone", - "axum", + "axum 0.8.1", "axum-server", "bcrypt", "bincode", @@ -4872,7 +4924,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.7.9", "base64 0.22.1", "bytes", "h2", @@ -5713,9 +5765,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.6.20" +version = "0.6.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +checksum = "e6f5bb5257f2407a5425c6e749bfd9692192a73e70a6060516ac04f889087d68" dependencies = [ "memchr", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index 3675f0ba7..27402873d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.95" +version = "0.4.96" edition = "2021" build = "src/build.rs" @@ -14,7 +14,7 @@ ahash = { version = "0.8.11" } anyhow = "1.0.95" async-trait = "0.1.83" atone = "0.3.7" -axum = "0.7.9" +axum = "0.8.1" axum-server = { version = "0.7.1", features = ["tls-rustls"] } bcrypt = "0.16.0" bincode = "1.3.3" @@ -56,7 +56,7 @@ opentelemetry_sdk = { version = "0.27.1", features = [ prometheus-client = "0.22.3" quinn = { version = "0.11.6" } rcgen = "0.13.2" -reqwest = { version = "0.12.11", features = [ +reqwest = { version = "0.12.12", features = [ "rustls-tls", "rustls-tls-no-provider", ] } diff --git a/server/src/http/consumer_groups.rs b/server/src/http/consumer_groups.rs index 47a60ca20..0df42918d 100644 --- a/server/src/http/consumer_groups.rs +++ b/server/src/http/consumer_groups.rs @@ -21,11 +21,11 @@ use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() .route( - "/streams/:stream_id/topics/:topic_id/consumer-groups", + "/streams/{stream_id}/topics/{topic_id}/consumer-groups", get(get_consumer_groups).post(create_consumer_group), ) .route( - "/streams/:stream_id/topics/:topic_id/consumer-groups/:group_id", + "/streams/{stream_id}/topics/{topic_id}/consumer-groups/{group_id}", get(get_consumer_group).delete(delete_consumer_group), ) .with_state(state) diff --git a/server/src/http/consumer_offsets.rs b/server/src/http/consumer_offsets.rs index 66373c2aa..2f91e7007 100644 --- a/server/src/http/consumer_offsets.rs +++ b/server/src/http/consumer_offsets.rs @@ -19,7 +19,7 @@ use std::sync::Arc; pub fn router(state: Arc) -> Router { Router::new() .route( - "/streams/:stream_id/topics/:topic_id/consumer-offsets", + "/streams/{stream_id}/topics/{topic_id}/consumer-offsets", get(get_consumer_offset).put(store_consumer_offset), ) .with_state(state) diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 88cbc031b..bc7cab38e 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -22,11 +22,11 @@ use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() .route( - "/streams/:stream_id/topics/:topic_id/messages", + "/streams/{stream_id}/topics/{topic_id}/messages", get(poll_messages).post(send_messages), ) .route( - "/streams/:stream_id/topics/:topic_id/messages/flush/:partition_id/:fsync", + "/streams/{stream_id}/topics/{topic_id}/messages/flush/{partition_id}/{fsync}", get(flush_unsaved_buffer), ) .with_state(state) diff --git a/server/src/http/partitions.rs b/server/src/http/partitions.rs index 77d02384d..6dedc8d39 100644 --- a/server/src/http/partitions.rs +++ b/server/src/http/partitions.rs @@ -19,7 +19,7 @@ use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() .route( - "/streams/:stream_id/topics/:topic_id/partitions", + "/streams/{stream_id}/topics/{topic_id}/partitions", post(create_partitions).delete(delete_partitions), ) .with_state(state) diff --git a/server/src/http/personal_access_tokens.rs b/server/src/http/personal_access_tokens.rs index dc579ccc6..836dffaed 100644 --- a/server/src/http/personal_access_tokens.rs +++ b/server/src/http/personal_access_tokens.rs @@ -29,7 +29,7 @@ pub fn router(state: Arc) -> Router { get(get_personal_access_tokens).post(create_personal_access_token), ) .route( - "/personal-access-tokens/:name", + "/personal-access-tokens/{name}", delete(delete_personal_access_token), ) .route( diff --git a/server/src/http/streams.rs b/server/src/http/streams.rs index 6a7818923..18c553536 100644 --- a/server/src/http/streams.rs +++ b/server/src/http/streams.rs @@ -25,10 +25,10 @@ pub fn router(state: Arc) -> Router { Router::new() .route("/streams", get(get_streams).post(create_stream)) .route( - "/streams/:stream_id", + "/streams/{stream_id}", get(get_stream).put(update_stream).delete(delete_stream), ) - .route("/streams/:stream_id/purge", delete(purge_stream)) + .route("/streams/{stream_id}/purge", delete(purge_stream)) .with_state(state) } diff --git a/server/src/http/system.rs b/server/src/http/system.rs index 07535c212..e32d85c62 100644 --- a/server/src/http/system.rs +++ b/server/src/http/system.rs @@ -30,7 +30,7 @@ pub fn router(state: Arc, metrics_config: &HttpMetricsConfig) -> Route .route("/ping", get(|| async { PONG })) .route("/stats", get(get_stats)) .route("/clients", get(get_clients)) - .route("/clients/:client_id", get(get_client)) + .route("/clients/{client_id}", get(get_client)) .route("/snapshot", post(get_snapshot)); if metrics_config.enabled { router = router.route(&metrics_config.endpoint, get(get_metrics)); diff --git a/server/src/http/topics.rs b/server/src/http/topics.rs index 8eee30fad..8d8783296 100644 --- a/server/src/http/topics.rs +++ b/server/src/http/topics.rs @@ -23,15 +23,15 @@ use tracing::instrument; pub fn router(state: Arc) -> Router { Router::new() .route( - "/streams/:stream_id/topics", + "/streams/{stream_id}/topics", get(get_topics).post(create_topic), ) .route( - "/streams/:stream_id/topics/:topic_id", + "/streams/{stream_id}/topics/{topic_id}", get(get_topic).put(update_topic).delete(delete_topic), ) .route( - "/streams/:stream_id/topics/:topic_id/purge", + "/streams/{stream_id}/topics/{topic_id}/purge", delete(purge_topic), ) .with_state(state) diff --git a/server/src/http/users.rs b/server/src/http/users.rs index adb2d5c0a..cbb6d6b10 100644 --- a/server/src/http/users.rs +++ b/server/src/http/users.rs @@ -30,11 +30,11 @@ pub fn router(state: Arc) -> Router { Router::new() .route("/users", get(get_users).post(create_user)) .route( - "/users/:user_id", + "/users/{user_id}", get(get_user).put(update_user).delete(delete_user), ) - .route("/users/:user_id/permissions", put(update_permissions)) - .route("/users/:user_id/password", put(change_password)) + .route("/users/{user_id}/permissions", put(update_permissions)) + .route("/users/{user_id}/password", put(change_password)) .route("/users/login", post(login_user)) .route("/users/logout", delete(logout_user)) .route("/users/refresh-token", post(refresh_token))