Skip to content

Commit

Permalink
Merge pull request #33 from asynchronics/feature/connect_map
Browse files Browse the repository at this point in the history
Add tests for filter_map_connect (source & output)
  • Loading branch information
jauhien authored Aug 7, 2024
2 parents 252ada4 + 525f708 commit 1f3e04e
Show file tree
Hide file tree
Showing 2 changed files with 430 additions and 48 deletions.
239 changes: 215 additions & 24 deletions asynchronix/src/ports/output/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,59 +567,140 @@ mod tests {
use futures_executor::block_on;

use crate::channel::Receiver;
use crate::model::Context;
use crate::simulation::{Address, LocalScheduler, Scheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;

use super::super::sender::{InputSender, ReplierSender};
use super::super::sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender,
};
use super::*;
use crate::model::Model;
use crate::model::{Context, Model};

struct Counter {
struct SumModel {
inner: Arc<AtomicUsize>,
}
impl Counter {
impl SumModel {
fn new(counter: Arc<AtomicUsize>) -> Self {
Self { inner: counter }
}
async fn inc(&mut self, by: usize) {
async fn increment(&mut self, by: usize) {
self.inner.fetch_add(by, Ordering::Relaxed);
}
async fn fetch_inc(&mut self, by: usize) -> usize {
let res = self.inner.fetch_add(by, Ordering::Relaxed);
res
}
impl Model for SumModel {}

struct DoubleModel {}
impl DoubleModel {
fn new() -> Self {
Self {}
}
async fn double(&mut self, value: usize) -> usize {
2 * value
}
}
impl Model for Counter {}
impl Model for DoubleModel {}

#[test]
fn broadcast_event_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;

let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(InputSender::new(Counter::inc, address));
let sender = Box::new(InputSender::new(SumModel::increment, address));

broadcaster.add(sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
block_on(broadcaster.broadcast(1)).unwrap();
block_on(broadcaster.broadcast(MESSAGE)).unwrap();
});

let sum = Arc::new(AtomicUsize::new(0));

let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut sum_model = SumModel::new(sum.clone());

move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap();
}
})
})
.collect();

th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}

assert_eq!(sum.load(Ordering::Relaxed), N_RECV * MESSAGE);
}

#[test]
fn broadcast_event_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.

let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
SumModel::increment,
address,
));

broadcaster.add(id_filter_sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
block_on(async {
// Send messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}

// Broadcast the special value to all receivers.
broadcaster.broadcast(BROADCAST_ALL).await.unwrap();

// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}
})
});

let counter = Arc::new(AtomicUsize::new(0));
let sum = Arc::new(AtomicUsize::new(0));

// Spawn all models.
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut sum_model = SumModel::new(sum.clone());

move || {
let dummy_address = Receiver::new(1).sender();
Expand All @@ -633,7 +714,11 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
block_on(async {
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
});
}
})
})
Expand All @@ -644,38 +729,127 @@ mod tests {
th.join().unwrap();
}

assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(
sum.load(Ordering::Relaxed),
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) // Twice the sum of all IDs + N_RECV times the special value
);
}

#[test]
fn broadcast_query_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;

let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address));
let sender = Box::new(ReplierSender::new(DoubleModel::double, address));

broadcaster.add(sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
let iter = block_on(broadcaster.broadcast(1)).unwrap();
let iter = block_on(broadcaster.broadcast(MESSAGE)).unwrap();
let sum = iter.fold(0, |acc, val| acc + val);

assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)}
sum
});

let counter = Arc::new(AtomicUsize::new(0));
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut double_model = DoubleModel::new();

move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();

let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}

assert_eq!(sum, N_RECV * MESSAGE * 2);
}

#[test]
fn broadcast_query_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.

let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|x| 3 * x,
DoubleModel::double,
address,
));

broadcaster.add(sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
block_on(async {
let mut sum = 0;

// Send messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}

// Broadcast the special value to all receivers.
sum += broadcaster
.broadcast(BROADCAST_ALL)
.await
.unwrap()
.fold(0, |acc, val| acc + val);

// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}

sum
})
});

let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut double_model = DoubleModel::new();

move || {
let dummy_address = Receiver::new(1).sender();
Expand All @@ -689,19 +863,36 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();

block_on(async {
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
});
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();

th_broadcast.join().unwrap();
let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}

assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(
sum,
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) * 2 * 3, // Twice the sum of all IDs + N_RECV times the special value, then doubled and tripled
);
}
}

Expand Down
Loading

0 comments on commit 1f3e04e

Please sign in to comment.