From eb1d90574802c32c7e3c52cffb8d679c747136e4 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 22:08:40 -0500 Subject: [PATCH 01/12] Update deps, hyper 1.0, axum 0.7 --- tonic/Cargo.toml | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 013cc6e72..747635056 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -28,7 +28,14 @@ gzip = ["dep:flate2"] zstd = ["dep:zstd"] default = ["transport", "codegen", "prost"] prost = ["dep:prost"] -tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:rustls", "tokio/rt", "tokio/macros"] +tls = [ + "dep:rustls-pemfile", + "transport", + "dep:tokio-rustls", + "dep:rustls", + "tokio/rt", + "tokio/macros", +] tls-roots = ["tls-roots-common", "dep:rustls-native-certs"] tls-roots-common = ["tls"] tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"] @@ -52,29 +59,42 @@ channel = [] [dependencies] base64 = "0.21" bytes = "1.0" -http = "0.2" +http = "1.0" tracing = "0.1" tokio = "1.0.1" -http-body = "0.4.4" +http-body = "1.0" +http-body-util = "0.1" percent-encoding = "2.1" pin-project = "1.0.11" tower-layer = "0.3" tower-service = "0.3" # prost -prost = {version = "0.12", default-features = false, features = ["std"], optional = true} +prost = { version = "0.12", default-features = false, features = [ + "std", +], optional = true } # codegen -async-trait = {version = "0.1.13", optional = true} +async-trait = { version = "0.1.13", optional = true } # transport -h2 = {version = "0.3.17", optional = true} -hyper = {version = "0.14.26", features = ["full"], optional = true} -hyper-timeout = {version = "0.4", optional = true} +h2 = { version = "0.4", optional = true } +hyper = { version = "1.0", features = ["full"], optional = true } +hyper-util = { version = "0.1", features = ["full"] } +hyper-timeout = { version = "0.5", optional = true } tokio-stream = "0.1" -tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true} -axum = {version = "0.6.9", default_features = false, optional = true} +tower = { version = "0.4.7", default-features = false, features = [ + "balance", + "buffer", + "discover", + "limit", + "load", + "make", + "timeout", + "util", +], optional = true } +axum = { version = "0.7", default_features = false, optional = true } # rustls async-stream = { version = "0.3", optional = true } @@ -85,7 +105,7 @@ rustls = { version = "0.21.7", optional = true } webpki-roots = { version = "0.25.0", optional = true } # compression -flate2 = {version = "1.0", optional = true} +flate2 = { version = "1.0", optional = true } zstd = { version = "0.12.3", optional = true } [dev-dependencies] @@ -94,8 +114,8 @@ quickcheck = "1.0" quickcheck_macros = "1.0" rand = "0.8" static_assertions = "1.0" -tokio = {version = "1.0", features = ["rt", "macros"]} -tower = {version = "0.4.7", features = ["full"]} +tokio = { version = "1.0", features = ["rt", "macros"] } +tower = { version = "0.4.7", features = ["full"] } [package.metadata.docs.rs] all-features = true From 2850a09f359257ec026713aff1a6a8b5c5a93034 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 22:17:34 -0500 Subject: [PATCH 02/12] Update imports to use util crates Some low-hanging import fixes --- tonic/src/body.rs | 6 ++---- tonic/src/transport/channel/endpoint.rs | 4 ++-- tonic/src/transport/channel/mod.rs | 3 ++- tonic/src/transport/server/mod.rs | 2 +- tonic/src/transport/service/connection.rs | 3 ++- tonic/src/transport/service/discover.rs | 2 +- tonic/src/transport/service/io.rs | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tonic/src/body.rs b/tonic/src/body.rs index ef95eec47..624b5f692 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,9 +1,7 @@ //! HTTP specific body utilities. -use http_body::Body; - /// A type erased HTTP body used for tonic services. -pub type BoxBody = http_body::combinators::UnsyncBoxBody; +pub type BoxBody = http_body_util::combinators::UnsyncBoxBody; /// Convert a [`http_body::Body`] into a [`BoxBody`]. pub(crate) fn boxed(body: B) -> BoxBody @@ -16,7 +14,7 @@ where /// Create an empty `BoxBody` pub fn empty_body() -> BoxBody { - http_body::Empty::new() + http_body_util::Empty::new() .map_err(|err| match err {}) .boxed_unsync() } diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 6aacb57a5..ac23de98e 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -313,7 +313,7 @@ impl Endpoint { /// Create a channel from this config. pub async fn connect(&self) -> Result { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.enforce_http(false); http.set_nodelay(self.tcp_nodelay); http.set_keepalive(self.tcp_keepalive); @@ -334,7 +334,7 @@ impl Endpoint { /// The channel returned by this method does not attempt to connect to the endpoint until first /// use. pub fn connect_lazy(&self) -> Channel { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.enforce_http(false); http.set_nodelay(self.tcp_nodelay); http.set_keepalive(self.tcp_keepalive); diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index b510a6980..94ed09953 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -17,7 +17,7 @@ use http::{ uri::{InvalidUri, Uri}, Request, Response, }; -use hyper::client::connect::Connection as HyperConnection; +use hyper_util::client::legacy::connect::Connection as HyperConnection; use std::{ fmt, future::Future, @@ -236,3 +236,4 @@ impl fmt::Debug for ResponseFuture { f.debug_struct("ResponseFuture").finish() } } + diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index e10f11f68..57488f2c3 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -65,7 +65,7 @@ use tower::{ Service, ServiceBuilder, }; -type BoxHttpBody = http_body::combinators::UnsyncBoxBody; +type BoxHttpBody = http_body_util::combinators::UnsyncBoxBody; type BoxService = tower::util::BoxService, Response, crate::Error>; type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 46a88dda5..10a45c0ec 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -5,8 +5,8 @@ use crate::{ }; use http::Uri; use hyper::client::conn::Builder; -use hyper::client::connect::Connection as HyperConnection; use hyper::client::service::Connect as HyperConnect; +use hyper_util::client::legacy::connect::Connection as HyperConnection; use std::{ fmt, task::{Context, Poll}, @@ -126,3 +126,4 @@ impl fmt::Debug for Connection { f.debug_struct("Connection").finish() } } + diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 2d23ca74c..fb31c4f92 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -32,7 +32,7 @@ impl Stream for DynamicServiceStream { Poll::Pending | Poll::Ready(None) => Poll::Pending, Poll::Ready(Some(change)) => match change { Change::Insert(k, endpoint) => { - let mut http = hyper::client::connect::HttpConnector::new(); + let mut http = hyper_util::client::legacy::connect::HttpConnector::new(); http.set_nodelay(endpoint.tcp_nodelay); http.set_keepalive(endpoint.tcp_keepalive); http.set_connect_timeout(endpoint.connect_timeout); diff --git a/tonic/src/transport/service/io.rs b/tonic/src/transport/service/io.rs index 2230b9b2e..e5e75287a 100644 --- a/tonic/src/transport/service/io.rs +++ b/tonic/src/transport/service/io.rs @@ -1,5 +1,5 @@ use crate::transport::server::Connected; -use hyper::client::connect::{Connected as HyperConnected, Connection}; +use hyper_util::client::legacy::connect::{Connected as HyperConnected, Connection}; use std::io; use std::io::IoSlice; use std::pin::Pin; From efc5b3c541acf151938ab42b6bfe17f57bb58f0a Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 23:39:25 -0500 Subject: [PATCH 03/12] Use Axum Request and Response in transport This commit is primarily converting Request and Response types within the transport module to Axum 0.7 Request/Response. There is still more to come to finish this conversion. There are also small changes such as updating the hyper service builder syntax. Over the course of this commit,it was discovered that hyper-util is missing `http2_max_pending_accept_reset_streams`. --- tonic/src/transport/channel/mod.rs | 22 ++++++++------------ tonic/src/transport/mod.rs | 5 +++-- tonic/src/transport/server/mod.rs | 23 ++++++++++++--------- tonic/src/transport/service/connection.rs | 25 ++++++++--------------- tonic/src/transport/service/router.rs | 25 ++++++++++------------- 5 files changed, 45 insertions(+), 55 deletions(-) diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 94ed09953..b1bbc6046 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -10,13 +10,9 @@ pub use endpoint::Endpoint; pub use tls::ClientTlsConfig; use super::service::{Connection, DynamicServiceStream, SharedExec}; -use crate::body::BoxBody; use crate::transport::Executor; use bytes::Bytes; -use http::{ - uri::{InvalidUri, Uri}, - Request, Response, -}; +use http::uri::{InvalidUri, Uri}; use hyper_util::client::legacy::connect::Connection as HyperConnection; use std::{ fmt, @@ -30,6 +26,7 @@ use tokio::{ sync::mpsc::{channel, Sender}, }; +use axum::{extract::Request, response::Response, body::Body}; use tower::balance::p2c::Balance; use tower::{ buffer::{self, Buffer}, @@ -38,7 +35,7 @@ use tower::{ Service, }; -type Svc = Either, Response, crate::Error>>; +type Svc = Either>; const DEFAULT_BUFFER_SIZE: usize = 1024; @@ -67,14 +64,14 @@ const DEFAULT_BUFFER_SIZE: usize = 1024; /// cloning the `Channel` type is cheap and encouraged. #[derive(Clone)] pub struct Channel { - svc: Buffer>, + svc: Buffer, } /// A future that resolves to an HTTP response. /// /// This is returned by the `Service::call` on [`Channel`]. pub struct ResponseFuture { - inner: buffer::future::ResponseFuture<>>::Future>, + inner: buffer::future::ResponseFuture<>::Future>, } impl Channel { @@ -200,8 +197,8 @@ impl Channel { } } -impl Service> for Channel { - type Response = http::Response; +impl Service for Channel { + type Response = Response; type Error = super::Error; type Future = ResponseFuture; @@ -209,7 +206,7 @@ impl Service> for Channel { Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source) } - fn call(&mut self, request: http::Request) -> Self::Future { + fn call(&mut self, request: Request) -> Self::Future { let inner = Service::call(&mut self.svc, request); ResponseFuture { inner } @@ -217,7 +214,7 @@ impl Service> for Channel { } impl Future for ResponseFuture { - type Output = Result, super::Error>; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?; @@ -236,4 +233,3 @@ impl fmt::Debug for ResponseFuture { f.debug_struct("ResponseFuture").finish() } } - diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index c676bfb92..e18e849c3 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -108,8 +108,8 @@ pub use self::tls::Certificate; #[doc(inline)] /// A deprecated re-export. Please use `tonic::server::NamedService` directly. pub use crate::server::NamedService; -pub use axum::{body::BoxBody as AxumBoxBody, Router as AxumRouter}; -pub use hyper::{Body, Uri}; +pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter}; +pub use hyper::Uri; pub(crate) use self::service::executor::Executor; @@ -124,3 +124,4 @@ pub use self::server::ServerTlsConfig; pub use self::tls::Identity; type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; + diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 57488f2c3..c8bb74edd 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -16,6 +16,7 @@ pub use super::service::RoutesBuilder; /// A deprecated re-export. Please use `tonic::server::NamedService` directly. pub use crate::server::NamedService; pub use conn::{Connected, TcpConnectInfo}; +use hyper_util::rt::TokioExecutor; #[cfg(feature = "tls")] pub use tls::ServerTlsConfig; @@ -536,16 +537,17 @@ impl Server { _io: PhantomData, }; - let server = hyper::Server::builder(incoming) - .http2_only(http2_only) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - .http2_keep_alive_interval(http2_keepalive_interval) - .http2_keep_alive_timeout(http2_keepalive_timeout) - .http2_adaptive_window(http2_adaptive_window.unwrap_or_default()) - .http2_max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) - .http2_max_frame_size(max_frame_size); + let server = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + .http2() + .initial_connection_window_size(init_connection_window_size) + .initial_stream_window_size(init_stream_window_size) + .max_concurrent_streams(max_concurrent_streams) + .keep_alive_interval(http2_keepalive_interval) + .keep_alive_timeout(http2_keepalive_timeout) + .adaptive_window(http2_adaptive_window.unwrap_or_default()) + // FIXME: wait for this to be added to hyper-util + //.max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) + .max_frame_size(max_frame_size); if let Some(signal) = signal { server @@ -887,3 +889,4 @@ where future::ready(Ok(svc)) } } + diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 10a45c0ec..6184de983 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -1,12 +1,8 @@ use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; -use crate::{ - body::BoxBody, - transport::{BoxFuture, Endpoint}, -}; +use crate::transport::{BoxFuture, Endpoint}; use http::Uri; -use hyper::client::conn::Builder; -use hyper::client::service::Connect as HyperConnect; -use hyper_util::client::legacy::connect::Connection as HyperConnection; +use hyper::client::conn::http2::Builder; +use hyper_util::client::legacy::connect::{Connect as HyperConnect, Connection as HyperConnection}; use std::{ fmt, task::{Context, Poll}, @@ -21,9 +17,8 @@ use tower::{ }; use tower_service::Service; -pub(crate) type Request = http::Request; -pub(crate) type Response = http::Response; - +pub(crate) type Request = axum::extract::Request; +pub(crate) type Response = axum::response::Response; pub(crate) struct Connection { inner: BoxService, } @@ -36,12 +31,10 @@ impl Connection { C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, { - let mut settings = Builder::new() - .http2_initial_stream_window_size(endpoint.init_stream_window_size) - .http2_initial_connection_window_size(endpoint.init_connection_window_size) - .http2_only(true) - .http2_keep_alive_interval(endpoint.http2_keep_alive_interval) - .executor(endpoint.executor.clone()) + let mut settings = Builder::new(endpoint.executor) + .initial_stream_window_size(endpoint.init_stream_window_size) + .initial_connection_window_size(endpoint.init_connection_window_size) + .keep_alive_interval(endpoint.http2_keep_alive_interval) .clone(); if let Some(val) = endpoint.http2_keep_alive_timeout { diff --git a/tonic/src/transport/service/router.rs b/tonic/src/transport/service/router.rs index 85636c4d4..ab3d43978 100644 --- a/tonic/src/transport/service/router.rs +++ b/tonic/src/transport/service/router.rs @@ -1,9 +1,5 @@ -use crate::{ - body::{boxed, BoxBody}, - server::NamedService, -}; -use http::{Request, Response}; -use hyper::Body; +use crate::{body::boxed, server::NamedService}; +use axum::{extract::Request, response::Response}; use pin_project::pin_project; use std::{ convert::Infallible, @@ -31,7 +27,7 @@ impl RoutesBuilder { /// Add a new service. pub fn add_service(&mut self, svc: S) -> &mut Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -53,7 +49,7 @@ impl Routes { /// Create a new routes with `svc` already added to it. pub fn new(svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -68,7 +64,7 @@ impl Routes { /// Add a new service. pub fn add_service(mut self, svc: S) -> Self where - S: Service, Response = Response, Error = Infallible> + S: Service + NamedService + Clone + Send @@ -103,8 +99,8 @@ async fn unimplemented() -> impl axum::response::IntoResponse { (status, headers) } -impl Service> for Routes { - type Response = Response; +impl Service for Routes { + type Response = Response; type Error = crate::Error; type Future = RoutesFuture; @@ -113,13 +109,13 @@ impl Service> for Routes { Poll::Ready(Ok(())) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { RoutesFuture(self.router.call(req)) } } #[pin_project] -pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture); +pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture); impl fmt::Debug for RoutesFuture { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -128,7 +124,7 @@ impl fmt::Debug for RoutesFuture { } impl Future for RoutesFuture { - type Output = Result, crate::Error>; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(self.project().0.poll(cx)) { @@ -137,3 +133,4 @@ impl Future for RoutesFuture { } } } + From 6da14d81df621b31c3985cd7f0776acb0f81ccbd Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Wed, 6 Dec 2023 23:52:39 -0500 Subject: [PATCH 04/12] Chop out AddrStream Replaced with `tokio::net::TcpStream`. Inspired by https://github.com/hyperium/hyper/issues/2850 --- tonic/src/transport/server/conn.rs | 12 ------------ tonic/src/transport/server/incoming.rs | 9 +++------ 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/tonic/src/transport/server/conn.rs b/tonic/src/transport/server/conn.rs index 907cf4965..37bcc561b 100644 --- a/tonic/src/transport/server/conn.rs +++ b/tonic/src/transport/server/conn.rs @@ -1,4 +1,3 @@ -use hyper::server::conn::AddrStream; use std::net::SocketAddr; use tokio::net::TcpStream; @@ -86,17 +85,6 @@ impl TcpConnectInfo { } } -impl Connected for AddrStream { - type ConnectInfo = TcpConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - TcpConnectInfo { - local_addr: Some(self.local_addr()), - remote_addr: Some(self.remote_addr()), - } - } -} - impl Connected for TcpStream { type ConnectInfo = TcpConnectInfo; diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index 61aadc93d..ce7175173 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -1,9 +1,5 @@ use super::{Connected, Server}; use crate::transport::service::ServerIo; -use hyper::server::{ - accept::Accept, - conn::{AddrIncoming, AddrStream}, -}; use std::{ net::SocketAddr, pin::Pin, @@ -12,7 +8,7 @@ use std::{ }; use tokio::{ io::{AsyncRead, AsyncWrite}, - net::TcpListener, + net::{TcpListener, TcpStream}, }; use tokio_stream::{Stream, StreamExt}; @@ -187,7 +183,7 @@ impl TcpIncoming { } impl Stream for TcpIncoming { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_accept(cx) @@ -207,3 +203,4 @@ mod tests { let _t3 = TcpIncoming::new(addr, true, None).unwrap(); } } + From 55cbf4c671bb508cbdc3af319111bf21fdc4e068 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 00:01:44 -0500 Subject: [PATCH 05/12] Replace AddrIncoming with TcpListener --- tonic/src/transport/server/incoming.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index ce7175173..dc42bc002 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -123,7 +123,7 @@ enum SelectOutput { /// of `AsyncRead + AsyncWrite` that communicate with clients that connect to a socket address. #[derive(Debug)] pub struct TcpIncoming { - inner: AddrIncoming, + inner: TcpListener, } impl TcpIncoming { @@ -163,22 +163,16 @@ impl TcpIncoming { nodelay: bool, keepalive: Option, ) -> Result { - let mut inner = AddrIncoming::bind(&addr)?; + let mut inner = TcpListener::bind(&addr)?; inner.set_nodelay(nodelay); inner.set_keepalive(keepalive); Ok(TcpIncoming { inner }) } +} - /// Creates a new `TcpIncoming` from an existing `tokio::net::TcpListener`. - pub fn from_listener( - listener: TcpListener, - nodelay: bool, - keepalive: Option, - ) -> Result { - let mut inner = AddrIncoming::from_listener(listener)?; - inner.set_nodelay(nodelay); - inner.set_keepalive(keepalive); - Ok(TcpIncoming { inner }) +impl From for TcpIncoming { + fn from(inner: TcpListener) -> Self { + TcpIncoming { inner } } } From 0dc89d2848230435cf52eec132aa7d2bda053af1 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 00:35:05 -0500 Subject: [PATCH 06/12] Update impl of http_body::Body - use `Frame`. - update `poll_data` to `poll_frame` - remove `poll_trailers` in most places I am not sure that a simple rename of `poll_data` to `poll_frame` is right. I also left one instance of `poll_trailers` in *tonic/src/codec/decode.rs*. --- tonic/src/codec/encode.rs | 14 ++++---------- tonic/src/codec/prost.rs | 14 ++++---------- tonic/src/service/interceptor.rs | 21 ++++++++------------- 3 files changed, 16 insertions(+), 33 deletions(-) diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 13eb2c96d..ccb2a7945 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -3,7 +3,7 @@ use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE}; use crate::{Code, Status}; use bytes::{BufMut, Bytes, BytesMut}; use http::HeaderMap; -use http_body::Body; +use http_body::{Body, Frame}; use pin_project::pin_project; use std::{ pin::Pin, @@ -319,10 +319,10 @@ where self.state.is_end_stream } - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { let self_proj = self.project(); match ready!(self_proj.inner.poll_next(cx)) { Some(Ok(d)) => Some(Ok(d)).into(), @@ -336,11 +336,5 @@ where None => None.into(), } } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Status>> { - Poll::Ready(self.project().state.trailers()) - } } + diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index d2f1652f4..74e307352 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -268,7 +268,7 @@ mod tests { mod body { use crate::Status; use bytes::Bytes; - use http_body::Body; + use http_body::{Body, Frame}; use std::{ pin::Pin, task::{Context, Poll}, @@ -299,10 +299,10 @@ mod tests { type Data = Bytes; type Error = Status; - fn poll_data( + fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { // every other call to poll_data returns data let should_send = self.count % 2 == 0; let data_len = self.data.len(); @@ -325,13 +325,7 @@ mod tests { Poll::Ready(None) } } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } } } } + diff --git a/tonic/src/service/interceptor.rs b/tonic/src/service/interceptor.rs index cadff466f..a23b5aede 100644 --- a/tonic/src/service/interceptor.rs +++ b/tonic/src/service/interceptor.rs @@ -232,7 +232,8 @@ where mod tests { #[allow(unused_imports)] use super::*; - use http::header::HeaderMap; + use http_body::Frame; + use http_body_util::Empty; use std::{ pin::Pin, task::{Context, Poll}, @@ -246,19 +247,12 @@ mod tests { type Data = Bytes; type Error = Status; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, _cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { Poll::Ready(None) } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } } #[tokio::test] @@ -318,19 +312,20 @@ mod tests { #[tokio::test] async fn doesnt_change_http_method() { - let svc = tower::service_fn(|request: http::Request| async move { + let svc = tower::service_fn(|request: http::Request| async move { assert_eq!(request.method(), http::Method::OPTIONS); - Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::empty())) + Ok::<_, hyper::Error>(hyper::Response::new(Empty::new())) }); let svc = InterceptedService::new(svc, Ok); let request = http::Request::builder() .method(http::Method::OPTIONS) - .body(hyper::Body::empty()) + .body(Empty::new()) .unwrap(); svc.oneshot(request).await.unwrap(); } } + From 1db6eafbd0cb4d9f25d61c511dbf114a80814815 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:22:40 -0500 Subject: [PATCH 07/12] More http2 method renames --- tonic/src/transport/service/connection.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 6184de983..170000c37 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -38,15 +38,15 @@ impl Connection { .clone(); if let Some(val) = endpoint.http2_keep_alive_timeout { - settings.http2_keep_alive_timeout(val); + settings.keep_alive_timeout(val); } if let Some(val) = endpoint.http2_keep_alive_while_idle { - settings.http2_keep_alive_while_idle(val); + settings.keep_alive_while_idle(val); } if let Some(val) = endpoint.http2_adaptive_window { - settings.http2_adaptive_window(val); + settings.adaptive_window(val); } let stack = ServiceBuilder::new() From 2c60deb3f64bf23f1f0285a011134c5441993046 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:23:04 -0500 Subject: [PATCH 08/12] Use BodyExt trait --- tonic/src/body.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 624b5f692..568849bf2 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,5 +1,7 @@ //! HTTP specific body utilities. +use http_body_util::BodyExt; + /// A type erased HTTP body used for tonic services. pub type BoxBody = http_body_util::combinators::UnsyncBoxBody; @@ -18,3 +20,4 @@ pub fn empty_body() -> BoxBody { .map_err(|err| match err {}) .boxed_unsync() } + From 998f4f2d858f4f8b58687f879b9de35d4da51a1b Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:31:44 -0500 Subject: [PATCH 09/12] Remove `hyper::Error::is_connect` `is_connect` was deprecated when the higher-level client was removed from hyper. The corresponding comments are removed as well. --- tonic/src/status.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tonic/src/status.rs b/tonic/src/status.rs index da8b792e5..d0ba18a34 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -412,13 +412,7 @@ impl Status { // > status. Note that the frequency of PINGs is highly dependent on the network // > environment, implementations are free to adjust PING frequency based on network and // > application requirements, which is why it's mapped to unavailable here. - // - // Likewise, if we are unable to connect to the server, map this to UNAVAILABLE. This is - // consistent with the behavior of a C++ gRPC client when the server is not running, and - // matches the spec of: - // > The service is currently unavailable. This is most likely a transient condition that - // > can be corrected if retried with a backoff. - if err.is_timeout() || err.is_connect() { + if err.is_timeout() { return Some(Status::unavailable(err.to_string())); } @@ -1009,3 +1003,4 @@ mod tests { assert_eq!(status.details(), DETAILS); } } + From 3542956e13752dd02fc144b73c81b939b01013b7 Mon Sep 17 00:00:00 2001 From: Allan Zhang Date: Thu, 7 Dec 2023 01:34:17 -0500 Subject: [PATCH 10/12] Add Clone to Extensions::insert `http::Extensions::insert` requires `T: Clone` so we add it. --- tonic/src/extensions.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tonic/src/extensions.rs b/tonic/src/extensions.rs index 67a7137be..896a9f873 100644 --- a/tonic/src/extensions.rs +++ b/tonic/src/extensions.rs @@ -24,7 +24,7 @@ impl Extensions { /// If a extension of this type already existed, it will /// be returned. #[inline] - pub fn insert(&mut self, val: T) -> Option { + pub fn insert(&mut self, val: T) -> Option { self.inner.insert(val) } @@ -95,3 +95,4 @@ impl GrpcMethod { self.method } } + From 0b8a3cc33a7f30fb20f35fcf0fe84d25300199a3 Mon Sep 17 00:00:00 2001 From: Ludea Date: Tue, 19 Dec 2023 06:43:16 +0000 Subject: [PATCH 11/12] add poll_frame, delete poll_trailers --- tonic/src/transport/server/recover_error.rs | 24 ++++++--------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/tonic/src/transport/server/recover_error.rs b/tonic/src/transport/server/recover_error.rs index 6d7e55bf4..20c583aac 100644 --- a/tonic/src/transport/server/recover_error.rs +++ b/tonic/src/transport/server/recover_error.rs @@ -98,30 +98,18 @@ where type Data = B::Data; type Error = B::Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - match self.project().inner.as_pin_mut() { - Some(b) => b.poll_data(cx), - None => Poll::Ready(None), - } + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.0).poll_frame(cx) } - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - match self.project().inner.as_pin_mut() { - Some(b) => b.poll_trailers(cx), - None => Poll::Ready(Ok(None)), - } + fn size_hint(&self) -> http_body::SizeHint { + self.0.size_hint() } fn is_end_stream(&self) -> bool { - match &self.inner { - Some(b) => b.is_end_stream(), - None => true, - } + self.body.is_end_stream() } } From ff0b124c236c63f0950fdd830589e15aeb3cd1cb Mon Sep 17 00:00:00 2001 From: Ludea Date: Tue, 19 Dec 2023 06:50:59 +0000 Subject: [PATCH 12/12] Use TcpStream instead of hyper connect --- tonic/src/transport/server/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index c8bb74edd..9819661dd 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -43,7 +43,7 @@ use crate::body::BoxBody; use bytes::Bytes; use http::{Request, Response}; use http_body::Body as _; -use hyper::{server::accept, Body}; +use hyper::Body; use pin_project::pin_project; use std::{ convert::Infallible, @@ -57,6 +57,7 @@ use std::{ time::Duration, }; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; use tokio_stream::Stream; use tower::{ layer::util::{Identity, Stack}, @@ -527,7 +528,7 @@ impl Server { let svc = self.service_builder.service(svc); let tcp = incoming::tcp_incoming(incoming, self); - let incoming = accept::from_stream::<_, _, crate::Error>(tcp); + let incoming = TcpStream::accept::from_stream::<_, _, crate::Error>(tcp); let svc = MakeSvc { inner: svc,