diff --git a/src/connections/handlers.rs b/src/connections/handlers.rs index 10ed8bc..b1ea654 100644 --- a/src/connections/handlers.rs +++ b/src/connections/handlers.rs @@ -1,4 +1,4 @@ -use crate::errors_internal::{Error, InternalStreamError}; +use crate::errors_internal::{Error, InternalChannelError, InternalStreamError}; use crate::protobufs; use crate::types::EncodedToRadioPacketWithHeader; use log::{debug, error, trace}; @@ -16,7 +16,7 @@ pub fn spawn_read_handler( cancellation_token: CancellationToken, read_stream: R, read_output_tx: UnboundedSender, -) -> JoinHandle<()> +) -> JoinHandle> where R: AsyncReadExt + Send + Unpin + 'static, { @@ -27,9 +27,11 @@ where tokio::select! { _ = cancellation_token.cancelled() => { debug!("Read handler cancelled"); + Ok(()) } e = handle => { error!("Read handler unexpectedly terminated: {:#?}", e); + e } } }) @@ -82,7 +84,7 @@ pub fn spawn_write_handler( cancellation_token: CancellationToken, write_stream: W, write_input_rx: tokio::sync::mpsc::UnboundedReceiver, -) -> JoinHandle<()> +) -> JoinHandle> where W: AsyncWriteExt + Send + Unpin + 'static, { @@ -91,10 +93,14 @@ where spawn(async move { tokio::select! { _ = cancellation_token.cancelled() => { - debug!("Write handler cancelled"); + debug!("Write handler cancelled"); + Ok(()) } - _ = handle => { - error!("Write handler unexpectedly terminated"); + write_result = handle => { + if let Err(e) = &write_result { + error!("Write handler unexpectedly terminated {e:?}"); + } + write_result } } }) @@ -132,16 +138,18 @@ pub fn spawn_processing_handler( cancellation_token: CancellationToken, read_output_rx: UnboundedReceiver, decoded_packet_tx: UnboundedSender, -) -> JoinHandle<()> { +) -> JoinHandle> { let handle = start_processing_handler(read_output_rx, decoded_packet_tx); spawn(async move { tokio::select! { _ = cancellation_token.cancelled() => { - debug!("Message processing handler cancelled"); + debug!("Message processing handler cancelled"); + Ok(()) } _ = handle => { - error!("Message processing handler unexpectedly terminated"); + error!("Message processing handler unexpectedly terminated"); + Err(Error::InternalChannelError(InternalChannelError::ChannelClosedEarly {})) } } }) @@ -150,7 +158,7 @@ pub fn spawn_processing_handler( async fn start_processing_handler( mut read_output_rx: tokio::sync::mpsc::UnboundedReceiver, decoded_packet_tx: UnboundedSender, -) -> Result<(), Error> { +) { trace!("Started message processing handler"); let mut buffer = StreamBuffer::new(decoded_packet_tx); @@ -161,6 +169,4 @@ async fn start_processing_handler( } trace!("Processing read_output_rx channel closed"); - - Ok(()) } diff --git a/src/connections/stream_api.rs b/src/connections/stream_api.rs index 60343e8..75713db 100644 --- a/src/connections/stream_api.rs +++ b/src/connections/stream_api.rs @@ -1,3 +1,4 @@ +use futures_util::future::join3; use log::trace; use prost::Message; use std::{fmt::Display, marker::PhantomData}; @@ -70,9 +71,9 @@ pub struct StreamApi; pub struct ConnectedStreamApi { write_input_tx: UnboundedSender, - read_handle: JoinHandle<()>, - write_handle: JoinHandle<()>, - processing_handle: JoinHandle<()>, + read_handle: JoinHandle>, + write_handle: JoinHandle>, + processing_handle: JoinHandle>, cancellation_token: CancellationToken, @@ -586,11 +587,15 @@ impl ConnectedStreamApi { // Close worker threads - self.read_handle.await?; - self.write_handle.await?; - self.processing_handle.await?; + let (read_result, write_result, processing_result) = + join3(self.read_handle, self.write_handle, self.processing_handle).await; - trace!("TCP handlers fully disconnected"); + // Note: we only return the first error. + read_result??; + write_result??; + processing_result??; + + trace!("Handlers fully disconnected"); Ok(StreamApi) } diff --git a/src/errors_internal.rs b/src/errors_internal.rs index bc293fd..4fab3c8 100644 --- a/src/errors_internal.rs +++ b/src/errors_internal.rs @@ -86,6 +86,9 @@ pub enum InternalChannelError { /// An error indicating that the library failed to write to an internal data channel. #[error(transparent)] IncomingStreamDataWriteError(#[from] tokio::sync::mpsc::error::SendError), + + #[error("Channel unexpectedly closed")] + ChannelClosedEarly, } #[derive(Error, Debug)]