Skip to content

Commit

Permalink
Change scheduler interface and add external inputs example.
Browse files Browse the repository at this point in the history
Relevant for issue #13.
  • Loading branch information
jauhien committed Aug 2, 2024
1 parent a6a2c85 commit 468fe87
Show file tree
Hide file tree
Showing 15 changed files with 996 additions and 766 deletions.
3 changes: 2 additions & 1 deletion asynchronix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ waker-fn = "1.1"


[dev-dependencies]
atomic-wait = "1.1"
futures-util = "0.3"
futures-executor = "0.3"

mio = { version = "1.0", features = ["os-poll", "net"] }

[build-dependencies]
tonic-build = { version = "0.11", optional = true }
Expand Down
17 changes: 10 additions & 7 deletions asynchronix/examples/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ fn main() {
.add_model(assembly, assembly_mbox, "assembly")
.init(t0);

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand All @@ -120,13 +122,14 @@ fn main() {
assert!(position.next().is_none());

// Start the motor in 2s with a PPS of 10Hz.
simu.schedule_event(
Duration::from_secs(2),
MotorAssembly::pulse_rate,
10.0,
&assembly_addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs(2),
MotorAssembly::pulse_rate,
10.0,
&assembly_addr,
)
.unwrap();

// Advance simulation time to two next events.
simu.step();
Expand Down
27 changes: 17 additions & 10 deletions asynchronix/examples/espresso_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl Controller {
// Schedule the `stop_brew()` method and turn on the pump.
self.stop_brew_key = Some(
context
.scheduler
.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
.unwrap(),
);
Expand Down Expand Up @@ -206,7 +207,7 @@ impl Tank {
state.set_empty_key.cancel();

// Update the volume, saturating at 0 in case of rounding errors.
let time = context.time();
let time = context.scheduler.time();
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);

Expand All @@ -231,7 +232,7 @@ impl Tank {
pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context<Self>) {
assert!(flow_rate >= 0.0);

let time = context.time();
let time = context.scheduler.time();

// If the flow rate was non-zero up to now, update the volume.
if let Some(state) = self.dynamic_state.take() {
Expand Down Expand Up @@ -273,7 +274,10 @@ impl Tank {
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);

// Schedule the next update.
match context.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) {
match context
.scheduler
.schedule_keyed_event(duration_until_empty, Self::set_empty, ())
{
Ok(set_empty_key) => {
let state = TankDynamicState {
last_volume_update: time,
Expand Down Expand Up @@ -373,6 +377,8 @@ fn main() {
.add_model(tank, tank_mbox, "tank")
.init(t0);

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down Expand Up @@ -426,13 +432,14 @@ fn main() {
assert_eq!(flow_rate.next(), Some(0.0));

// Interrupt the brew after 15s by pressing again the brew button.
simu.schedule_event(
Duration::from_secs(15),
Controller::brew_cmd,
(),
&controller_addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs(15),
Controller::brew_cmd,
(),
&controller_addr,
)
.unwrap();
simu.process_event(Controller::brew_cmd, (), &controller_addr);
assert_eq!(flow_rate.next(), Some(pump_flow_rate));

Expand Down
251 changes: 251 additions & 0 deletions asynchronix/examples/external_input.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
//! Example: a model that reads data from the external world.
//!
//! This example demonstrates in particular:
//!
//! * external world inputs (useful in cosimulation),
//! * system clock,
//! * periodic scheduling.
//!
//! ```text
//! ┌────────────────────────────────┐
//! │ Simulation │
//! ┌────────────┐ ┌────────────┐ │ ┌──────────┐ │
//! │ │ UDP │ │ message │ message │ │ message │ ┌─────────────┐
//! │ UDP Client ├─────────▶│ UDP Server ├──────────▶├─────────▶│ Listener ├─────────▶├──▶│ EventBuffer │
//! │ │ message │ │ │ │ │ │ └─────────────┘
//! └────────────┘ └────────────┘ │ └──────────┘ │
//! └────────────────────────────────┘
//! ```
use std::io::ErrorKind;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;

use atomic_wait::{wait, wake_one};

use mio::net::UdpSocket as MioUdpSocket;
use mio::{Events, Interest, Poll, Token};

use asynchronix::model::{Context, InitializedModel, Model, SetupContext};
use asynchronix::ports::{EventBuffer, Output};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{AutoSystemClock, MonotonicTime};

const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_millis(20);
const N: u32 = 10;
const SENDER: &str = "127.0.0.1:8000";
const RECEIVER: &str = "127.0.0.1:9000";

/// Model that receives external input.
pub struct Listener {
/// Received message.
pub message: Output<String>,

/// Receiver of external messages.
rx: Receiver<String>,

/// External sender.
tx: Option<Sender<String>>,

/// Synchronization with client.
start: Arc<AtomicU32>,

/// Synchronization with simulation.
stop: Arc<AtomicBool>,

/// Handle to UDP Server.
external_handle: Option<JoinHandle<()>>,
}

impl Listener {
/// Creates a Listener.
pub fn new(start: Arc<AtomicU32>) -> Self {
start.store(0, Ordering::Relaxed);

let (tx, rx) = channel();
Self {
message: Output::default(),
rx,
tx: Some(tx),
start,
stop: Arc::new(AtomicBool::new(false)),
external_handle: None,
}
}

/// Periodically scheduled function that processes external events.
pub async fn process(&mut self) {
// rx can not outlive await.
loop {
if let Ok(message) = self.rx.try_recv() {
self.message.send(message).await;
} else {
break;
}
}
}

/// UDP server.
/// Code is based on the MIO UDP example.
fn listener(tx: Sender<String>, start: Arc<AtomicU32>, stop: Arc<AtomicBool>) {
const UDP_SOCKET: Token = Token(0);
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(10);
let mut socket = MioUdpSocket::bind(RECEIVER.parse().unwrap()).unwrap();
poll.registry()
.register(&mut socket, UDP_SOCKET, Interest::READABLE)
.unwrap();
let mut buf = [0; 1 << 16];

// Wake up the client.
start.store(1, Ordering::Relaxed);
wake_one(&*start);

'process: loop {
// Wait for UDP packet or end of simulation.
if let Err(err) = poll.poll(&mut events, Some(Duration::from_secs(1))) {
if err.kind() == ErrorKind::Interrupted {
// Exit if simulation is finished.
if stop.load(Ordering::Relaxed) {
break 'process;
}
continue;
}
break 'process;
}

for event in events.iter() {
match event.token() {
UDP_SOCKET => loop {
match socket.recv_from(&mut buf) {
Ok((packet_size, _)) => {
if let Ok(message) = std::str::from_utf8(&buf[..packet_size]) {
// Inject external message into simulation.
if tx.send(message.into()).is_err() {
break 'process;
}
};
}
Err(e) if e.kind() == ErrorKind::WouldBlock => {
break;
}
_ => {
break 'process;
}
}
},
_ => {
panic!("Got event for unexpected token: {:?}", event);
}
}
}
// Exit if simulation is finished.
if stop.load(Ordering::Relaxed) {
break 'process;
}
}

poll.registry().deregister(&mut socket).unwrap();
}
}

impl Model for Listener {
/// Start UDP Server on model setup.
fn setup(&mut self, _: &SetupContext<Self>) {
let tx = self.tx.take().unwrap();
let start = Arc::clone(&self.start);
let stop = Arc::clone(&self.stop);
self.external_handle = Some(thread::spawn(move || {
Self::listener(tx, start, stop);
}));
}

/// Initialize model.
async fn init(self, context: &Context<Self>) -> InitializedModel<Self> {
// Schedule periodic function that processes external events.
context
.scheduler
.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
.unwrap();

self.into()
}
}

impl Drop for Listener {
/// Notify UDP Server that simulation is over and wait for server shutdown.
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
let handle = self.external_handle.take();
if let Some(handle) = handle {
handle.join().unwrap();
}
}
}

fn main() {
// ---------------
// Bench assembly.
// ---------------

// Models.

// Client-server synchronization.
let start = Arc::new(AtomicU32::new(0));

let mut listener = Listener::new(Arc::clone(&start));

// Mailboxes.
let listener_mbox = Mailbox::new();

// Model handles for simulation.
let mut message = EventBuffer::new();
listener.message.connect_sink(&message);

// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new())
.init(t0);

// ----------
// Simulation.
// ----------

// External client that sends UDP messages.
let sender_handle = thread::spawn(move || {
// Wait until UDP Server is ready.
wait(&start, 0);

for i in 0..N {
let socket = UdpSocket::bind(SENDER).unwrap();
socket.send_to(i.to_string().as_bytes(), RECEIVER).unwrap();
if i % 3 == 0 {
sleep(PERIOD * i)
}
}
});

// Advance simulation, external messages will be collected.
simu.step_by(Duration::from_secs(2));

// Check collected external messages.
let mut packets = 0_u32;
for _ in 0..N {
// UDP can reorder packages, we are expecting that on not too loaded
// localhost packages would not be dropped
packets |= 1 << message.next().unwrap().parse::<u8>().unwrap();
}
assert_eq!(packets, u32::MAX >> 22);
assert_eq!(message.next(), None);

sender_handle.join().unwrap();
}
Loading

0 comments on commit 468fe87

Please sign in to comment.