Skip to content

Commit

Permalink
Take message ref in (filter)map_connect closures
Browse files Browse the repository at this point in the history
This avoids preemptive cloning when the closures don't consume the
message, which is common when the filtering closure returns `None`.
  • Loading branch information
sbarral committed Aug 16, 2024
1 parent 1f3e04e commit 1b1db5e
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 65 deletions.
12 changes: 6 additions & 6 deletions asynchronix/src/ports/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<T: Clone + Send + 'static> Output<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand All @@ -95,7 +95,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
where
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
Expand All @@ -120,7 +120,7 @@ impl<T: Clone + Send + 'static> Output<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand All @@ -141,7 +141,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// argument.
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
where
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down Expand Up @@ -282,7 +282,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down
14 changes: 7 additions & 7 deletions asynchronix/src/ports/output/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
while let Some(sender) = iter.next() {
// Move the argument rather than clone it for the last future.
if iter.len() == 0 {
if let Some(fut) = sender.1.send(arg) {
if let Some(fut) = sender.1.send_owned(arg) {
futures.push(fut);
}
break;
}

if let Some(fut) = sender.1.send(arg.clone()) {
if let Some(fut) = sender.1.send(&arg) {
futures.push(fut);
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ impl<T: Clone> EventBroadcaster<T> {
[] => Ok(()),

// One sender at most.
[sender] => match sender.1.send(arg) {
[sender] => match sender.1.send_owned(arg) {
None => Ok(()),
Some(fut) => fut.await.map_err(|_| BroadcastError {}),
},
Expand Down Expand Up @@ -267,7 +267,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {

// One sender at most.
[sender] => {
if let Some(fut) = sender.1.send(arg) {
if let Some(fut) = sender.1.send_owned(arg) {
let output = fut.await.map_err(|_| BroadcastError {})?;
self.inner.shared.outputs[0] = Some(output);

Expand Down Expand Up @@ -667,7 +667,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
SumModel::increment,
address,
));
Expand Down Expand Up @@ -802,7 +802,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|x| 3 * x,
DoubleModel::double,
address,
Expand Down Expand Up @@ -917,7 +917,7 @@ mod tests {
fut_storage: Option<RecycleBox<()>>,
}
impl<R: Send> Sender<(), R> for TestEvent<R> {
fn send(&mut self, _arg: ()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, _arg: &()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let fut_storage = &mut self.fut_storage;
let receiver = &mut self.receiver;

Expand Down
69 changes: 43 additions & 26 deletions asynchronix/src/ports/output/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
/// An event or query sender abstracting over the target model and input or
/// replier method.
pub(super) trait Sender<T, R>: DynClone + Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
/// Asynchronously sends a message using a reference to the message.
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;

/// Asynchronously sends an owned message.
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
self.send(&arg)
}
}

dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
Expand Down Expand Up @@ -54,10 +59,14 @@ impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
self.send_owned(arg.clone())
}

fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let func = self.func.clone();

let fut = self.sender.send(move |model, scheduler, recycle_box| {
Expand Down Expand Up @@ -122,13 +131,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let func = self.func.clone();
let arg = (self.map)(arg);

Expand Down Expand Up @@ -196,13 +205,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
(self.filter_map)(arg).map(|arg| {
let func = self.func.clone();

Expand Down Expand Up @@ -256,10 +265,14 @@ impl<T, W> EventSinkSender<T, W> {

impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
where
T: Send + 'static,
T: Clone + Send + 'static,
W: EventSinkWriter<T>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
self.send_owned(arg.clone())
}

fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;

Some(RecycledFuture::new(&mut self.fut_storage, async move {
Expand All @@ -283,7 +296,7 @@ impl<T, W: Clone> Clone for EventSinkSender<T, W> {
/// An object that can send mapped events to an event sink.
pub(super) struct MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
{
writer: W,
map: Arc<C>,
Expand All @@ -293,7 +306,7 @@ where

impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
{
pub(super) fn new(map: C, writer: W) -> Self {
Self {
Expand All @@ -309,10 +322,10 @@ impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;
let arg = (self.map)(arg);

Expand All @@ -326,7 +339,7 @@ where

impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
W: Clone,
{
fn clone(&self) -> Self {
Expand All @@ -342,7 +355,7 @@ where
/// An object that can filter and send mapped events to an event sink.
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
{
writer: W,
filter_map: Arc<C>,
Expand All @@ -352,7 +365,7 @@ where

impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
{
pub(super) fn new(filter_map: C, writer: W) -> Self {
Self {
Expand All @@ -368,10 +381,10 @@ impl<T, U, W, C> Sender<T, ()> for FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;

(self.filter_map)(arg).map(|arg| {
Expand All @@ -386,7 +399,7 @@ where

impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
W: Clone,
{
fn clone(&self) -> Self {
Expand Down Expand Up @@ -432,11 +445,15 @@ impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
R: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
self.send_owned(arg.clone())
}

fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let func = self.func.clone();
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
Expand Down Expand Up @@ -525,7 +542,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
Expand All @@ -534,7 +551,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = &mut self.sender;
Expand Down Expand Up @@ -638,7 +655,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
Expand All @@ -647,7 +664,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
(self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = &mut self.sender;
Expand Down
8 changes: 4 additions & 4 deletions asynchronix/src/ports/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand All @@ -96,7 +96,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down Expand Up @@ -312,7 +312,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down
Loading

0 comments on commit 1b1db5e

Please sign in to comment.