Skip to content

Commit

Permalink
Merge pull request #35 from asynchronics/feature/connect_map
Browse files Browse the repository at this point in the history
Take message by ref in (filter)map_connect closures
  • Loading branch information
jauhien authored Aug 16, 2024
2 parents 1f3e04e + 1b1db5e commit e75edcb
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 65 deletions.
12 changes: 6 additions & 6 deletions asynchronix/src/ports/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<T: Clone + Send + 'static> Output<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand All @@ -95,7 +95,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
where
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
Expand All @@ -120,7 +120,7 @@ impl<T: Clone + Send + 'static> Output<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand All @@ -141,7 +141,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// argument.
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
where
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down Expand Up @@ -282,7 +282,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down
14 changes: 7 additions & 7 deletions asynchronix/src/ports/output/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
while let Some(sender) = iter.next() {
// Move the argument rather than clone it for the last future.
if iter.len() == 0 {
if let Some(fut) = sender.1.send(arg) {
if let Some(fut) = sender.1.send_owned(arg) {
futures.push(fut);
}
break;
}

if let Some(fut) = sender.1.send(arg.clone()) {
if let Some(fut) = sender.1.send(&arg) {
futures.push(fut);
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ impl<T: Clone> EventBroadcaster<T> {
[] => Ok(()),

// One sender at most.
[sender] => match sender.1.send(arg) {
[sender] => match sender.1.send_owned(arg) {
None => Ok(()),
Some(fut) => fut.await.map_err(|_| BroadcastError {}),
},
Expand Down Expand Up @@ -267,7 +267,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {

// One sender at most.
[sender] => {
if let Some(fut) = sender.1.send(arg) {
if let Some(fut) = sender.1.send_owned(arg) {
let output = fut.await.map_err(|_| BroadcastError {})?;
self.inner.shared.outputs[0] = Some(output);

Expand Down Expand Up @@ -667,7 +667,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
SumModel::increment,
address,
));
Expand Down Expand Up @@ -802,7 +802,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|x| 3 * x,
DoubleModel::double,
address,
Expand Down Expand Up @@ -917,7 +917,7 @@ mod tests {
fut_storage: Option<RecycleBox<()>>,
}
impl<R: Send> Sender<(), R> for TestEvent<R> {
fn send(&mut self, _arg: ()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, _arg: &()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let fut_storage = &mut self.fut_storage;
let receiver = &mut self.receiver;

Expand Down
69 changes: 43 additions & 26 deletions asynchronix/src/ports/output/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
/// An event or query sender abstracting over the target model and input or
/// replier method.
pub(super) trait Sender<T, R>: DynClone + Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
/// Asynchronously sends a message using a reference to the message.
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;

/// Asynchronously sends an owned message.
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
self.send(&arg)
}
}

dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
Expand Down Expand Up @@ -54,10 +59,14 @@ impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
self.send_owned(arg.clone())
}

fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let func = self.func.clone();

let fut = self.sender.send(move |model, scheduler, recycle_box| {
Expand Down Expand Up @@ -122,13 +131,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let func = self.func.clone();
let arg = (self.map)(arg);

Expand Down Expand Up @@ -196,13 +205,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
(self.filter_map)(arg).map(|arg| {
let func = self.func.clone();

Expand Down Expand Up @@ -256,10 +265,14 @@ impl<T, W> EventSinkSender<T, W> {

impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
where
T: Send + 'static,
T: Clone + Send + 'static,
W: EventSinkWriter<T>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
self.send_owned(arg.clone())
}

fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;

Some(RecycledFuture::new(&mut self.fut_storage, async move {
Expand All @@ -283,7 +296,7 @@ impl<T, W: Clone> Clone for EventSinkSender<T, W> {
/// An object that can send mapped events to an event sink.
pub(super) struct MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
{
writer: W,
map: Arc<C>,
Expand All @@ -293,7 +306,7 @@ where

impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
{
pub(super) fn new(map: C, writer: W) -> Self {
Self {
Expand All @@ -309,10 +322,10 @@ impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;
let arg = (self.map)(arg);

Expand All @@ -326,7 +339,7 @@ where

impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
W: Clone,
{
fn clone(&self) -> Self {
Expand All @@ -342,7 +355,7 @@ where
/// An object that can filter and send mapped events to an event sink.
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
{
writer: W,
filter_map: Arc<C>,
Expand All @@ -352,7 +365,7 @@ where

impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
{
pub(super) fn new(filter_map: C, writer: W) -> Self {
Self {
Expand All @@ -368,10 +381,10 @@ impl<T, U, W, C> Sender<T, ()> for FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;

(self.filter_map)(arg).map(|arg| {
Expand All @@ -386,7 +399,7 @@ where

impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
W: Clone,
{
fn clone(&self) -> Self {
Expand Down Expand Up @@ -432,11 +445,15 @@ impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
R: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
self.send_owned(arg.clone())
}

fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let func = self.func.clone();
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
Expand Down Expand Up @@ -525,7 +542,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
Expand All @@ -534,7 +551,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = &mut self.sender;
Expand Down Expand Up @@ -638,7 +655,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
Expand All @@ -647,7 +664,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
(self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = &mut self.sender;
Expand Down
8 changes: 4 additions & 4 deletions asynchronix/src/ports/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand All @@ -96,7 +96,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down Expand Up @@ -312,7 +312,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Expand Down
Loading

0 comments on commit e75edcb

Please sign in to comment.