From 963d4c7ed286209398a696aa386e42fb2af9a2b0 Mon Sep 17 00:00:00 2001 From: tottoto Date: Sat, 18 Jan 2025 07:02:22 +0900 Subject: [PATCH] chore(reflection): Refactor ServerReflectionInfoStream --- tonic-reflection/src/server/v1.rs | 21 ++++++++++++++------- tonic-reflection/src/server/v1alpha.rs | 21 ++++++++++++++------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/tonic-reflection/src/server/v1.rs b/tonic-reflection/src/server/v1.rs index 92153e0c4..6a5054f99 100644 --- a/tonic-reflection/src/server/v1.rs +++ b/tonic-reflection/src/server/v1.rs @@ -1,7 +1,7 @@ use std::{fmt, sync::Arc}; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; use super::ReflectionServiceState; @@ -92,9 +92,7 @@ impl ServerReflection for ReflectionService { } }); - Ok(Response::new(ServerReflectionInfoStream( - ReceiverStream::new(resp_rx), - ))) + Ok(Response::new(ServerReflectionInfoStream::new(resp_rx))) } } @@ -107,7 +105,16 @@ impl From for ReflectionService { } /// A response stream. -pub struct ServerReflectionInfoStream(ReceiverStream>); +pub struct ServerReflectionInfoStream { + inner: tokio_stream::wrappers::ReceiverStream>, +} + +impl ServerReflectionInfoStream { + fn new(resp_rx: mpsc::Receiver>) -> Self { + let inner = tokio_stream::wrappers::ReceiverStream::new(resp_rx); + Self { inner } + } +} impl Stream for ServerReflectionInfoStream { type Item = Result; @@ -116,11 +123,11 @@ impl Stream for ServerReflectionInfoStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::pin::Pin::new(&mut self.0).poll_next(cx) + std::pin::Pin::new(&mut self.inner).poll_next(cx) } fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() + self.inner.size_hint() } } diff --git a/tonic-reflection/src/server/v1alpha.rs b/tonic-reflection/src/server/v1alpha.rs index 9625f3cc6..b21d8d91d 100644 --- a/tonic-reflection/src/server/v1alpha.rs +++ b/tonic-reflection/src/server/v1alpha.rs @@ -1,7 +1,7 @@ use std::{fmt, sync::Arc}; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; use super::ReflectionServiceState; @@ -92,9 +92,7 @@ impl ServerReflection for ReflectionService { } }); - Ok(Response::new(ServerReflectionInfoStream( - ReceiverStream::new(resp_rx), - ))) + Ok(Response::new(ServerReflectionInfoStream::new(resp_rx))) } } @@ -107,7 +105,16 @@ impl From for ReflectionService { } /// A response stream. -pub struct ServerReflectionInfoStream(ReceiverStream>); +pub struct ServerReflectionInfoStream { + inner: tokio_stream::wrappers::ReceiverStream>, +} + +impl ServerReflectionInfoStream { + fn new(resp_rx: mpsc::Receiver>) -> Self { + let inner = tokio_stream::wrappers::ReceiverStream::new(resp_rx); + Self { inner } + } +} impl Stream for ServerReflectionInfoStream { type Item = Result; @@ -116,11 +123,11 @@ impl Stream for ServerReflectionInfoStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::pin::Pin::new(&mut self.0).poll_next(cx) + std::pin::Pin::new(&mut self.inner).poll_next(cx) } fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() + self.inner.size_hint() } }