Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add map/filter_map variants of the connect method #32

Merged
merged 8 commits into from
Aug 7, 2024
166 changes: 164 additions & 2 deletions asynchronix/src/ports/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ use crate::simulation::Address;
use crate::util::cached_rw_lock::CachedRwLock;

use broadcaster::{EventBroadcaster, QueryBroadcaster};
use sender::FilterMapReplierSender;

use self::sender::{EventSinkSender, InputSender, ReplierSender};
use self::sender::{
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender,
};

/// An output port.
///
Expand Down Expand Up @@ -57,6 +61,94 @@ impl<T: Clone + Send + 'static> Output<T> {
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to an input port of the model
/// specified by the address.
///
/// Events are mapped to another type using the closure provided in
/// argument.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping
/// closure plus, optionally, a context reference.
pub fn map_connect<M, C, F, U, S>(
&mut self,
map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer).
///
/// Events are mapped to another type using the closure provided in
/// argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
where
C: Fn(T) -> U + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
let sender = Box::new(MapEventSinkSender::new(map, sink.writer()));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting, filtered connection to an input port of the
/// model specified by the address.
///
/// Events are mapped to another type using the closure provided in
/// argument, or ignored if the closure returns `None`.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping
/// closure plus, optionally, a context reference.
pub fn filter_map_connect<M, C, F, U, S>(
&mut self,
filter_map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapInputSender::new(
filter_map,
input,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer).
///
/// Events are mapped to another type using the closure provided in
/// argument.
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
where
C: Fn(T) -> Option<U> + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer()));
self.broadcaster.write().unwrap().add(sender)
}

/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
Expand Down Expand Up @@ -125,7 +217,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of type `R` and taking as argument a value of type `T`
/// plus, optionally, a scheduler reference.
/// plus, optionally, a context reference.
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
where
M: Model,
Expand All @@ -136,6 +228,76 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting connection to a replier port of the model
/// specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn map_connect<M, C, D, F, U, Q, S>(
&mut self,
query_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapReplierSender::new(
query_map,
reply_map,
replier,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}

/// Adds an auto-converting, filtered connection to a replier port of the
/// model specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument, or ignored if the query closure returns `None`.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn filter_map_connect<M, C, D, F, U, Q, S>(
&mut self,
query_filer_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapReplierSender::new(
query_filer_map,
reply_map,
replier,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}

/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
Expand Down
Loading
Loading