From 9a2cfe8e77da780c6ffa3cbdac070c8bf815c951 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Fri, 2 Aug 2024 18:53:07 +0200 Subject: [PATCH 1/8] Add support for mapped connections from ports --- asynchronix/src/ports/output.rs | 82 ++++++- asynchronix/src/ports/output/sender.rs | 282 ++++++++++++++++++++++++- 2 files changed, 360 insertions(+), 4 deletions(-) diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index d5599fd..9d88269 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -11,7 +11,10 @@ use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; -use self::sender::{EventSinkSender, InputSender, ReplierSender}; +use self::sender::{ + EventSinkSender, InputSender, MapEventSinkSender, MapInputSender, MapReplierSender, + ReplierSender, +}; /// An output port. /// @@ -57,6 +60,48 @@ impl Output { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting connection to an input port of the model + /// specified by the address. + /// + /// Events are mapped to the type expected by the input, using the closure + /// provided in argument. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping + /// closure plus, optionally, a scheduler reference. + pub fn map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapInputSender::new(map, input, address.into().0)); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting connection to an event sink such as an + /// [`EventSlot`](crate::ports::EventSlot) or + /// [`EventBuffer`](crate::ports::EventBuffer). + /// + /// Events are mapped to the type expected by the sink, using the closure + /// provided in argument. + pub fn map_connect_sink(&mut self, map: C, sink: &S) -> LineId + where + C: Fn(T) -> U + Send + Sync + 'static, + U: Send + 'static, + S: EventSink, + { + let sender = Box::new(MapEventSinkSender::new(map, sink.writer())); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -136,6 +181,41 @@ impl Requestor { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting connection to a replier port of the model + /// specified by the address. + /// + /// Queries and replies are mapped to the types expected by the replier port, + /// using the closures provided in argument. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the second mapping closure and + /// taking as argument a value of the type returned by the first mapping + /// closure plus, optionally, a scheduler reference. + pub fn map_connect( + &mut self, + query_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapReplierSender::new( + query_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 1c9ab02..48dda09 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -4,6 +4,7 @@ use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use dyn_clone::DynClone; @@ -93,7 +94,93 @@ where } } -/// An object that can send a request to a replier port and retrieve a response. +/// An object that can send mapped events to an input port. +pub(super) struct MapInputSender +where + M: Model, + C: Fn(T) -> U, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + map: Arc, + func: F, + sender: channel::Sender, + fut_storage: Option>, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl MapInputSender +where + M: Model, + C: Fn(T) -> U, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + pub(super) fn new(map: C, func: F, sender: channel::Sender) -> Self { + Self { + map: Arc::new(map), + func, + sender, + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapInputSender +where + M: Model, + C: Fn(T) -> U + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let func = self.func.clone(); + let arg = (self.map)(arg); + + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + }) + } +} + +impl Clone for MapInputSender +where + M: Model, + C: Fn(T) -> U, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + map: self.map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send requests to a replier port and retrieve responses. pub(super) struct ReplierSender { func: F, sender: channel::Sender, @@ -181,8 +268,130 @@ where } } -/// An object that can send a payload to an event sink. -pub(super) struct EventSinkSender> { +/// An object that can send mapped requests to a replier port and retrieve +/// mapped responses. +pub(super) struct MapReplierSender { + query_map: Arc, + reply_map: Arc, + func: F, + sender: channel::Sender, + receiver: multishot::Receiver, + fut_storage: Option>, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl MapReplierSender +where + M: Model, + C: Fn(T) -> U, + D: Fn(Q) -> R, + F: for<'a> ReplierFn<'a, M, U, Q, S>, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, +{ + pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender) -> Self { + Self { + query_map: Arc::new(query_map), + reply_map: Arc::new(reply_map), + func, + sender, + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapReplierSender +where + M: Model, + C: Fn(T) -> U + Send + Sync, + D: Fn(Q) -> R + Send + Sync, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { + let func = self.func.clone(); + let arg = (self.query_map)(arg); + let sender = &mut self.sender; + let reply_receiver = &mut self.receiver; + let fut_storage = &mut self.fut_storage; + let reply_map = &*self.reply_map; + + // The previous future generated by this method should have been polled + // to completion so a new sender should be readily available. + let reply_sender = reply_receiver.sender().unwrap(); + + let send_fut = sender.send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(fut_storage, async move { + // Send the message. + send_fut.await.map_err(|_| SendError {})?; + + // Wait until the message is processed and the reply is sent back. + // If an error is received, it most likely means the mailbox was + // dropped before the message was processed. + reply_receiver + .recv() + .await + .map_err(|_| SendError {}) + .map(reply_map) + }) + } +} + +impl Clone for MapReplierSender +where + M: Model, + C: Fn(T) -> U, + D: Fn(Q) -> R, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + query_map: self.query_map.clone(), + reply_map: self.reply_map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send an event to an event sink. +pub(super) struct EventSinkSender> +where + T: Send + 'static, + W: EventSinkWriter, +{ writer: W, fut_storage: Option>, _phantom_event: PhantomData, @@ -228,6 +437,73 @@ where } } +/// An object that can send mapped events to an event sink. +pub(super) struct MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + writer: W, + map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + pub(super) fn new(map: C, writer: W) -> Self { + Self { + writer, + map: Arc::new(map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + let arg = (self.map)(arg); + + RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }) + } +} + +impl Clone for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + map: self.map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + /// Error returned when the mailbox was closed or dropped. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) struct SendError {} From 0ec781e18bb72ce7319c0aa2ec790b009d6df4ca Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Sat, 3 Aug 2024 11:47:57 +0200 Subject: [PATCH 2/8] Add filter_map variants for output port connection --- asynchronix/src/ports/output.rs | 62 +++- asynchronix/src/ports/output/sender.rs | 397 +++++++++++++++++-------- 2 files changed, 332 insertions(+), 127 deletions(-) diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index 9d88269..c98d60f 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -12,8 +12,8 @@ use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; use self::sender::{ - EventSinkSender, InputSender, MapEventSinkSender, MapInputSender, MapReplierSender, - ReplierSender, + EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender, + MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender, }; /// An output port. @@ -63,8 +63,8 @@ impl Output { /// Adds an auto-converting connection to an input port of the model /// specified by the address. /// - /// Events are mapped to the type expected by the input, using the closure - /// provided in argument. + /// Events are mapped to another type using the closure provided in + /// argument. /// /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of the type returned by the mapping @@ -90,8 +90,8 @@ impl Output { /// [`EventSlot`](crate::ports::EventSlot) or /// [`EventBuffer`](crate::ports::EventBuffer). /// - /// Events are mapped to the type expected by the sink, using the closure - /// provided in argument. + /// Events are mapped to another type using the closure provided in + /// argument. pub fn map_connect_sink(&mut self, map: C, sink: &S) -> LineId where C: Fn(T) -> U + Send + Sync + 'static, @@ -102,6 +102,52 @@ impl Output { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting, filtered connection to an input port of the + /// model specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument, or ignored if the closure returns `None`. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping + /// closure plus, optionally, a scheduler reference. + pub fn filter_map_connect( + &mut self, + filter_map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapInputSender::new( + filter_map, + input, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting connection to an event sink such as an + /// [`EventSlot`](crate::ports::EventSlot) or + /// [`EventBuffer`](crate::ports::EventBuffer). + /// + /// Events are mapped to another type using the closure provided in + /// argument. + pub fn filter_map_connect_sink(&mut self, filter_map: C, sink: &S) -> LineId + where + C: Fn(T) -> Option + Send + Sync + 'static, + U: Send + 'static, + S: EventSink, + { + let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer())); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -184,8 +230,8 @@ impl Requestor { /// Adds an auto-converting connection to a replier port of the model /// specified by the address. /// - /// Queries and replies are mapped to the types expected by the replier port, - /// using the closures provided in argument. + /// Queries and replies are mapped to other types using the closures + /// provided in argument. /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of the type returned by the second mapping closure and diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 48dda09..66b45e5 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -1,6 +1,6 @@ use std::error::Error; use std::fmt; -use std::future::Future; +use std::future::{ready, Future}; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; @@ -180,6 +180,283 @@ where } } +/// An object that can filter and send mapped events to an input port. +pub(super) struct FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + filter_map: Arc, + func: F, + sender: channel::Sender, + fut_storage: Option>, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + pub(super) fn new(filter_map: C, func: F, sender: channel::Sender) -> Self { + Self { + filter_map: Arc::new(filter_map), + func, + sender, + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let func = self.func.clone(); + + match (self.filter_map)(arg) { + Some(arg) => { + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + }) + } + None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), + } + } +} + +impl Clone for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + filter_map: self.filter_map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send an event to an event sink. +pub(super) struct EventSinkSender> +where + T: Send + 'static, + W: EventSinkWriter, +{ + writer: W, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl> EventSinkSender { + pub(super) fn new(writer: W) -> Self { + Self { + writer, + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for EventSinkSender +where + T: Send + 'static, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + + RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }) + } +} + +impl Clone for EventSinkSender +where + T: Send + 'static, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +/// An object that can send mapped events to an event sink. +pub(super) struct MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + writer: W, + map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + pub(super) fn new(map: C, writer: W) -> Self { + Self { + writer, + map: Arc::new(map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + let arg = (self.map)(arg); + + RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }) + } +} + +impl Clone for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + map: self.map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +/// An object that can filter and send mapped events to an event sink. +pub(super) struct FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option, + W: EventSinkWriter, +{ + writer: W, + filter_map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option, + W: EventSinkWriter, +{ + pub(super) fn new(filter_map: C, writer: W) -> Self { + Self { + writer, + filter_map: Arc::new(filter_map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + + match (self.filter_map)(arg) { + Some(arg) => RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }), + None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), + } + } +} + +impl Clone for FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option + Send + Sync, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + filter_map: self.filter_map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + /// An object that can send requests to a replier port and retrieve responses. pub(super) struct ReplierSender { func: F, @@ -386,124 +663,6 @@ where } } -/// An object that can send an event to an event sink. -pub(super) struct EventSinkSender> -where - T: Send + 'static, - W: EventSinkWriter, -{ - writer: W, - fut_storage: Option>, - _phantom_event: PhantomData, -} - -impl> EventSinkSender { - pub(super) fn new(writer: W) -> Self { - Self { - writer, - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - -impl Sender for EventSinkSender -where - T: Send + 'static, - W: EventSinkWriter, -{ - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let writer = &mut self.writer; - - RecycledFuture::new(&mut self.fut_storage, async move { - writer.write(arg); - - Ok(()) - }) - } -} - -impl Clone for EventSinkSender -where - T: Send + 'static, - W: EventSinkWriter, -{ - fn clone(&self) -> Self { - Self { - writer: self.writer.clone(), - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - -/// An object that can send mapped events to an event sink. -pub(super) struct MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U, - W: EventSinkWriter, -{ - writer: W, - map: Arc, - fut_storage: Option>, - _phantom_event: PhantomData, -} - -impl MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U, - W: EventSinkWriter, -{ - pub(super) fn new(map: C, writer: W) -> Self { - Self { - writer, - map: Arc::new(map), - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - -impl Sender for MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U + Send + Sync, - W: EventSinkWriter, -{ - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let writer = &mut self.writer; - let arg = (self.map)(arg); - - RecycledFuture::new(&mut self.fut_storage, async move { - writer.write(arg); - - Ok(()) - }) - } -} - -impl Clone for MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U + Send + Sync, - W: EventSinkWriter, -{ - fn clone(&self) -> Self { - Self { - writer: self.writer.clone(), - map: self.map.clone(), - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - /// Error returned when the mailbox was closed or dropped. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) struct SendError {} From 3527d62b41605c200a1cc2f5ab472f02c123eb02 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Sat, 3 Aug 2024 13:09:36 +0200 Subject: [PATCH 3/8] Remove unnecessary trait bounds + improve doc --- asynchronix/src/ports/output.rs | 12 +- asynchronix/src/ports/output/sender.rs | 161 ++++++++----------------- 2 files changed, 53 insertions(+), 120 deletions(-) diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index c98d60f..95d982b 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -68,7 +68,7 @@ impl Output { /// /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of the type returned by the mapping - /// closure plus, optionally, a scheduler reference. + /// closure plus, optionally, a context reference. pub fn map_connect( &mut self, map: C, @@ -110,7 +110,7 @@ impl Output { /// /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of the type returned by the mapping - /// closure plus, optionally, a scheduler reference. + /// closure plus, optionally, a context reference. pub fn filter_map_connect( &mut self, filter_map: C, @@ -216,7 +216,7 @@ impl Requestor { /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` - /// plus, optionally, a scheduler reference. + /// plus, optionally, a context reference. pub fn connect(&mut self, replier: F, address: impl Into>) -> LineId where M: Model, @@ -234,9 +234,9 @@ impl Requestor { /// provided in argument. /// /// The replier port must be an asynchronous method of a model of type `M` - /// returning a value of the type returned by the second mapping closure and - /// taking as argument a value of the type returned by the first mapping - /// closure plus, optionally, a scheduler reference. + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. pub fn map_connect( &mut self, query_map: C, diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 66b45e5..5b285cc 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -24,11 +24,9 @@ pub(super) trait Sender: DynClone + Send { dyn_clone::clone_trait_object!( Sender); /// An object that can send events to an input port. -pub(super) struct InputSender +pub(super) struct InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + M: 'static, { func: F, sender: channel::Sender, @@ -37,11 +35,9 @@ where _phantom_closure_marker: PhantomData, } -impl InputSender +impl InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -54,12 +50,12 @@ where } } -impl Sender for InputSender +impl Sender for InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + 'static, - S: Send + 'static, + S: Send, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { let func = self.func.clone(); @@ -76,12 +72,10 @@ where } } -impl Clone for InputSender +impl Clone for InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + 'static, - S: Send + 'static, + M: 'static, + F: Clone, { fn clone(&self) -> Self { Self { @@ -95,13 +89,9 @@ where } /// An object that can send mapped events to an input port. -pub(super) struct MapInputSender +pub(super) struct MapInputSender where - M: Model, - C: Fn(T) -> U, - F: for<'a> InputFn<'a, M, U, S>, - T: Send + 'static, - U: Send + 'static, + M: 'static, { map: Arc, func: F, @@ -112,13 +102,9 @@ where _phantom_closure_marker: PhantomData, } -impl MapInputSender +impl MapInputSender where - M: Model, - C: Fn(T) -> U, - F: for<'a> InputFn<'a, M, U, S>, - T: Send + 'static, - U: Send + 'static, + M: 'static, { pub(super) fn new(map: C, func: F, sender: channel::Sender) -> Self { Self { @@ -133,14 +119,14 @@ where } } -impl Sender for MapInputSender +impl Sender for MapInputSender where M: Model, C: Fn(T) -> U + Send + Sync, F: for<'a> InputFn<'a, M, U, S> + Clone, T: Send + 'static, U: Send + 'static, - S: Send + 'static, + S: Send, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { let func = self.func.clone(); @@ -158,14 +144,10 @@ where } } -impl Clone for MapInputSender +impl Clone for MapInputSender where - M: Model, - C: Fn(T) -> U, - F: for<'a> InputFn<'a, M, U, S> + Clone, - T: Send + 'static, - U: Send + 'static, - S: Send + 'static, + M: 'static, + F: Clone, { fn clone(&self) -> Self { Self { @@ -181,30 +163,22 @@ where } /// An object that can filter and send mapped events to an input port. -pub(super) struct FilterMapInputSender +pub(super) struct FilterMapInputSender where - M: Model, - C: Fn(T) -> Option, - F: for<'a> InputFn<'a, M, U, S>, - T: Send + 'static, - U: Send + 'static, + M: 'static, { filter_map: Arc, func: F, sender: channel::Sender, fut_storage: Option>, - _phantom_map: PhantomData U>, + _phantom_filter_map: PhantomData Option>, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } -impl FilterMapInputSender +impl FilterMapInputSender where - M: Model, - C: Fn(T) -> Option, - F: for<'a> InputFn<'a, M, U, S>, - T: Send + 'static, - U: Send + 'static, + M: 'static, { pub(super) fn new(filter_map: C, func: F, sender: channel::Sender) -> Self { Self { @@ -212,21 +186,21 @@ where func, sender, fut_storage: None, - _phantom_map: PhantomData, + _phantom_filter_map: PhantomData, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } } } -impl Sender for FilterMapInputSender +impl Sender for FilterMapInputSender where M: Model, C: Fn(T) -> Option + Send + Sync, F: for<'a> InputFn<'a, M, U, S> + Clone, T: Send + 'static, U: Send + 'static, - S: Send + 'static, + S: Send, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { let func = self.func.clone(); @@ -248,14 +222,10 @@ where } } -impl Clone for FilterMapInputSender +impl Clone for FilterMapInputSender where - M: Model, - C: Fn(T) -> Option, - F: for<'a> InputFn<'a, M, U, S> + Clone, - T: Send + 'static, - U: Send + 'static, - S: Send + 'static, + M: 'static, + F: Clone, { fn clone(&self) -> Self { Self { @@ -263,7 +233,7 @@ where func: self.func.clone(), sender: self.sender.clone(), fut_storage: None, - _phantom_map: PhantomData, + _phantom_filter_map: PhantomData, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } @@ -271,17 +241,13 @@ where } /// An object that can send an event to an event sink. -pub(super) struct EventSinkSender> -where - T: Send + 'static, - W: EventSinkWriter, -{ +pub(super) struct EventSinkSender { writer: W, fut_storage: Option>, _phantom_event: PhantomData, } -impl> EventSinkSender { +impl EventSinkSender { pub(super) fn new(writer: W) -> Self { Self { writer, @@ -307,11 +273,7 @@ where } } -impl Clone for EventSinkSender -where - T: Send + 'static, - W: EventSinkWriter, -{ +impl Clone for EventSinkSender { fn clone(&self) -> Self { Self { writer: self.writer.clone(), @@ -324,10 +286,7 @@ where /// An object that can send mapped events to an event sink. pub(super) struct MapEventSinkSender where - T: Send + 'static, - U: Send + 'static, C: Fn(T) -> U, - W: EventSinkWriter, { writer: W, map: Arc, @@ -337,10 +296,7 @@ where impl MapEventSinkSender where - T: Send + 'static, - U: Send + 'static, C: Fn(T) -> U, - W: EventSinkWriter, { pub(super) fn new(map: C, writer: W) -> Self { Self { @@ -373,10 +329,8 @@ where impl Clone for MapEventSinkSender where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U + Send + Sync, - W: EventSinkWriter, + C: Fn(T) -> U, + W: Clone, { fn clone(&self) -> Self { Self { @@ -391,10 +345,7 @@ where /// An object that can filter and send mapped events to an event sink. pub(super) struct FilterMapEventSinkSender where - T: Send + 'static, - U: Send + 'static, C: Fn(T) -> Option, - W: EventSinkWriter, { writer: W, filter_map: Arc, @@ -404,10 +355,7 @@ where impl FilterMapEventSinkSender where - T: Send + 'static, - U: Send + 'static, C: Fn(T) -> Option, - W: EventSinkWriter, { pub(super) fn new(filter_map: C, writer: W) -> Self { Self { @@ -442,10 +390,8 @@ where impl Clone for FilterMapEventSinkSender where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> Option + Send + Sync, - W: EventSinkWriter, + C: Fn(T) -> Option, + W: Clone, { fn clone(&self) -> Self { Self { @@ -458,7 +404,10 @@ where } /// An object that can send requests to a replier port and retrieve responses. -pub(super) struct ReplierSender { +pub(super) struct ReplierSender +where + M: Model, +{ func: F, sender: channel::Sender, receiver: multishot::Receiver, @@ -470,9 +419,6 @@ pub(super) struct ReplierSender { impl ReplierSender where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S>, - T: Send + 'static, - R: Send + 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -528,10 +474,7 @@ where impl Clone for ReplierSender where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, - T: Send + 'static, - R: Send + 'static, - S: Send, + F: Clone, { fn clone(&self) -> Self { Self { @@ -547,7 +490,10 @@ where /// An object that can send mapped requests to a replier port and retrieve /// mapped responses. -pub(super) struct MapReplierSender { +pub(super) struct MapReplierSender +where + M: Model, +{ query_map: Arc, reply_map: Arc, func: F, @@ -563,13 +509,6 @@ pub(super) struct MapReplierSender { impl MapReplierSender where M: Model, - C: Fn(T) -> U, - D: Fn(Q) -> R, - F: for<'a> ReplierFn<'a, M, U, Q, S>, - T: Send + 'static, - R: Send + 'static, - U: Send + 'static, - Q: Send + 'static, { pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender) -> Self { Self { @@ -639,13 +578,7 @@ where impl Clone for MapReplierSender where M: Model, - C: Fn(T) -> U, - D: Fn(Q) -> R, - F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, - T: Send + 'static, - R: Send + 'static, - U: Send + 'static, - Q: Send + 'static, + F: Clone, { fn clone(&self) -> Self { Self { From 7f244d233409c3260420312267c9612a38881c6b Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Sat, 3 Aug 2024 19:43:10 +0200 Subject: [PATCH 4/8] Add map/filter_map variants for source connection --- asynchronix/src/ports/source.rs | 132 ++++++++- asynchronix/src/ports/source/broadcaster.rs | 102 +++++-- asynchronix/src/ports/source/sender.rs | 303 ++++++++++++++++++-- 3 files changed, 489 insertions(+), 48 deletions(-) diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index 7c8ce3e..08398b9 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -13,9 +13,11 @@ use crate::simulation::{ }; use crate::util::slot; -use broadcaster::ReplyIterator; -use broadcaster::{EventBroadcaster, QueryBroadcaster}; -use sender::{InputSender, ReplierSender}; +use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator}; +use sender::{ + FilterMapInputSender, FilterMapReplierSender, InputSender, MapInputSender, MapReplierSender, + ReplierSender, +}; use super::ReplierFn; @@ -51,6 +53,58 @@ impl EventSource { self.broadcaster.lock().unwrap().add(sender) } + /// Adds an auto-converting connection to an input port of the model + /// specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping closure + /// plus, optionally, a context reference. + pub fn map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapInputSender::new(map, input, address.into().0)); + self.broadcaster.lock().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to an input port of the + /// model specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument, or ignored if the closure returns `None`. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping closure + /// plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0)); + self.broadcaster.lock().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -193,7 +247,7 @@ impl QuerySource { /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` - /// plus, optionally, a scheduler reference. + /// plus, optionally, a context reference. pub fn connect(&mut self, replier: F, address: impl Into>) -> LineId where M: Model, @@ -204,6 +258,76 @@ impl QuerySource { self.broadcaster.lock().unwrap().add(sender) } + /// Adds an auto-converting connection to a replier port of the model + /// specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn map_connect( + &mut self, + query_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapReplierSender::new( + query_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.lock().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to a replier port of the + /// model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument, or ignored if the query closure returns `None`. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + query_filter_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapReplierSender::new( + query_filter_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.lock().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index b545b66..138cc14 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -71,26 +71,26 @@ impl BroadcasterInner { self.senders.len() } - /// Efficiently broadcasts a message or a query to multiple addresses. - /// - /// This method does not collect the responses from queries. - fn broadcast(&mut self, arg: T) -> BroadcastFuture { - let mut future_states = Vec::with_capacity(self.senders.len()); + /// Return a list of futures broadcasting an event or query to multiple addresses. + fn futures(&mut self, arg: T) -> Vec> { + let mut future_states = Vec::new(); // Broadcast the message and collect all futures. let mut iter = self.senders.iter_mut(); while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - future_states.push(SenderFutureState::Pending(sender.1.send(arg))); + if let Some(fut) = sender.1.send(arg) { + future_states.push(SenderFutureState::Pending(fut)); + } break; } - - future_states.push(SenderFutureState::Pending(sender.1.send(arg.clone()))); + if let Some(fut) = sender.1.send(arg.clone()) { + future_states.push(SenderFutureState::Pending(fut)); + } } - // Generate the global future. - BroadcastFuture::new(future_states) + future_states } } @@ -157,17 +157,27 @@ impl EventBroadcaster { let fut = match self.inner.senders.as_mut_slice() { // No sender. [] => Fut::Empty, - // One sender. + // One sender at most. [sender] => Fut::Single(sender.1.send(arg)), - // Multiple senders. - _ => Fut::Multiple(self.inner.broadcast(arg)), + // Possibly multiple senders. + _ => Fut::Multiple(self.inner.futures(arg)), }; async { match fut { - Fut::Empty => Ok(()), - Fut::Single(fut) => fut.await.map_err(|_| BroadcastError {}), - Fut::Multiple(fut) => fut.await.map(|_| ()), + // No sender. + Fut::Empty | Fut::Single(None) => Ok(()), + + Fut::Single(Some(fut)) => fut.await.map_err(|_| BroadcastError {}), + + Fut::Multiple(mut futures) => match futures.as_mut_slice() { + // No sender. + [] => Ok(()), + // One sender. + [SenderFutureState::Pending(fut)] => fut.await.map_err(|_| BroadcastError {}), + // Multiple senders. + _ => BroadcastFuture::new(futures).await.map(|_| ()), + }, } } } @@ -235,20 +245,39 @@ impl QueryBroadcaster { let fut = match self.inner.senders.as_mut_slice() { // No sender. [] => Fut::Empty, - // One sender. + // One sender at most. [sender] => Fut::Single(sender.1.send(arg)), - // Multiple senders. - _ => Fut::Multiple(self.inner.broadcast(arg)), + // Possibly multiple senders. + _ => Fut::Multiple(self.inner.futures(arg)), }; async { match fut { - Fut::Empty => Ok(ReplyIterator(Vec::new().into_iter())), - Fut::Single(fut) => fut + // No sender. + Fut::Empty | Fut::Single(None) => Ok(ReplyIterator(Vec::new().into_iter())), + + Fut::Single(Some(fut)) => fut .await .map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())) .map_err(|_| BroadcastError {}), - Fut::Multiple(fut) => fut.await.map_err(|_| BroadcastError {}), + + Fut::Multiple(mut futures) => match futures.as_mut_slice() { + // No sender. + [] => Ok(ReplyIterator(Vec::new().into_iter())), + + // One sender. + [SenderFutureState::Pending(fut)] => fut + .await + .map(|reply| { + ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()) + }) + .map_err(|_| BroadcastError {}), + + // Multiple senders. + _ => BroadcastFuture::new(futures) + .await + .map_err(|_| BroadcastError {}), + }, } } } @@ -598,14 +627,17 @@ mod tests { receiver: Option>>, } impl Sender<(), R> for TestEvent { - fn send(&mut self, _arg: ()) -> Pin> + Send>> { + fn send( + &mut self, + _arg: (), + ) -> Option> + Send>>> { let receiver = self.receiver.take().unwrap(); - Box::pin(async move { + Some(Box::pin(async move { let mut stream = Box::pin(receiver.filter_map(|item| async { item })); Ok(stream.next().await.unwrap()) - }) + })) } } @@ -634,8 +666,16 @@ mod tests { ) } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] - fn loom_broadcast_basic() { + fn loom_broadcast_query_basic() { const DEFAULT_PREEMPTION_BOUND: usize = 3; let mut builder = Builder::new(); @@ -707,8 +747,16 @@ mod tests { }); } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] - fn loom_broadcast_spurious() { + fn loom_broadcast_query_spurious() { const DEFAULT_PREEMPTION_BOUND: usize = 3; let mut builder = Builder::new(); diff --git a/asynchronix/src/ports/source/sender.rs b/asynchronix/src/ports/source/sender.rs index 1e83141..66c7080 100644 --- a/asynchronix/src/ports/source/sender.rs +++ b/asynchronix/src/ports/source/sender.rs @@ -3,6 +3,7 @@ use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; +use std::sync::Arc; use futures_channel::oneshot; use recycle_box::{coerce_box, RecycleBox}; @@ -16,22 +17,23 @@ pub(super) type SenderFuture = Pin: Send { /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> SenderFuture; + fn send(&mut self, arg: T) -> Option>; } /// An object that can send events to an input port. -pub(super) struct InputSender { +pub(super) struct InputSender +where + M: 'static, +{ func: F, sender: channel::Sender, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } -impl InputSender +impl InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -43,18 +45,74 @@ where } } -impl Sender for InputSender +impl Sender for InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + 'static, - S: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + let func = self.func.clone(); + let sender = self.sender.clone(); + + Some(Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {}) + })) + } +} + +/// An object that can send mapped events to an input port. +pub(super) struct MapInputSender +where + M: 'static, +{ + map: C, + func: F, + sender: channel::Sender, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl MapInputSender +where + M: 'static, +{ + pub(super) fn new(map: C, func: F, sender: channel::Sender) -> Self { + Self { + map, + func, + sender, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapInputSender +where + M: Model, + C: Fn(T) -> U + Send, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send, { - fn send(&mut self, arg: T) -> SenderFuture<()> { + fn send(&mut self, arg: T) -> Option> { let func = self.func.clone(); + let arg = (self.map)(arg); let sender = self.sender.clone(); - Box::pin(async move { + Some(Box::pin(async move { sender .send(move |model, scheduler, recycle_box| { let fut = func.call(model, arg, scheduler); @@ -63,12 +121,72 @@ where }) .await .map_err(|_| SendError {}) + })) + } +} + +/// An object that can filter and send mapped events to an input port. +pub(super) struct FilterMapInputSender +where + M: 'static, +{ + filter_map: C, + func: F, + sender: channel::Sender, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapInputSender +where + M: 'static, +{ + pub(super) fn new(filter_map: C, func: F, sender: channel::Sender) -> Self { + Self { + filter_map, + func, + sender, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option + Send, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + (self.filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = self.sender.clone(); + + Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {}) + }) as SenderFuture<()> }) } } /// An object that can send a request to a replier port and retrieve a response. -pub(super) struct ReplierSender { +pub(super) struct ReplierSender +where + M: 'static, +{ func: F, sender: channel::Sender, _phantom_closure: PhantomData R>, @@ -77,10 +195,7 @@ pub(super) struct ReplierSender { impl ReplierSender where - M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S>, - T: Send + 'static, - R: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -100,12 +215,12 @@ where R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> SenderFuture { + fn send(&mut self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); let (reply_sender, reply_receiver) = oneshot::channel(); - Box::pin(async move { + Some(Box::pin(async move { sender .send(move |model, scheduler, recycle_box| { let fut = async move { @@ -119,6 +234,160 @@ where .map_err(|_| SendError {})?; reply_receiver.await.map_err(|_| SendError {}) + })) + } +} + +/// An object that can send a mapped request to a replier port and retrieve a +/// mapped response. +pub(super) struct MapReplierSender +where + M: 'static, +{ + query_map: C, + reply_map: Arc, + func: F, + sender: channel::Sender, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl MapReplierSender +where + M: 'static, +{ + pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender) -> Self { + Self { + query_map, + reply_map: Arc::new(reply_map), + func, + sender, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapReplierSender +where + M: Model, + C: Fn(T) -> U + Send, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + let func = self.func.clone(); + let arg = (self.query_map)(arg); + let sender = self.sender.clone(); + let reply_map = self.reply_map.clone(); + let (reply_sender, reply_receiver) = oneshot::channel(); + + Some(Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + let _ = reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {})?; + + reply_receiver + .await + .map_err(|_| SendError {}) + .map(&*reply_map) + })) + } +} + +/// An object that can filter and send a mapped request to a replier port and +/// retrieve a mapped response. +pub(super) struct FilterMapReplierSender +where + M: 'static, +{ + query_filter_map: C, + reply_map: Arc, + func: F, + sender: channel::Sender, + _phantom_query_map: PhantomData Option>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapReplierSender +where + M: 'static, +{ + pub(super) fn new( + query_filter_map: C, + reply_map: D, + func: F, + sender: channel::Sender, + ) -> Self { + Self { + query_filter_map, + reply_map: Arc::new(reply_map), + func, + sender, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapReplierSender +where + M: Model, + C: Fn(T) -> Option + Send, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + (self.query_filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = self.sender.clone(); + let reply_map = self.reply_map.clone(); + let (reply_sender, reply_receiver) = oneshot::channel(); + + Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + let _ = reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {})?; + + reply_receiver + .await + .map_err(|_| SendError {}) + .map(&*reply_map) + }) as SenderFuture }) } } From 2270a94b8d0063869381e6321079107a28afe736 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Mon, 5 Aug 2024 09:56:13 +0200 Subject: [PATCH 5/8] Simplify output broadcaster implementation --- asynchronix/src/ports/output/broadcaster.rs | 127 ++++++++------------ 1 file changed, 47 insertions(+), 80 deletions(-) diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 0be241a..d8a8afe 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -4,9 +4,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use diatomic_waker::WakeSink; -use recycle_box::{coerce_box, RecycleBox}; -use super::sender::{SendError, Sender}; +use super::sender::{RecycledFuture, SendError, Sender}; use super::LineId; use crate::util::task_set::TaskSet; @@ -20,8 +19,8 @@ use crate::util::task_set::TaskSet; /// This is somewhat similar to what `FuturesOrdered` in the `futures` crate /// does, but with some key differences: /// -/// - tasks and future storage are reusable to avoid repeated allocation, so -/// allocation occurs only after a new sender is added, +/// - tasks, output storage and future storage are reusable to avoid repeated +/// allocation, so allocation occurs only after a new sender is added, /// - the outputs of all sender futures are returned all at once rather than /// with an asynchronous iterator (a.k.a. async stream). pub(super) struct BroadcasterInner { @@ -46,11 +45,15 @@ impl BroadcasterInner { self.next_line_id += 1; self.senders.push((line_id, sender)); - - self.shared.futures_env.push(FutureEnv::default()); - + self.shared.outputs.push(None); self.shared.task_set.resize(self.senders.len()); + // The storage is alway an empty vector so we just book some capacity. + self.shared + .storage + .as_mut() + .map(|s| s.try_reserve(self.senders.len()).unwrap()); + line_id } @@ -61,7 +64,7 @@ impl BroadcasterInner { pub(super) fn remove(&mut self, id: LineId) -> bool { if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { self.senders.swap_remove(pos); - self.shared.futures_env.swap_remove(pos); + self.shared.outputs.truncate(self.senders.len()); self.shared.task_set.resize(self.senders.len()); return true; @@ -73,7 +76,7 @@ impl BroadcasterInner { /// Removes all senders. pub(super) fn clear(&mut self) { self.senders.clear(); - self.shared.futures_env.clear(); + self.shared.outputs.clear(); self.shared.task_set.resize(0); } @@ -89,30 +92,15 @@ impl BroadcasterInner { let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); // Broadcast the message and collect all futures. - let mut iter = self - .senders - .iter_mut() - .zip(self.shared.futures_env.iter_mut()); - while let Some((sender, futures_env)) = iter.next() { - let future_cache = futures_env - .storage - .take() - .unwrap_or_else(|| RecycleBox::new(())); - + let mut iter = self.senders.iter_mut(); + while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - let future: RecycleBox> + Send + '_> = - coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg))); - - futures.push(RecycleBox::into_pin(future)); + futures.push(sender.1.send(arg)); break; } - let future: RecycleBox> + Send + '_> = coerce_box!( - RecycleBox::recycle(future_cache, sender.1.send(arg.clone())) - ); - - futures.push(RecycleBox::into_pin(future)); + futures.push(sender.1.send(arg.clone())); } // Generate the global future. @@ -132,7 +120,7 @@ impl Default for BroadcasterInner { shared: Shared { wake_sink, task_set: TaskSet::new(wake_src), - futures_env: Vec::new(), + outputs: Vec::new(), storage: None, }, } @@ -262,7 +250,7 @@ impl QueryBroadcaster { // One sender. [sender] => { let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; - self.inner.shared.futures_env[0].output = Some(output); + self.inner.shared.outputs[0] = Some(output); } // Multiple senders. _ => self.inner.broadcast(arg).await?, @@ -273,9 +261,9 @@ impl QueryBroadcaster { let outputs = self .inner .shared - .futures_env + .outputs .iter_mut() - .map(|t| t.output.take().unwrap()); + .map(|t| t.take().unwrap()); Ok(outputs) } @@ -297,40 +285,20 @@ impl Clone for QueryBroadcaster { } } -/// Data related to a sender future. -struct FutureEnv { - /// Cached storage for the future. - storage: Option>, - /// Output of the associated future. - output: Option, -} - -impl Default for FutureEnv { - fn default() -> Self { - Self { - storage: None, - output: None, - } - } -} - -/// A type-erased `Send` future wrapped in a `RecycleBox`. -type RecycleBoxFuture<'a, R> = RecycleBox> + Send + 'a>; - /// Fields of `Broadcaster` that are explicitly borrowed by a `BroadcastFuture`. struct Shared { /// Thread-safe waker handle. wake_sink: WakeSink, /// Tasks associated to the sender futures. task_set: TaskSet, - /// Data related to the sender futures. - futures_env: Vec>, + /// Outputs of the sender futures. + outputs: Vec>, /// Cached storage for the sender futures. /// /// When it exists, the cached storage is always an empty vector but it /// typically has a non-zero capacity. Its purpose is to reuse the /// previously allocated capacity when creating new sender futures. - storage: Option>>>, + storage: Option>>>, } impl Clone for Shared { @@ -338,13 +306,13 @@ impl Clone for Shared { let wake_sink = WakeSink::new(); let wake_src = wake_sink.source(); - let mut futures_env = Vec::new(); - futures_env.resize_with(self.futures_env.len(), Default::default); + let mut outputs = Vec::new(); + outputs.resize_with(self.outputs.len(), Default::default); Self { wake_sink, task_set: TaskSet::with_len(wake_src, self.task_set.len()), - futures_env, + outputs, storage: None, } } @@ -363,7 +331,7 @@ pub(super) struct BroadcastFuture<'a, R> { /// Reference to the shared fields of the `Broadcast` object. shared: &'a mut Shared, /// List of all send futures. - futures: ManuallyDrop>>>, + futures: ManuallyDrop>>>, /// The total count of futures that have not yet been polled to completion. pending_futures_count: usize, /// State of completion of the future. @@ -372,14 +340,17 @@ pub(super) struct BroadcastFuture<'a, R> { impl<'a, R> BroadcastFuture<'a, R> { /// Creates a new `BroadcastFuture`. - fn new(shared: &'a mut Shared, futures: Vec>>) -> Self { + fn new( + shared: &'a mut Shared, + futures: Vec>>, + ) -> Self { let pending_futures_count = futures.len(); - assert!(shared.futures_env.len() == pending_futures_count); + assert!(shared.outputs.len() == pending_futures_count); - for futures_env in shared.futures_env.iter_mut() { + for output in shared.outputs.iter_mut() { // Drop the previous output if necessary. - futures_env.output.take(); + output.take(); } BroadcastFuture { @@ -395,12 +366,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> { fn drop(&mut self) { // Safety: this is safe since `self.futures` is never accessed after it // is moved out. - let mut futures = unsafe { ManuallyDrop::take(&mut self.futures) }; - - // Recycle the future-containing boxes. - for (future, futures_env) in futures.drain(..).zip(self.shared.futures_env.iter_mut()) { - futures_env.storage = Some(RecycleBox::vacate_pinned(future)); - } + let futures = unsafe { ManuallyDrop::take(&mut self.futures) }; // Recycle the vector that contained the futures. self.shared.storage = Some(recycle_vec(futures)); @@ -425,14 +391,14 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { this.shared.task_set.discard_scheduled(); for task_idx in 0..this.futures.len() { - let future_env = &mut this.shared.futures_env[task_idx]; - let future = &mut this.futures[task_idx]; + let output = &mut this.shared.outputs[task_idx]; + let future = std::pin::Pin::new(&mut this.futures[task_idx]); let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_cx_ref = &mut Context::from_waker(&task_waker_ref); - match future.as_mut().poll(task_cx_ref) { - Poll::Ready(Ok(output)) => { - future_env.output = Some(output); + match future.poll(task_cx_ref) { + Poll::Ready(Ok(o)) => { + *output = Some(o); this.pending_futures_count -= 1; } Poll::Ready(Err(_)) => { @@ -477,20 +443,20 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { }; for task_idx in scheduled_tasks { - let future_env = &mut this.shared.futures_env[task_idx]; + let output = &mut this.shared.outputs[task_idx]; // Do not poll completed futures. - if future_env.output.is_some() { + if output.is_some() { continue; } - let future = &mut this.futures[task_idx]; + let future = std::pin::Pin::new(&mut this.futures[task_idx]); let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_cx_ref = &mut Context::from_waker(&task_waker_ref); - match future.as_mut().poll(task_cx_ref) { - Poll::Ready(Ok(output)) => { - future_env.output = Some(output); + match future.poll(task_cx_ref) { + Poll::Ready(Ok(o)) => { + *output = Some(o); this.pending_futures_count -= 1; } Poll::Ready(Err(_)) => { @@ -703,6 +669,7 @@ mod tests { use loom::sync::atomic::{AtomicBool, Ordering}; use loom::thread; + use recycle_box::RecycleBox; use waker_fn::waker_fn; use super::super::sender::RecycledFuture; From b5187ded442294e7dcf7651fee019681410545a4 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 7 Aug 2024 10:11:53 +0200 Subject: [PATCH 6/8] Optimize filtered connections from outputs --- asynchronix/src/ports/output.rs | 36 ++++ asynchronix/src/ports/output/broadcaster.rs | 118 ++++++++---- asynchronix/src/ports/output/sender.rs | 190 +++++++++++++++----- asynchronix/src/ports/source/broadcaster.rs | 3 +- 4 files changed, 269 insertions(+), 78 deletions(-) diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index 95d982b..f2c04d7 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -10,6 +10,7 @@ use crate::simulation::Address; use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; +use sender::FilterMapReplierSender; use self::sender::{ EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender, @@ -262,6 +263,41 @@ impl Requestor { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting, filtered connection to a replier port of the + /// model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument, or ignored if the query closure returns `None`. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + query_filer_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapReplierSender::new( + query_filer_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index d8a8afe..07027d2 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -46,13 +46,11 @@ impl BroadcasterInner { self.senders.push((line_id, sender)); self.shared.outputs.push(None); - self.shared.task_set.resize(self.senders.len()); // The storage is alway an empty vector so we just book some capacity. - self.shared - .storage - .as_mut() - .map(|s| s.try_reserve(self.senders.len()).unwrap()); + self.shared.storage.as_mut().map(|s| { + let _ = s.try_reserve(self.senders.len()); + }); line_id } @@ -65,7 +63,6 @@ impl BroadcasterInner { if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { self.senders.swap_remove(pos); self.shared.outputs.truncate(self.senders.len()); - self.shared.task_set.resize(self.senders.len()); return true; } @@ -77,7 +74,6 @@ impl BroadcasterInner { pub(super) fn clear(&mut self) { self.senders.clear(); self.shared.outputs.clear(); - self.shared.task_set.resize(0); } /// Returns the number of connected senders. @@ -85,10 +81,15 @@ impl BroadcasterInner { self.senders.len() } - /// Efficiently broadcasts a message or a query to multiple addresses. - /// - /// This method does not collect the responses from queries. - fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> { + /// Return a list of futures broadcasting an event or query to multiple + /// addresses. + fn futures( + &mut self, + arg: T, + ) -> ( + &'_ mut Shared, + Vec>>, + ) { let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); // Broadcast the message and collect all futures. @@ -96,15 +97,18 @@ impl BroadcasterInner { while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - futures.push(sender.1.send(arg)); + if let Some(fut) = sender.1.send(arg) { + futures.push(fut); + } break; } - futures.push(sender.1.send(arg.clone())); + if let Some(fut) = sender.1.send(arg.clone()) { + futures.push(fut); + } } - // Generate the global future. - BroadcastFuture::new(&mut self.shared, futures) + (&mut self.shared, futures) } } @@ -183,10 +187,22 @@ impl EventBroadcaster { match self.inner.senders.as_mut_slice() { // No sender. [] => Ok(()), - // One sender. - [sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}), - // Multiple senders. - _ => self.inner.broadcast(arg).await, + + // One sender at most. + [sender] => match sender.1.send(arg) { + None => Ok(()), + Some(fut) => fut.await.map_err(|_| BroadcastError {}), + }, + + // Possibly multiple senders. + _ => { + let (shared, mut futures) = self.inner.futures(arg); + match futures.as_mut_slice() { + [] => Ok(()), + [fut] => fut.await.map_err(|_| BroadcastError {}), + _ => BroadcastFuture::new(shared, futures).await, + } + } } } } @@ -244,25 +260,49 @@ impl QueryBroadcaster { &mut self, arg: T, ) -> Result + '_, BroadcastError> { - match self.inner.senders.as_mut_slice() { + let output_count = match self.inner.senders.as_mut_slice() { // No sender. - [] => {} - // One sender. + [] => 0, + + // One sender at most. [sender] => { - let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; - self.inner.shared.outputs[0] = Some(output); + if let Some(fut) = sender.1.send(arg) { + let output = fut.await.map_err(|_| BroadcastError {})?; + self.inner.shared.outputs[0] = Some(output); + + 1 + } else { + 0 + } + } + + // Possibly multiple senders. + _ => { + let (shared, mut futures) = self.inner.futures(arg); + let output_count = futures.len(); + + match futures.as_mut_slice() { + [] => {} + [fut] => { + let output = fut.await.map_err(|_| BroadcastError {})?; + shared.outputs[0] = Some(output); + } + _ => { + BroadcastFuture::new(shared, futures).await?; + } + } + + output_count } - // Multiple senders. - _ => self.inner.broadcast(arg).await?, }; - // At this point all outputs should be available so `unwrap` can be - // called on the output of each future. + // At this point all outputs should be available. let outputs = self .inner .shared .outputs .iter_mut() + .take(output_count) .map(|t| t.take().unwrap()); Ok(outputs) @@ -311,7 +351,7 @@ impl Clone for Shared { Self { wake_sink, - task_set: TaskSet::with_len(wake_src, self.task_set.len()), + task_set: TaskSet::new(wake_src), outputs, storage: None, } @@ -345,11 +385,11 @@ impl<'a, R> BroadcastFuture<'a, R> { futures: Vec>>, ) -> Self { let pending_futures_count = futures.len(); + shared.task_set.resize(pending_futures_count); - assert!(shared.outputs.len() == pending_futures_count); - - for output in shared.outputs.iter_mut() { - // Drop the previous output if necessary. + for output in shared.outputs.iter_mut().take(pending_futures_count) { + // Empty the output slots to be used. This is necessary in case the + // previous broadcast future was cancelled. output.take(); } @@ -379,7 +419,11 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - assert_ne!(this.state, FutureState::Completed); + assert_ne!( + this.state, + FutureState::Completed, + "broadcast future polled after completion" + ); // Poll all sender futures once if this is the first time the broadcast // future is polled. @@ -681,15 +725,15 @@ mod tests { fut_storage: Option>, } impl Sender<(), R> for TestEvent { - fn send(&mut self, _arg: ()) -> RecycledFuture<'_, Result> { + fn send(&mut self, _arg: ()) -> Option>> { let fut_storage = &mut self.fut_storage; let receiver = &mut self.receiver; - RecycledFuture::new(fut_storage, async { + Some(RecycledFuture::new(fut_storage, async { let mut stream = Box::pin(receiver.filter_map(|item| async { item })); Ok(stream.next().await.unwrap()) - }) + })) } } diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 5b285cc..9ddb9c7 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -1,6 +1,6 @@ use std::error::Error; use std::fmt; -use std::future::{ready, Future}; +use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; @@ -18,7 +18,7 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn}; /// replier method. pub(super) trait Sender: DynClone + Send { /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result>; + fn send(&mut self, arg: T) -> Option>>; } dyn_clone::clone_trait_object!( Sender); @@ -57,7 +57,7 @@ where T: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let fut = self.sender.send(move |model, scheduler, recycle_box| { @@ -66,9 +66,9 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { fut.await.map_err(|_| SendError {}) - }) + })) } } @@ -128,7 +128,7 @@ where U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let arg = (self.map)(arg); @@ -138,9 +138,9 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { fut.await.map_err(|_| SendError {}) - }) + })) } } @@ -202,23 +202,20 @@ where U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let func = self.func.clone(); + fn send(&mut self, arg: T) -> Option>> { + (self.filter_map)(arg).map(|arg| { + let func = self.func.clone(); - match (self.filter_map)(arg) { - Some(arg) => { - let fut = self.sender.send(move |model, scheduler, recycle_box| { - let fut = func.call(model, arg, scheduler); + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }); + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); - RecycledFuture::new(&mut self.fut_storage, async move { - fut.await.map_err(|_| SendError {}) - }) - } - None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), - } + RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + }) + }) } } @@ -262,14 +259,14 @@ where T: Send + 'static, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let writer = &mut self.writer; - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { writer.write(arg); Ok(()) - }) + })) } } @@ -315,15 +312,15 @@ where C: Fn(T) -> U + Send + Sync, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let writer = &mut self.writer; let arg = (self.map)(arg); - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { writer.write(arg); Ok(()) - }) + })) } } @@ -374,17 +371,16 @@ where C: Fn(T) -> Option + Send + Sync, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let writer = &mut self.writer; - match (self.filter_map)(arg) { - Some(arg) => RecycledFuture::new(&mut self.fut_storage, async move { + (self.filter_map)(arg).map(|arg| { + RecycledFuture::new(&mut self.fut_storage, async move { writer.write(arg); Ok(()) - }), - None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), - } + }) + }) } } @@ -440,7 +436,7 @@ where R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let sender = &mut self.sender; let reply_receiver = &mut self.receiver; @@ -459,7 +455,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(fut_storage, async move { + Some(RecycledFuture::new(fut_storage, async move { // Send the message. send_fut.await.map_err(|_| SendError {})?; @@ -467,7 +463,7 @@ where // If an error is received, it most likely means the mailbox was // dropped before the message was processed. reply_receiver.recv().await.map_err(|_| SendError {}) - }) + })) } } @@ -538,7 +534,7 @@ where Q: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let arg = (self.query_map)(arg); let sender = &mut self.sender; @@ -559,7 +555,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(fut_storage, async move { + Some(RecycledFuture::new(fut_storage, async move { // Send the message. send_fut.await.map_err(|_| SendError {})?; @@ -571,7 +567,7 @@ where .await .map_err(|_| SendError {}) .map(reply_map) - }) + })) } } @@ -596,6 +592,120 @@ where } } +/// An object that can filter and send mapped requests to a replier port and +/// retrieve mapped responses. +pub(super) struct FilterMapReplierSender +where + M: Model, +{ + query_filter_map: Arc, + reply_map: Arc, + func: F, + sender: channel::Sender, + receiver: multishot::Receiver, + fut_storage: Option>, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapReplierSender +where + M: Model, +{ + pub(super) fn new( + query_filter_map: C, + reply_map: D, + func: F, + sender: channel::Sender, + ) -> Self { + Self { + query_filter_map: Arc::new(query_filter_map), + reply_map: Arc::new(reply_map), + func, + sender, + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapReplierSender +where + M: Model, + C: Fn(T) -> Option + Send + Sync, + D: Fn(Q) -> R + Send + Sync, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option>> { + (self.query_filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = &mut self.sender; + let reply_receiver = &mut self.receiver; + let fut_storage = &mut self.fut_storage; + let reply_map = &*self.reply_map; + + // The previous future generated by this method should have been polled + // to completion so a new sender should be readily available. + let reply_sender = reply_receiver.sender().unwrap(); + + let send_fut = sender.send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(fut_storage, async move { + // Send the message. + send_fut.await.map_err(|_| SendError {})?; + + // Wait until the message is processed and the reply is sent back. + // If an error is received, it most likely means the mailbox was + // dropped before the message was processed. + reply_receiver + .recv() + .await + .map_err(|_| SendError {}) + .map(reply_map) + }) + }) + } +} + +impl Clone for FilterMapReplierSender +where + M: Model, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + query_filter_map: self.query_filter_map.clone(), + reply_map: self.reply_map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + /// Error returned when the mailbox was closed or dropped. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) struct SendError {} diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 138cc14..bb36054 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -71,7 +71,8 @@ impl BroadcasterInner { self.senders.len() } - /// Return a list of futures broadcasting an event or query to multiple addresses. + /// Return a list of futures broadcasting an event or query to multiple + /// addresses. fn futures(&mut self, arg: T) -> Vec> { let mut future_states = Vec::new(); From b544bcee92058e891656c0ab4b6a2dad885b848a Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 7 Aug 2024 10:23:10 +0200 Subject: [PATCH 7/8] Simplify task_set & satisfy clippy --- asynchronix/src/ports/output/broadcaster.rs | 7 +-- asynchronix/src/util/task_set.rs | 60 ++------------------- 2 files changed, 8 insertions(+), 59 deletions(-) diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 07027d2..4300392 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -48,9 +48,9 @@ impl BroadcasterInner { self.shared.outputs.push(None); // The storage is alway an empty vector so we just book some capacity. - self.shared.storage.as_mut().map(|s| { - let _ = s.try_reserve(self.senders.len()); - }); + if let Some(storage) = self.shared.storage.as_mut() { + let _ = storage.try_reserve(self.senders.len()); + }; line_id } @@ -83,6 +83,7 @@ impl BroadcasterInner { /// Return a list of futures broadcasting an event or query to multiple /// addresses. + #[allow(clippy::type_complexity)] fn futures( &mut self, arg: T, diff --git a/asynchronix/src/util/task_set.rs b/asynchronix/src/util/task_set.rs index e1145e8..3ad9b4f 100644 --- a/asynchronix/src/util/task_set.rs +++ b/asynchronix/src/util/task_set.rs @@ -196,60 +196,12 @@ impl TaskSet { next: AtomicU32::new(SLEEPING), })); } - - return; } - // Try to shrink the vector of tasks. - // - // The main issue when shrinking the vector of tasks is that stale - // wakers may still be around and may at any moment be scheduled and - // insert their task index in the list of scheduled tasks. If it cannot - // be guaranteed that this will not happen, then the vector of tasks - // cannot be shrunk further, otherwise the iterator for scheduled tasks - // will later fail when reaching a task with an invalid index. - // - // We follow a 2-steps strategy: - // - // 1) remove all tasks currently in the list of scheduled task and set - // them to `SLEEPING` state in case some of them might have an index - // that will be invalidated when the vector of tasks is shrunk; - // - // 2) attempt to iteratively shrink the vector of tasks by removing - // tasks starting from the back of the vector: - // - If a task is in the `SLEEPING` state, then its `next` pointer is - // changed to an arbitrary value other than`SLEEPING`, but the task - // is not inserted in the list of scheduled tasks; this way, the - // task will be effectively rendered inactive. The task can now be - // removed from the vector. - // - If a task is found in a non-`SLEEPING` state (meaning that there - // was a race and the task was scheduled after step 1) then abandon - // further shrinking and leave this task in the vector; the iterator - // for scheduled tasks mitigates such situation by only yielding - // task indices that are within the expected range. - - // Step 1: unscheduled tasks that may be scheduled. - self.discard_scheduled(); - - // Step 2: attempt to remove tasks starting at the back of the vector. - while self.tasks.len() > len { - // There is at least one task since `len()` was non-zero. - let task = self.tasks.last().unwrap(); - - // Ordering: Relaxed ordering is sufficient since the task is - // effectively discarded. - if task - .next - .compare_exchange(SLEEPING, EMPTY, Ordering::Relaxed, Ordering::Relaxed) - .is_err() - { - // The task could not be removed for now so the set of tasks cannot - // be shrunk further. - break; - } - - self.tasks.pop(); - } + // The vector of tasks is never shrunk as this is a fairly costly + // operation and is not strictly necessary. Typically, inactive tasks + // left at the back of the vector are never waken anyway, and if they + // are, they are filtered out by the task iterator. } /// Returns `true` if one or more sub-tasks are currently scheduled. @@ -271,10 +223,6 @@ impl TaskSet { waker_ref(&self.tasks[idx]) } - - pub(crate) fn len(&self) -> usize { - self.task_count - } } /// Internals shared between a `TaskSet` and its associated `Task`s. From c4d93f5c31db13719b327533c1a31dd81eeaf20b Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 7 Aug 2024 10:29:13 +0200 Subject: [PATCH 8/8] Disable Loom tests for broadcaster due to Loom bug --- asynchronix/src/ports/output/broadcaster.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 4300392..f3cc423 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -770,6 +770,14 @@ mod tests { ) } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] fn loom_broadcast_basic() { const DEFAULT_PREEMPTION_BOUND: usize = 3; @@ -843,6 +851,14 @@ mod tests { }); } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] fn loom_broadcast_spurious() { const DEFAULT_PREEMPTION_BOUND: usize = 3;