Skip to content

Commit

Permalink
Merge pull request #71 from asynchronics/combinator
Browse files Browse the repository at this point in the history
Add replier adaptor
  • Loading branch information
sbarral authored Dec 11, 2024
2 parents 31be2b0 + d63bcdf commit 4623765
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 13 deletions.
276 changes: 276 additions & 0 deletions nexosim-util/examples/replier_adaptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
//! Example: RIU acquiring data from sensor.
//!
//! This example demonstrates in particular:
//!
//! * the use of replier port adaptor,
//! * periodic model self-scheduling.
//!
//! ```text
//! ┌────────┐ ┌─────────┐ Sensor TC ┌─────┐
//! Set temperature ●────►│ │ ◄Sensor TC │ │◄────────────┤ │
//! │ Sensor │◄►────────────►◄│ Adaptor │ Sensor TM │ RIU ├────► RIU TM
//! Set illuminance ●────►│ │ Sensor TM► │ ├────────────►│ │
//! └────────┘ └─────────┘ └─────┘
//! ```
use std::fmt::Debug;
use std::time::Duration;

use nexosim::model::{Context, InitializedModel, Model};
use nexosim::ports::{EventBuffer, Output};
use nexosim::simulation::{Mailbox, SimInit, SimulationError};
use nexosim::time::MonotonicTime;
use nexosim_util::combinators::ReplierAdaptor;

const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_secs(1);

/// Sensor TC.
#[derive(Clone, Debug, PartialEq)]
pub enum SensorTc {
GetTemp,
GetIllum,
}

/// Sensor TM.
#[derive(Clone, Debug, PartialEq)]
pub enum SensorTm {
Temp(f64),
Illum(f64),
}

/// Sensor model.
pub struct Sensor {
/// Temperature [deg C] -- internal state.
temp: f64,

/// Illuminance [lx] -- internal state.
illum: f64,
}

impl Sensor {
/// Creates a sensor model.
pub fn new() -> Self {
Self {
temp: 0.0,
illum: 0.0,
}
}

/// Sets sensor temperature [deg C].
pub async fn set_temp(&mut self, temp: f64) {
self.temp = temp;
}

/// Sets sensor illuminance [lx].
pub async fn set_illum(&mut self, illum: f64) {
self.illum = illum;
}

/// Processes sensor TC -- input port.
pub async fn process_tc(&mut self, tc: SensorTc) -> SensorTm {
match tc {
SensorTc::GetTemp => SensorTm::Temp(self.temp),
SensorTc::GetIllum => SensorTm::Illum(self.illum),
}
}
}

impl Model for Sensor {}

/// Internal TM field.
#[derive(Clone, Debug, PartialEq)]
pub struct TmField<T>
where
T: Clone + Debug + PartialEq,
{
/// TM value.
pub value: T,

/// TM readiness flag.
pub ready: bool,
}

/// RIU TM.
#[derive(Clone, Debug, PartialEq)]
pub struct RiuTm {
/// Temperature [deg C].
temp: f64,

/// Iluminance [lx].
illum: f64,
}

/// RIU model.
pub struct Riu {
/// Sensor TC -- output port.
pub sensor_tc: Output<SensorTc>,

/// RIU TM -- output port.
pub tm: Output<RiuTm>,

/// Temperature [deg C] -- internal state.
temp: TmField<f64>,

/// Illuminance [lx] -- internal state.
illum: TmField<f64>,
}

impl Riu {
/// Creates an RIU model.
pub fn new() -> Self {
Self {
sensor_tc: Output::new(),
tm: Output::new(),
temp: TmField {
value: 0.0,
ready: true,
},
illum: TmField {
value: 0.0,
ready: true,
},
}
}

/// Processes sensor TM -- input port.
pub async fn sensor_tm(&mut self, tm: SensorTm) {
match tm {
SensorTm::Temp(temp) => {
self.temp = TmField {
value: temp,
ready: true,
}
}
SensorTm::Illum(illum) => {
self.illum = TmField {
value: illum,
ready: true,
}
}
}

if self.temp.ready && self.illum.ready {
self.report().await
}
}

/// Starts sensor TM acquisition -- periodic activity.
async fn acquire(&mut self) {
self.temp.ready = false;
self.illum.ready = false;
self.sensor_tc.send(SensorTc::GetTemp).await;
self.sensor_tc.send(SensorTc::GetIllum).await
}

/// Reports RIU TM.
async fn report(&mut self) {
self.tm
.send(RiuTm {
temp: self.temp.value,
illum: self.illum.value,
})
.await
}
}

impl Model for Riu {
/// Initializes model.
async fn init(self, cx: &mut Context<Self>) -> InitializedModel<Self> {
// Schedule periodic acquisition.
cx.schedule_periodic_event(DELTA, PERIOD, Riu::acquire, ())
.unwrap();

self.into()
}
}

fn main() -> Result<(), SimulationError> {
// ---------------
// Bench assembly.
// ---------------

// Models.
let sensor = Sensor::new();
let mut riu = Riu::new();
let mut sensor_adaptor = ReplierAdaptor::new();

// Mailboxes.
let sensor_mbox = Mailbox::new();
let riu_mbox = Mailbox::new();
let sensor_adaptor_mbox = Mailbox::new();

// Connections.
riu.sensor_tc
.connect(ReplierAdaptor::input, &sensor_adaptor_mbox);
sensor_adaptor.output.connect(Riu::sensor_tm, &riu_mbox);
sensor_adaptor
.requestor
.connect(Sensor::process_tc, &sensor_mbox);

// Model handles for simulation.
let mut tm = EventBuffer::new();
let sensor_addr = sensor_mbox.address();

riu.tm.connect_sink(&tm);

// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
.add_model(sensor, sensor_mbox, "sensor")
.add_model(riu, riu_mbox, "riu")
.add_model(sensor_adaptor, sensor_adaptor_mbox, "sensor_adaptor")
.init(t0)?
.0;

// ----------
// Simulation.
// ----------

// Initial state: no RIU TM.
assert_eq!(tm.next(), None);

simu.step_until(Duration::from_millis(1200))?;

// RIU TM generated.
assert_eq!(
tm.next(),
Some(RiuTm {
temp: 0.0,
illum: 0.0
})
);

// Consume all RIU TM generated so far.
while tm.next().is_some() {}

// Set temperature and wait for RIU TM.
simu.process_event(Sensor::set_temp, 2.0, &sensor_addr)?;

simu.step_until(Duration::from_millis(1000))?;

assert_eq!(
tm.next(),
Some(RiuTm {
temp: 2.0,
illum: 0.0
})
);

// Set illuminance and wait for RIU TM.
simu.process_event(Sensor::set_illum, 3.0, &sensor_addr)?;

simu.step_until(Duration::from_millis(1000))?;

assert_eq!(
tm.next(),
Some(RiuTm {
temp: 2.0,
illum: 3.0
})
);

Ok(())
}
47 changes: 47 additions & 0 deletions nexosim-util/src/combinators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! Connector combinators.
//!
//! This module contains combinator types useful for simulation bench assembly.
//!
use nexosim::model::Model;
use nexosim::ports::{Output, Requestor};

/// A replier adaptor.
///
/// `ReplierAdaptor` generic model is aimed to connect pair of input/output
/// ports to a replier ports.
///
/// Model input is propagated to all the connected replier ports and their
/// answers are written to the model output.
pub struct ReplierAdaptor<T: Clone + Send + 'static, R: Clone + Send + 'static> {
/// Requestor port to be connected to replier port.
pub requestor: Requestor<T, R>,

/// Output port to be connected to input port.
pub output: Output<R>,
}

impl<T: Clone + Send + 'static, R: Clone + Send + 'static> ReplierAdaptor<T, R> {
/// Creates a `ReplierAdaptor` model.
pub fn new() -> Self {
Self::default()
}

/// Input port.
pub async fn input(&mut self, data: T) {
for res in self.requestor.send(data).await {
self.output.send(res).await;
}
}
}

impl<T: Clone + Send + 'static, R: Clone + Send + 'static> Model for ReplierAdaptor<T, R> {}

impl<T: Clone + Send + 'static, R: Clone + Send + 'static> Default for ReplierAdaptor<T, R> {
fn default() -> Self {
Self {
requestor: Requestor::new(),
output: Output::new(),
}
}
}
1 change: 1 addition & 0 deletions nexosim-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod combinators;
pub mod observables;
8 changes: 4 additions & 4 deletions nexosim/src/channel/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@ pub(super) struct MessageBorrow<'a, T: ?Sized> {
stamp: usize,
}

impl<'a, T: ?Sized> Deref for MessageBorrow<'a, T> {
impl<T: ?Sized> Deref for MessageBorrow<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.msg
}
}

impl<'a, T: ?Sized> DerefMut for MessageBorrow<'a, T> {
impl<T: ?Sized> DerefMut for MessageBorrow<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.msg
}
}

impl<'a, T: ?Sized> Drop for MessageBorrow<'a, T> {
impl<T: ?Sized> Drop for MessageBorrow<'_, T> {
fn drop(&mut self) {
let slot = &self.queue.buffer[self.index];

Expand All @@ -67,7 +67,7 @@ impl<'a, T: ?Sized> Drop for MessageBorrow<'a, T> {
slot.stamp.store(self.stamp, Ordering::Release);
}
}
impl<'a, M> fmt::Debug for MessageBorrow<'a, M> {
impl<M> fmt::Debug for MessageBorrow<'_, M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MessageBorrow").finish_non_exhaustive()
}
Expand Down
2 changes: 1 addition & 1 deletion nexosim/src/executor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ where
///
/// An arbitrary tag can be attached to the task, a clone of which will be
/// passed to the scheduling function each time it is called.
///
/// The returned `Runnable` must be scheduled by the user.
pub(crate) fn spawn<F, S, T>(
future: F,
Expand Down
1 change: 0 additions & 1 deletion nexosim/src/model/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use super::{Model, ProtoModel};
/// }
/// impl Model for DelayedGreeter {}
/// ```
// The self-scheduling caveat seems related to this issue:
// https://github.com/rust-lang/rust/issues/78649
pub struct Context<M: Model> {
Expand Down
4 changes: 2 additions & 2 deletions nexosim/src/ports/output/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl<'a, R> BroadcastFuture<'a, R> {
}
}

impl<'a, R> Drop for BroadcastFuture<'a, R> {
impl<R> Drop for BroadcastFuture<'_, R> {
fn drop(&mut self) {
// Safety: this is safe since `self.futures` is never accessed after it
// is moved out.
Expand All @@ -361,7 +361,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> {
}
}

impl<'a, R> Future for BroadcastFuture<'a, R> {
impl<R> Future for BroadcastFuture<'_, R> {
type Output = Result<(), SendError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
Loading

0 comments on commit 4623765

Please sign in to comment.