Skip to content

Commit

Permalink
Merge pull request #32 from asynchronics/feature/connect_map
Browse files Browse the repository at this point in the history
Add map/filter_map variants of the `connect` method
  • Loading branch information
jauhien authored Aug 7, 2024
2 parents d9099c4 + c4d93f5 commit 252ada4
Show file tree
Hide file tree
Showing 7 changed files with 1,319 additions and 261 deletions.
166 changes: 164 additions & 2 deletions asynchronix/src/ports/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ use crate::simulation::Address;
use crate::util::cached_rw_lock::CachedRwLock;

use broadcaster::{EventBroadcaster, QueryBroadcaster};
use sender::FilterMapReplierSender;

use self::sender::{EventSinkSender, InputSender, ReplierSender};
use self::sender::{
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender,
};

/// An output port.
///
Expand Down Expand Up @@ -57,6 +61,94 @@ impl<T: Clone + Send + 'static> Output<T> {
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to an input port of the model
/// specified by the address.
///
/// Events are mapped to another type using the closure provided in
/// argument.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping
/// closure plus, optionally, a context reference.
pub fn map_connect<M, C, F, U, S>(
&mut self,
map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer).
///
/// Events are mapped to another type using the closure provided in
/// argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
where
C: Fn(T) -> U + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
let sender = Box::new(MapEventSinkSender::new(map, sink.writer()));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting, filtered connection to an input port of the
/// model specified by the address.
///
/// Events are mapped to another type using the closure provided in
/// argument, or ignored if the closure returns `None`.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping
/// closure plus, optionally, a context reference.
pub fn filter_map_connect<M, C, F, U, S>(
&mut self,
filter_map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapInputSender::new(
filter_map,
input,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer).
///
/// Events are mapped to another type using the closure provided in
/// argument.
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
where
C: Fn(T) -> Option<U> + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer()));
self.broadcaster.write().unwrap().add(sender)
}

/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
Expand Down Expand Up @@ -125,7 +217,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of type `R` and taking as argument a value of type `T`
/// plus, optionally, a scheduler reference.
/// plus, optionally, a context reference.
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
where
M: Model,
Expand All @@ -136,6 +228,76 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to a replier port of the model
/// specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn map_connect<M, C, D, F, U, Q, S>(
&mut self,
query_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapReplierSender::new(
query_map,
reply_map,
replier,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting, filtered connection to a replier port of the
/// model specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument, or ignored if the query closure returns `None`.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn filter_map_connect<M, C, D, F, U, Q, S>(
&mut self,
query_filer_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapReplierSender::new(
query_filer_map,
reply_map,
replier,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}

/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
Expand Down
Loading

0 comments on commit 252ada4

Please sign in to comment.