Skip to content

Commit

Permalink
Merge pull request #7 from lukipuki/error-handling
Browse files Browse the repository at this point in the history
Improve error handling
  • Loading branch information
ajmcquilkin authored Jan 23, 2024
2 parents e77d168 + 8e7fa0c commit 3a61a21
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
30 changes: 18 additions & 12 deletions src/connections/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors_internal::{Error, InternalStreamError};
use crate::errors_internal::{Error, InternalChannelError, InternalStreamError};
use crate::protobufs;
use crate::types::EncodedToRadioPacketWithHeader;
use log::{debug, error, trace};
Expand All @@ -16,7 +16,7 @@ pub fn spawn_read_handler<R>(
cancellation_token: CancellationToken,
read_stream: R,
read_output_tx: UnboundedSender<IncomingStreamData>,
) -> JoinHandle<()>
) -> JoinHandle<Result<(), Error>>
where
R: AsyncReadExt + Send + Unpin + 'static,
{
Expand All @@ -27,9 +27,11 @@ where
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Read handler cancelled");
Ok(())
}
e = handle => {
error!("Read handler unexpectedly terminated: {:#?}", e);
e
}
}
})
Expand Down Expand Up @@ -82,7 +84,7 @@ pub fn spawn_write_handler<W>(
cancellation_token: CancellationToken,
write_stream: W,
write_input_rx: tokio::sync::mpsc::UnboundedReceiver<EncodedToRadioPacketWithHeader>,
) -> JoinHandle<()>
) -> JoinHandle<Result<(), Error>>
where
W: AsyncWriteExt + Send + Unpin + 'static,
{
Expand All @@ -91,10 +93,14 @@ where
spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Write handler cancelled");
debug!("Write handler cancelled");
Ok(())
}
_ = handle => {
error!("Write handler unexpectedly terminated");
write_result = handle => {
if let Err(e) = &write_result {
error!("Write handler unexpectedly terminated {e:?}");
}
write_result
}
}
})
Expand Down Expand Up @@ -132,16 +138,18 @@ pub fn spawn_processing_handler(
cancellation_token: CancellationToken,
read_output_rx: UnboundedReceiver<IncomingStreamData>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
) -> JoinHandle<()> {
) -> JoinHandle<Result<(), Error>> {
let handle = start_processing_handler(read_output_rx, decoded_packet_tx);

spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Message processing handler cancelled");
debug!("Message processing handler cancelled");
Ok(())
}
_ = handle => {
error!("Message processing handler unexpectedly terminated");
error!("Message processing handler unexpectedly terminated");
Err(Error::InternalChannelError(InternalChannelError::ChannelClosedEarly {}))
}
}
})
Expand All @@ -150,7 +158,7 @@ pub fn spawn_processing_handler(
async fn start_processing_handler(
mut read_output_rx: tokio::sync::mpsc::UnboundedReceiver<IncomingStreamData>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
) -> Result<(), Error> {
) {
trace!("Started message processing handler");

let mut buffer = StreamBuffer::new(decoded_packet_tx);
Expand All @@ -161,6 +169,4 @@ async fn start_processing_handler(
}

trace!("Processing read_output_rx channel closed");

Ok(())
}
19 changes: 12 additions & 7 deletions src/connections/stream_api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures_util::future::join3;
use log::trace;
use prost::Message;
use std::{fmt::Display, marker::PhantomData};
Expand Down Expand Up @@ -70,9 +71,9 @@ pub struct StreamApi;
pub struct ConnectedStreamApi<State = state::Configured> {
write_input_tx: UnboundedSender<EncodedToRadioPacketWithHeader>,

read_handle: JoinHandle<()>,
write_handle: JoinHandle<()>,
processing_handle: JoinHandle<()>,
read_handle: JoinHandle<Result<(), Error>>,
write_handle: JoinHandle<Result<(), Error>>,
processing_handle: JoinHandle<Result<(), Error>>,

cancellation_token: CancellationToken,

Expand Down Expand Up @@ -586,11 +587,15 @@ impl ConnectedStreamApi<state::Configured> {

// Close worker threads

self.read_handle.await?;
self.write_handle.await?;
self.processing_handle.await?;
let (read_result, write_result, processing_result) =
join3(self.read_handle, self.write_handle, self.processing_handle).await;

trace!("TCP handlers fully disconnected");
// Note: we only return the first error.
read_result??;
write_result??;
processing_result??;

trace!("Handlers fully disconnected");

Ok(StreamApi)
}
Expand Down
3 changes: 3 additions & 0 deletions src/errors_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub enum InternalChannelError {
/// An error indicating that the library failed to write to an internal data channel.
#[error(transparent)]
IncomingStreamDataWriteError(#[from] tokio::sync::mpsc::error::SendError<IncomingStreamData>),

#[error("Channel unexpectedly closed")]
ChannelClosedEarly,
}

#[derive(Error, Debug)]
Expand Down

0 comments on commit 3a61a21

Please sign in to comment.