diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index d5599fd..f2c04d7 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -10,8 +10,12 @@ use crate::simulation::Address; use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; +use sender::FilterMapReplierSender; -use self::sender::{EventSinkSender, InputSender, ReplierSender}; +use self::sender::{ + EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender, + MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender, +}; /// An output port. /// @@ -57,6 +61,94 @@ impl Output { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting connection to an input port of the model + /// specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping + /// closure plus, optionally, a context reference. + pub fn map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapInputSender::new(map, input, address.into().0)); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting connection to an event sink such as an + /// [`EventSlot`](crate::ports::EventSlot) or + /// [`EventBuffer`](crate::ports::EventBuffer). + /// + /// Events are mapped to another type using the closure provided in + /// argument. + pub fn map_connect_sink(&mut self, map: C, sink: &S) -> LineId + where + C: Fn(T) -> U + Send + Sync + 'static, + U: Send + 'static, + S: EventSink, + { + let sender = Box::new(MapEventSinkSender::new(map, sink.writer())); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to an input port of the + /// model specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument, or ignored if the closure returns `None`. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping + /// closure plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + filter_map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapInputSender::new( + filter_map, + input, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting connection to an event sink such as an + /// [`EventSlot`](crate::ports::EventSlot) or + /// [`EventBuffer`](crate::ports::EventBuffer). + /// + /// Events are mapped to another type using the closure provided in + /// argument. + pub fn filter_map_connect_sink(&mut self, filter_map: C, sink: &S) -> LineId + where + C: Fn(T) -> Option + Send + Sync + 'static, + U: Send + 'static, + S: EventSink, + { + let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer())); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -125,7 +217,7 @@ impl Requestor { /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` - /// plus, optionally, a scheduler reference. + /// plus, optionally, a context reference. pub fn connect(&mut self, replier: F, address: impl Into>) -> LineId where M: Model, @@ -136,6 +228,76 @@ impl Requestor { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting connection to a replier port of the model + /// specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn map_connect( + &mut self, + query_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapReplierSender::new( + query_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to a replier port of the + /// model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument, or ignored if the query closure returns `None`. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + query_filer_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapReplierSender::new( + query_filer_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 0be241a..f3cc423 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -4,9 +4,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use diatomic_waker::WakeSink; -use recycle_box::{coerce_box, RecycleBox}; -use super::sender::{SendError, Sender}; +use super::sender::{RecycledFuture, SendError, Sender}; use super::LineId; use crate::util::task_set::TaskSet; @@ -20,8 +19,8 @@ use crate::util::task_set::TaskSet; /// This is somewhat similar to what `FuturesOrdered` in the `futures` crate /// does, but with some key differences: /// -/// - tasks and future storage are reusable to avoid repeated allocation, so -/// allocation occurs only after a new sender is added, +/// - tasks, output storage and future storage are reusable to avoid repeated +/// allocation, so allocation occurs only after a new sender is added, /// - the outputs of all sender futures are returned all at once rather than /// with an asynchronous iterator (a.k.a. async stream). pub(super) struct BroadcasterInner { @@ -46,10 +45,12 @@ impl BroadcasterInner { self.next_line_id += 1; self.senders.push((line_id, sender)); + self.shared.outputs.push(None); - self.shared.futures_env.push(FutureEnv::default()); - - self.shared.task_set.resize(self.senders.len()); + // The storage is alway an empty vector so we just book some capacity. + if let Some(storage) = self.shared.storage.as_mut() { + let _ = storage.try_reserve(self.senders.len()); + }; line_id } @@ -61,8 +62,7 @@ impl BroadcasterInner { pub(super) fn remove(&mut self, id: LineId) -> bool { if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { self.senders.swap_remove(pos); - self.shared.futures_env.swap_remove(pos); - self.shared.task_set.resize(self.senders.len()); + self.shared.outputs.truncate(self.senders.len()); return true; } @@ -73,8 +73,7 @@ impl BroadcasterInner { /// Removes all senders. pub(super) fn clear(&mut self) { self.senders.clear(); - self.shared.futures_env.clear(); - self.shared.task_set.resize(0); + self.shared.outputs.clear(); } /// Returns the number of connected senders. @@ -82,41 +81,35 @@ impl BroadcasterInner { self.senders.len() } - /// Efficiently broadcasts a message or a query to multiple addresses. - /// - /// This method does not collect the responses from queries. - fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> { + /// Return a list of futures broadcasting an event or query to multiple + /// addresses. + #[allow(clippy::type_complexity)] + fn futures( + &mut self, + arg: T, + ) -> ( + &'_ mut Shared, + Vec>>, + ) { let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); // Broadcast the message and collect all futures. - let mut iter = self - .senders - .iter_mut() - .zip(self.shared.futures_env.iter_mut()); - while let Some((sender, futures_env)) = iter.next() { - let future_cache = futures_env - .storage - .take() - .unwrap_or_else(|| RecycleBox::new(())); - + let mut iter = self.senders.iter_mut(); + while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - let future: RecycleBox> + Send + '_> = - coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg))); - - futures.push(RecycleBox::into_pin(future)); + if let Some(fut) = sender.1.send(arg) { + futures.push(fut); + } break; } - let future: RecycleBox> + Send + '_> = coerce_box!( - RecycleBox::recycle(future_cache, sender.1.send(arg.clone())) - ); - - futures.push(RecycleBox::into_pin(future)); + if let Some(fut) = sender.1.send(arg.clone()) { + futures.push(fut); + } } - // Generate the global future. - BroadcastFuture::new(&mut self.shared, futures) + (&mut self.shared, futures) } } @@ -132,7 +125,7 @@ impl Default for BroadcasterInner { shared: Shared { wake_sink, task_set: TaskSet::new(wake_src), - futures_env: Vec::new(), + outputs: Vec::new(), storage: None, }, } @@ -195,10 +188,22 @@ impl EventBroadcaster { match self.inner.senders.as_mut_slice() { // No sender. [] => Ok(()), - // One sender. - [sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}), - // Multiple senders. - _ => self.inner.broadcast(arg).await, + + // One sender at most. + [sender] => match sender.1.send(arg) { + None => Ok(()), + Some(fut) => fut.await.map_err(|_| BroadcastError {}), + }, + + // Possibly multiple senders. + _ => { + let (shared, mut futures) = self.inner.futures(arg); + match futures.as_mut_slice() { + [] => Ok(()), + [fut] => fut.await.map_err(|_| BroadcastError {}), + _ => BroadcastFuture::new(shared, futures).await, + } + } } } } @@ -256,26 +261,50 @@ impl QueryBroadcaster { &mut self, arg: T, ) -> Result + '_, BroadcastError> { - match self.inner.senders.as_mut_slice() { + let output_count = match self.inner.senders.as_mut_slice() { // No sender. - [] => {} - // One sender. + [] => 0, + + // One sender at most. [sender] => { - let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; - self.inner.shared.futures_env[0].output = Some(output); + if let Some(fut) = sender.1.send(arg) { + let output = fut.await.map_err(|_| BroadcastError {})?; + self.inner.shared.outputs[0] = Some(output); + + 1 + } else { + 0 + } + } + + // Possibly multiple senders. + _ => { + let (shared, mut futures) = self.inner.futures(arg); + let output_count = futures.len(); + + match futures.as_mut_slice() { + [] => {} + [fut] => { + let output = fut.await.map_err(|_| BroadcastError {})?; + shared.outputs[0] = Some(output); + } + _ => { + BroadcastFuture::new(shared, futures).await?; + } + } + + output_count } - // Multiple senders. - _ => self.inner.broadcast(arg).await?, }; - // At this point all outputs should be available so `unwrap` can be - // called on the output of each future. + // At this point all outputs should be available. let outputs = self .inner .shared - .futures_env + .outputs .iter_mut() - .map(|t| t.output.take().unwrap()); + .take(output_count) + .map(|t| t.take().unwrap()); Ok(outputs) } @@ -297,40 +326,20 @@ impl Clone for QueryBroadcaster { } } -/// Data related to a sender future. -struct FutureEnv { - /// Cached storage for the future. - storage: Option>, - /// Output of the associated future. - output: Option, -} - -impl Default for FutureEnv { - fn default() -> Self { - Self { - storage: None, - output: None, - } - } -} - -/// A type-erased `Send` future wrapped in a `RecycleBox`. -type RecycleBoxFuture<'a, R> = RecycleBox> + Send + 'a>; - /// Fields of `Broadcaster` that are explicitly borrowed by a `BroadcastFuture`. struct Shared { /// Thread-safe waker handle. wake_sink: WakeSink, /// Tasks associated to the sender futures. task_set: TaskSet, - /// Data related to the sender futures. - futures_env: Vec>, + /// Outputs of the sender futures. + outputs: Vec>, /// Cached storage for the sender futures. /// /// When it exists, the cached storage is always an empty vector but it /// typically has a non-zero capacity. Its purpose is to reuse the /// previously allocated capacity when creating new sender futures. - storage: Option>>>, + storage: Option>>>, } impl Clone for Shared { @@ -338,13 +347,13 @@ impl Clone for Shared { let wake_sink = WakeSink::new(); let wake_src = wake_sink.source(); - let mut futures_env = Vec::new(); - futures_env.resize_with(self.futures_env.len(), Default::default); + let mut outputs = Vec::new(); + outputs.resize_with(self.outputs.len(), Default::default); Self { wake_sink, - task_set: TaskSet::with_len(wake_src, self.task_set.len()), - futures_env, + task_set: TaskSet::new(wake_src), + outputs, storage: None, } } @@ -363,7 +372,7 @@ pub(super) struct BroadcastFuture<'a, R> { /// Reference to the shared fields of the `Broadcast` object. shared: &'a mut Shared, /// List of all send futures. - futures: ManuallyDrop>>>, + futures: ManuallyDrop>>>, /// The total count of futures that have not yet been polled to completion. pending_futures_count: usize, /// State of completion of the future. @@ -372,14 +381,17 @@ pub(super) struct BroadcastFuture<'a, R> { impl<'a, R> BroadcastFuture<'a, R> { /// Creates a new `BroadcastFuture`. - fn new(shared: &'a mut Shared, futures: Vec>>) -> Self { + fn new( + shared: &'a mut Shared, + futures: Vec>>, + ) -> Self { let pending_futures_count = futures.len(); + shared.task_set.resize(pending_futures_count); - assert!(shared.futures_env.len() == pending_futures_count); - - for futures_env in shared.futures_env.iter_mut() { - // Drop the previous output if necessary. - futures_env.output.take(); + for output in shared.outputs.iter_mut().take(pending_futures_count) { + // Empty the output slots to be used. This is necessary in case the + // previous broadcast future was cancelled. + output.take(); } BroadcastFuture { @@ -395,12 +407,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> { fn drop(&mut self) { // Safety: this is safe since `self.futures` is never accessed after it // is moved out. - let mut futures = unsafe { ManuallyDrop::take(&mut self.futures) }; - - // Recycle the future-containing boxes. - for (future, futures_env) in futures.drain(..).zip(self.shared.futures_env.iter_mut()) { - futures_env.storage = Some(RecycleBox::vacate_pinned(future)); - } + let futures = unsafe { ManuallyDrop::take(&mut self.futures) }; // Recycle the vector that contained the futures. self.shared.storage = Some(recycle_vec(futures)); @@ -413,7 +420,11 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - assert_ne!(this.state, FutureState::Completed); + assert_ne!( + this.state, + FutureState::Completed, + "broadcast future polled after completion" + ); // Poll all sender futures once if this is the first time the broadcast // future is polled. @@ -425,14 +436,14 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { this.shared.task_set.discard_scheduled(); for task_idx in 0..this.futures.len() { - let future_env = &mut this.shared.futures_env[task_idx]; - let future = &mut this.futures[task_idx]; + let output = &mut this.shared.outputs[task_idx]; + let future = std::pin::Pin::new(&mut this.futures[task_idx]); let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_cx_ref = &mut Context::from_waker(&task_waker_ref); - match future.as_mut().poll(task_cx_ref) { - Poll::Ready(Ok(output)) => { - future_env.output = Some(output); + match future.poll(task_cx_ref) { + Poll::Ready(Ok(o)) => { + *output = Some(o); this.pending_futures_count -= 1; } Poll::Ready(Err(_)) => { @@ -477,20 +488,20 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { }; for task_idx in scheduled_tasks { - let future_env = &mut this.shared.futures_env[task_idx]; + let output = &mut this.shared.outputs[task_idx]; // Do not poll completed futures. - if future_env.output.is_some() { + if output.is_some() { continue; } - let future = &mut this.futures[task_idx]; + let future = std::pin::Pin::new(&mut this.futures[task_idx]); let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_cx_ref = &mut Context::from_waker(&task_waker_ref); - match future.as_mut().poll(task_cx_ref) { - Poll::Ready(Ok(output)) => { - future_env.output = Some(output); + match future.poll(task_cx_ref) { + Poll::Ready(Ok(o)) => { + *output = Some(o); this.pending_futures_count -= 1; } Poll::Ready(Err(_)) => { @@ -703,6 +714,7 @@ mod tests { use loom::sync::atomic::{AtomicBool, Ordering}; use loom::thread; + use recycle_box::RecycleBox; use waker_fn::waker_fn; use super::super::sender::RecycledFuture; @@ -714,15 +726,15 @@ mod tests { fut_storage: Option>, } impl Sender<(), R> for TestEvent { - fn send(&mut self, _arg: ()) -> RecycledFuture<'_, Result> { + fn send(&mut self, _arg: ()) -> Option>> { let fut_storage = &mut self.fut_storage; let receiver = &mut self.receiver; - RecycledFuture::new(fut_storage, async { + Some(RecycledFuture::new(fut_storage, async { let mut stream = Box::pin(receiver.filter_map(|item| async { item })); Ok(stream.next().await.unwrap()) - }) + })) } } @@ -758,6 +770,14 @@ mod tests { ) } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] fn loom_broadcast_basic() { const DEFAULT_PREEMPTION_BOUND: usize = 3; @@ -831,6 +851,14 @@ mod tests { }); } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] fn loom_broadcast_spurious() { const DEFAULT_PREEMPTION_BOUND: usize = 3; diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 1c9ab02..9ddb9c7 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -4,6 +4,7 @@ use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use dyn_clone::DynClone; @@ -17,17 +18,15 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn}; /// replier method. pub(super) trait Sender: DynClone + Send { /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result>; + fn send(&mut self, arg: T) -> Option>>; } dyn_clone::clone_trait_object!( Sender); /// An object that can send events to an input port. -pub(super) struct InputSender +pub(super) struct InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + M: 'static, { func: F, sender: channel::Sender, @@ -36,11 +35,9 @@ where _phantom_closure_marker: PhantomData, } -impl InputSender +impl InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -53,14 +50,14 @@ where } } -impl Sender for InputSender +impl Sender for InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + 'static, - S: Send + 'static, + S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let fut = self.sender.send(move |model, scheduler, recycle_box| { @@ -69,32 +66,344 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { fut.await.map_err(|_| SendError {}) - }) + })) } } -impl Clone for InputSender +impl Clone for InputSender +where + M: 'static, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + func: self.func.clone(), + sender: self.sender.clone(), + fut_storage: None, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send mapped events to an input port. +pub(super) struct MapInputSender +where + M: 'static, +{ + map: Arc, + func: F, + sender: channel::Sender, + fut_storage: Option>, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl MapInputSender +where + M: 'static, +{ + pub(super) fn new(map: C, func: F, sender: channel::Sender) -> Self { + Self { + map: Arc::new(map), + func, + sender, + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapInputSender where M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, + C: Fn(T) -> U + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option>> { + let func = self.func.clone(); + let arg = (self.map)(arg); + + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + Some(RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + })) + } +} + +impl Clone for MapInputSender +where + M: 'static, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + map: self.map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can filter and send mapped events to an input port. +pub(super) struct FilterMapInputSender +where + M: 'static, +{ + filter_map: Arc, + func: F, + sender: channel::Sender, + fut_storage: Option>, + _phantom_filter_map: PhantomData Option>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapInputSender +where + M: 'static, +{ + pub(super) fn new(filter_map: C, func: F, sender: channel::Sender) -> Self { + Self { + filter_map: Arc::new(filter_map), + func, + sender, + fut_storage: None, + _phantom_filter_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone, T: Send + 'static, - S: Send + 'static, + U: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option>> { + (self.filter_map)(arg).map(|arg| { + let func = self.func.clone(); + + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + }) + }) + } +} + +impl Clone for FilterMapInputSender +where + M: 'static, + F: Clone, { fn clone(&self) -> Self { Self { + filter_map: self.filter_map.clone(), func: self.func.clone(), sender: self.sender.clone(), fut_storage: None, + _phantom_filter_map: PhantomData, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } } } -/// An object that can send a request to a replier port and retrieve a response. -pub(super) struct ReplierSender { +/// An object that can send an event to an event sink. +pub(super) struct EventSinkSender { + writer: W, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl EventSinkSender { + pub(super) fn new(writer: W) -> Self { + Self { + writer, + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for EventSinkSender +where + T: Send + 'static, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> Option>> { + let writer = &mut self.writer; + + Some(RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + })) + } +} + +impl Clone for EventSinkSender { + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +/// An object that can send mapped events to an event sink. +pub(super) struct MapEventSinkSender +where + C: Fn(T) -> U, +{ + writer: W, + map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl MapEventSinkSender +where + C: Fn(T) -> U, +{ + pub(super) fn new(map: C, writer: W) -> Self { + Self { + writer, + map: Arc::new(map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> Option>> { + let writer = &mut self.writer; + let arg = (self.map)(arg); + + Some(RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + })) + } +} + +impl Clone for MapEventSinkSender +where + C: Fn(T) -> U, + W: Clone, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + map: self.map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +/// An object that can filter and send mapped events to an event sink. +pub(super) struct FilterMapEventSinkSender +where + C: Fn(T) -> Option, +{ + writer: W, + filter_map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl FilterMapEventSinkSender +where + C: Fn(T) -> Option, +{ + pub(super) fn new(filter_map: C, writer: W) -> Self { + Self { + writer, + filter_map: Arc::new(filter_map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> Option>> { + let writer = &mut self.writer; + + (self.filter_map)(arg).map(|arg| { + RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }) + }) + } +} + +impl Clone for FilterMapEventSinkSender +where + C: Fn(T) -> Option, + W: Clone, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + filter_map: self.filter_map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +/// An object that can send requests to a replier port and retrieve responses. +pub(super) struct ReplierSender +where + M: Model, +{ func: F, sender: channel::Sender, receiver: multishot::Receiver, @@ -106,9 +415,6 @@ pub(super) struct ReplierSender { impl ReplierSender where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S>, - T: Send + 'static, - R: Send + 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -130,7 +436,7 @@ where R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let sender = &mut self.sender; let reply_receiver = &mut self.receiver; @@ -149,7 +455,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(fut_storage, async move { + Some(RecycledFuture::new(fut_storage, async move { // Send the message. send_fut.await.map_err(|_| SendError {})?; @@ -157,73 +463,245 @@ where // If an error is received, it most likely means the mailbox was // dropped before the message was processed. reply_receiver.recv().await.map_err(|_| SendError {}) - }) + })) } } impl Clone for ReplierSender where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + func: self.func.clone(), + sender: self.sender.clone(), + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send mapped requests to a replier port and retrieve +/// mapped responses. +pub(super) struct MapReplierSender +where + M: Model, +{ + query_map: Arc, + reply_map: Arc, + func: F, + sender: channel::Sender, + receiver: multishot::Receiver, + fut_storage: Option>, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl MapReplierSender +where + M: Model, +{ + pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender) -> Self { + Self { + query_map: Arc::new(query_map), + reply_map: Arc::new(reply_map), + func, + sender, + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapReplierSender +where + M: Model, + C: Fn(T) -> U + Send + Sync, + D: Fn(Q) -> R + Send + Sync, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, T: Send + 'static, R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, S: Send, +{ + fn send(&mut self, arg: T) -> Option>> { + let func = self.func.clone(); + let arg = (self.query_map)(arg); + let sender = &mut self.sender; + let reply_receiver = &mut self.receiver; + let fut_storage = &mut self.fut_storage; + let reply_map = &*self.reply_map; + + // The previous future generated by this method should have been polled + // to completion so a new sender should be readily available. + let reply_sender = reply_receiver.sender().unwrap(); + + let send_fut = sender.send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + Some(RecycledFuture::new(fut_storage, async move { + // Send the message. + send_fut.await.map_err(|_| SendError {})?; + + // Wait until the message is processed and the reply is sent back. + // If an error is received, it most likely means the mailbox was + // dropped before the message was processed. + reply_receiver + .recv() + .await + .map_err(|_| SendError {}) + .map(reply_map) + })) + } +} + +impl Clone for MapReplierSender +where + M: Model, + F: Clone, { fn clone(&self) -> Self { Self { + query_map: self.query_map.clone(), + reply_map: self.reply_map.clone(), func: self.func.clone(), sender: self.sender.clone(), receiver: multishot::Receiver::new(), fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } } } -/// An object that can send a payload to an event sink. -pub(super) struct EventSinkSender> { - writer: W, +/// An object that can filter and send mapped requests to a replier port and +/// retrieve mapped responses. +pub(super) struct FilterMapReplierSender +where + M: Model, +{ + query_filter_map: Arc, + reply_map: Arc, + func: F, + sender: channel::Sender, + receiver: multishot::Receiver, fut_storage: Option>, - _phantom_event: PhantomData, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, } -impl> EventSinkSender { - pub(super) fn new(writer: W) -> Self { +impl FilterMapReplierSender +where + M: Model, +{ + pub(super) fn new( + query_filter_map: C, + reply_map: D, + func: F, + sender: channel::Sender, + ) -> Self { Self { - writer, + query_filter_map: Arc::new(query_filter_map), + reply_map: Arc::new(reply_map), + func, + sender, + receiver: multishot::Receiver::new(), fut_storage: None, - _phantom_event: PhantomData, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, } } } -impl Sender for EventSinkSender +impl Sender for FilterMapReplierSender where + M: Model, + C: Fn(T) -> Option + Send + Sync, + D: Fn(Q) -> R + Send + Sync, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, T: Send + 'static, - W: EventSinkWriter, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let writer = &mut self.writer; - - RecycledFuture::new(&mut self.fut_storage, async move { - writer.write(arg); - - Ok(()) + fn send(&mut self, arg: T) -> Option>> { + (self.query_filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = &mut self.sender; + let reply_receiver = &mut self.receiver; + let fut_storage = &mut self.fut_storage; + let reply_map = &*self.reply_map; + + // The previous future generated by this method should have been polled + // to completion so a new sender should be readily available. + let reply_sender = reply_receiver.sender().unwrap(); + + let send_fut = sender.send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(fut_storage, async move { + // Send the message. + send_fut.await.map_err(|_| SendError {})?; + + // Wait until the message is processed and the reply is sent back. + // If an error is received, it most likely means the mailbox was + // dropped before the message was processed. + reply_receiver + .recv() + .await + .map_err(|_| SendError {}) + .map(reply_map) + }) }) } } -impl Clone for EventSinkSender +impl Clone for FilterMapReplierSender where - T: Send + 'static, - W: EventSinkWriter, + M: Model, + F: Clone, { fn clone(&self) -> Self { Self { - writer: self.writer.clone(), + query_filter_map: self.query_filter_map.clone(), + reply_map: self.reply_map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + receiver: multishot::Receiver::new(), fut_storage: None, - _phantom_event: PhantomData, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, } } } diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index 7c8ce3e..08398b9 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -13,9 +13,11 @@ use crate::simulation::{ }; use crate::util::slot; -use broadcaster::ReplyIterator; -use broadcaster::{EventBroadcaster, QueryBroadcaster}; -use sender::{InputSender, ReplierSender}; +use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator}; +use sender::{ + FilterMapInputSender, FilterMapReplierSender, InputSender, MapInputSender, MapReplierSender, + ReplierSender, +}; use super::ReplierFn; @@ -51,6 +53,58 @@ impl EventSource { self.broadcaster.lock().unwrap().add(sender) } + /// Adds an auto-converting connection to an input port of the model + /// specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping closure + /// plus, optionally, a context reference. + pub fn map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapInputSender::new(map, input, address.into().0)); + self.broadcaster.lock().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to an input port of the + /// model specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument, or ignored if the closure returns `None`. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping closure + /// plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0)); + self.broadcaster.lock().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -193,7 +247,7 @@ impl QuerySource { /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` - /// plus, optionally, a scheduler reference. + /// plus, optionally, a context reference. pub fn connect(&mut self, replier: F, address: impl Into>) -> LineId where M: Model, @@ -204,6 +258,76 @@ impl QuerySource { self.broadcaster.lock().unwrap().add(sender) } + /// Adds an auto-converting connection to a replier port of the model + /// specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn map_connect( + &mut self, + query_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapReplierSender::new( + query_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.lock().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to a replier port of the + /// model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument, or ignored if the query closure returns `None`. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + query_filter_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapReplierSender::new( + query_filter_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.lock().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index b545b66..bb36054 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -71,26 +71,27 @@ impl BroadcasterInner { self.senders.len() } - /// Efficiently broadcasts a message or a query to multiple addresses. - /// - /// This method does not collect the responses from queries. - fn broadcast(&mut self, arg: T) -> BroadcastFuture { - let mut future_states = Vec::with_capacity(self.senders.len()); + /// Return a list of futures broadcasting an event or query to multiple + /// addresses. + fn futures(&mut self, arg: T) -> Vec> { + let mut future_states = Vec::new(); // Broadcast the message and collect all futures. let mut iter = self.senders.iter_mut(); while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - future_states.push(SenderFutureState::Pending(sender.1.send(arg))); + if let Some(fut) = sender.1.send(arg) { + future_states.push(SenderFutureState::Pending(fut)); + } break; } - - future_states.push(SenderFutureState::Pending(sender.1.send(arg.clone()))); + if let Some(fut) = sender.1.send(arg.clone()) { + future_states.push(SenderFutureState::Pending(fut)); + } } - // Generate the global future. - BroadcastFuture::new(future_states) + future_states } } @@ -157,17 +158,27 @@ impl EventBroadcaster { let fut = match self.inner.senders.as_mut_slice() { // No sender. [] => Fut::Empty, - // One sender. + // One sender at most. [sender] => Fut::Single(sender.1.send(arg)), - // Multiple senders. - _ => Fut::Multiple(self.inner.broadcast(arg)), + // Possibly multiple senders. + _ => Fut::Multiple(self.inner.futures(arg)), }; async { match fut { - Fut::Empty => Ok(()), - Fut::Single(fut) => fut.await.map_err(|_| BroadcastError {}), - Fut::Multiple(fut) => fut.await.map(|_| ()), + // No sender. + Fut::Empty | Fut::Single(None) => Ok(()), + + Fut::Single(Some(fut)) => fut.await.map_err(|_| BroadcastError {}), + + Fut::Multiple(mut futures) => match futures.as_mut_slice() { + // No sender. + [] => Ok(()), + // One sender. + [SenderFutureState::Pending(fut)] => fut.await.map_err(|_| BroadcastError {}), + // Multiple senders. + _ => BroadcastFuture::new(futures).await.map(|_| ()), + }, } } } @@ -235,20 +246,39 @@ impl QueryBroadcaster { let fut = match self.inner.senders.as_mut_slice() { // No sender. [] => Fut::Empty, - // One sender. + // One sender at most. [sender] => Fut::Single(sender.1.send(arg)), - // Multiple senders. - _ => Fut::Multiple(self.inner.broadcast(arg)), + // Possibly multiple senders. + _ => Fut::Multiple(self.inner.futures(arg)), }; async { match fut { - Fut::Empty => Ok(ReplyIterator(Vec::new().into_iter())), - Fut::Single(fut) => fut + // No sender. + Fut::Empty | Fut::Single(None) => Ok(ReplyIterator(Vec::new().into_iter())), + + Fut::Single(Some(fut)) => fut .await .map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())) .map_err(|_| BroadcastError {}), - Fut::Multiple(fut) => fut.await.map_err(|_| BroadcastError {}), + + Fut::Multiple(mut futures) => match futures.as_mut_slice() { + // No sender. + [] => Ok(ReplyIterator(Vec::new().into_iter())), + + // One sender. + [SenderFutureState::Pending(fut)] => fut + .await + .map(|reply| { + ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()) + }) + .map_err(|_| BroadcastError {}), + + // Multiple senders. + _ => BroadcastFuture::new(futures) + .await + .map_err(|_| BroadcastError {}), + }, } } } @@ -598,14 +628,17 @@ mod tests { receiver: Option>>, } impl Sender<(), R> for TestEvent { - fn send(&mut self, _arg: ()) -> Pin> + Send>> { + fn send( + &mut self, + _arg: (), + ) -> Option> + Send>>> { let receiver = self.receiver.take().unwrap(); - Box::pin(async move { + Some(Box::pin(async move { let mut stream = Box::pin(receiver.filter_map(|item| async { item })); Ok(stream.next().await.unwrap()) - }) + })) } } @@ -634,8 +667,16 @@ mod tests { ) } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] - fn loom_broadcast_basic() { + fn loom_broadcast_query_basic() { const DEFAULT_PREEMPTION_BOUND: usize = 3; let mut builder = Builder::new(); @@ -707,8 +748,16 @@ mod tests { }); } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] - fn loom_broadcast_spurious() { + fn loom_broadcast_query_spurious() { const DEFAULT_PREEMPTION_BOUND: usize = 3; let mut builder = Builder::new(); diff --git a/asynchronix/src/ports/source/sender.rs b/asynchronix/src/ports/source/sender.rs index 1e83141..66c7080 100644 --- a/asynchronix/src/ports/source/sender.rs +++ b/asynchronix/src/ports/source/sender.rs @@ -3,6 +3,7 @@ use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; +use std::sync::Arc; use futures_channel::oneshot; use recycle_box::{coerce_box, RecycleBox}; @@ -16,22 +17,23 @@ pub(super) type SenderFuture = Pin: Send { /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> SenderFuture; + fn send(&mut self, arg: T) -> Option>; } /// An object that can send events to an input port. -pub(super) struct InputSender { +pub(super) struct InputSender +where + M: 'static, +{ func: F, sender: channel::Sender, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } -impl InputSender +impl InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -43,18 +45,74 @@ where } } -impl Sender for InputSender +impl Sender for InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + 'static, - S: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + let func = self.func.clone(); + let sender = self.sender.clone(); + + Some(Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {}) + })) + } +} + +/// An object that can send mapped events to an input port. +pub(super) struct MapInputSender +where + M: 'static, +{ + map: C, + func: F, + sender: channel::Sender, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl MapInputSender +where + M: 'static, +{ + pub(super) fn new(map: C, func: F, sender: channel::Sender) -> Self { + Self { + map, + func, + sender, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapInputSender +where + M: Model, + C: Fn(T) -> U + Send, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send, { - fn send(&mut self, arg: T) -> SenderFuture<()> { + fn send(&mut self, arg: T) -> Option> { let func = self.func.clone(); + let arg = (self.map)(arg); let sender = self.sender.clone(); - Box::pin(async move { + Some(Box::pin(async move { sender .send(move |model, scheduler, recycle_box| { let fut = func.call(model, arg, scheduler); @@ -63,12 +121,72 @@ where }) .await .map_err(|_| SendError {}) + })) + } +} + +/// An object that can filter and send mapped events to an input port. +pub(super) struct FilterMapInputSender +where + M: 'static, +{ + filter_map: C, + func: F, + sender: channel::Sender, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapInputSender +where + M: 'static, +{ + pub(super) fn new(filter_map: C, func: F, sender: channel::Sender) -> Self { + Self { + filter_map, + func, + sender, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option + Send, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + (self.filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = self.sender.clone(); + + Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {}) + }) as SenderFuture<()> }) } } /// An object that can send a request to a replier port and retrieve a response. -pub(super) struct ReplierSender { +pub(super) struct ReplierSender +where + M: 'static, +{ func: F, sender: channel::Sender, _phantom_closure: PhantomData R>, @@ -77,10 +195,7 @@ pub(super) struct ReplierSender { impl ReplierSender where - M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S>, - T: Send + 'static, - R: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -100,12 +215,12 @@ where R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> SenderFuture { + fn send(&mut self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); let (reply_sender, reply_receiver) = oneshot::channel(); - Box::pin(async move { + Some(Box::pin(async move { sender .send(move |model, scheduler, recycle_box| { let fut = async move { @@ -119,6 +234,160 @@ where .map_err(|_| SendError {})?; reply_receiver.await.map_err(|_| SendError {}) + })) + } +} + +/// An object that can send a mapped request to a replier port and retrieve a +/// mapped response. +pub(super) struct MapReplierSender +where + M: 'static, +{ + query_map: C, + reply_map: Arc, + func: F, + sender: channel::Sender, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl MapReplierSender +where + M: 'static, +{ + pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender) -> Self { + Self { + query_map, + reply_map: Arc::new(reply_map), + func, + sender, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapReplierSender +where + M: Model, + C: Fn(T) -> U + Send, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + let func = self.func.clone(); + let arg = (self.query_map)(arg); + let sender = self.sender.clone(); + let reply_map = self.reply_map.clone(); + let (reply_sender, reply_receiver) = oneshot::channel(); + + Some(Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + let _ = reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {})?; + + reply_receiver + .await + .map_err(|_| SendError {}) + .map(&*reply_map) + })) + } +} + +/// An object that can filter and send a mapped request to a replier port and +/// retrieve a mapped response. +pub(super) struct FilterMapReplierSender +where + M: 'static, +{ + query_filter_map: C, + reply_map: Arc, + func: F, + sender: channel::Sender, + _phantom_query_map: PhantomData Option>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapReplierSender +where + M: 'static, +{ + pub(super) fn new( + query_filter_map: C, + reply_map: D, + func: F, + sender: channel::Sender, + ) -> Self { + Self { + query_filter_map, + reply_map: Arc::new(reply_map), + func, + sender, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapReplierSender +where + M: Model, + C: Fn(T) -> Option + Send, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + (self.query_filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = self.sender.clone(); + let reply_map = self.reply_map.clone(); + let (reply_sender, reply_receiver) = oneshot::channel(); + + Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + let _ = reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {})?; + + reply_receiver + .await + .map_err(|_| SendError {}) + .map(&*reply_map) + }) as SenderFuture }) } } diff --git a/asynchronix/src/util/task_set.rs b/asynchronix/src/util/task_set.rs index e1145e8..3ad9b4f 100644 --- a/asynchronix/src/util/task_set.rs +++ b/asynchronix/src/util/task_set.rs @@ -196,60 +196,12 @@ impl TaskSet { next: AtomicU32::new(SLEEPING), })); } - - return; } - // Try to shrink the vector of tasks. - // - // The main issue when shrinking the vector of tasks is that stale - // wakers may still be around and may at any moment be scheduled and - // insert their task index in the list of scheduled tasks. If it cannot - // be guaranteed that this will not happen, then the vector of tasks - // cannot be shrunk further, otherwise the iterator for scheduled tasks - // will later fail when reaching a task with an invalid index. - // - // We follow a 2-steps strategy: - // - // 1) remove all tasks currently in the list of scheduled task and set - // them to `SLEEPING` state in case some of them might have an index - // that will be invalidated when the vector of tasks is shrunk; - // - // 2) attempt to iteratively shrink the vector of tasks by removing - // tasks starting from the back of the vector: - // - If a task is in the `SLEEPING` state, then its `next` pointer is - // changed to an arbitrary value other than`SLEEPING`, but the task - // is not inserted in the list of scheduled tasks; this way, the - // task will be effectively rendered inactive. The task can now be - // removed from the vector. - // - If a task is found in a non-`SLEEPING` state (meaning that there - // was a race and the task was scheduled after step 1) then abandon - // further shrinking and leave this task in the vector; the iterator - // for scheduled tasks mitigates such situation by only yielding - // task indices that are within the expected range. - - // Step 1: unscheduled tasks that may be scheduled. - self.discard_scheduled(); - - // Step 2: attempt to remove tasks starting at the back of the vector. - while self.tasks.len() > len { - // There is at least one task since `len()` was non-zero. - let task = self.tasks.last().unwrap(); - - // Ordering: Relaxed ordering is sufficient since the task is - // effectively discarded. - if task - .next - .compare_exchange(SLEEPING, EMPTY, Ordering::Relaxed, Ordering::Relaxed) - .is_err() - { - // The task could not be removed for now so the set of tasks cannot - // be shrunk further. - break; - } - - self.tasks.pop(); - } + // The vector of tasks is never shrunk as this is a fairly costly + // operation and is not strictly necessary. Typically, inactive tasks + // left at the back of the vector are never waken anyway, and if they + // are, they are filtered out by the task iterator. } /// Returns `true` if one or more sub-tasks are currently scheduled. @@ -271,10 +223,6 @@ impl TaskSet { waker_ref(&self.tasks[idx]) } - - pub(crate) fn len(&self) -> usize { - self.task_count - } } /// Internals shared between a `TaskSet` and its associated `Task`s.