Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for filter_map_connect (source & output) #33

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 215 additions & 24 deletions asynchronix/src/ports/output/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,59 +567,140 @@ mod tests {
use futures_executor::block_on;

use crate::channel::Receiver;
use crate::model::Context;
use crate::simulation::{Address, LocalScheduler, Scheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;

use super::super::sender::{InputSender, ReplierSender};
use super::super::sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender,
};
use super::*;
use crate::model::Model;
use crate::model::{Context, Model};

struct Counter {
struct SumModel {
inner: Arc<AtomicUsize>,
}
impl Counter {
impl SumModel {
fn new(counter: Arc<AtomicUsize>) -> Self {
Self { inner: counter }
}
async fn inc(&mut self, by: usize) {
async fn increment(&mut self, by: usize) {
self.inner.fetch_add(by, Ordering::Relaxed);
}
async fn fetch_inc(&mut self, by: usize) -> usize {
let res = self.inner.fetch_add(by, Ordering::Relaxed);
res
}
impl Model for SumModel {}

struct DoubleModel {}
impl DoubleModel {
fn new() -> Self {
Self {}
}
async fn double(&mut self, value: usize) -> usize {
2 * value
}
}
impl Model for Counter {}
impl Model for DoubleModel {}

#[test]
fn broadcast_event_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;

let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(InputSender::new(Counter::inc, address));
let sender = Box::new(InputSender::new(SumModel::increment, address));

broadcaster.add(sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
block_on(broadcaster.broadcast(1)).unwrap();
block_on(broadcaster.broadcast(MESSAGE)).unwrap();
});

let sum = Arc::new(AtomicUsize::new(0));

let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut sum_model = SumModel::new(sum.clone());

move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap();
}
})
})
.collect();

th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}

assert_eq!(sum.load(Ordering::Relaxed), N_RECV * MESSAGE);
}

#[test]
fn broadcast_event_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.

let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
SumModel::increment,
address,
));

broadcaster.add(id_filter_sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
block_on(async {
// Send messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}

// Broadcast the special value to all receivers.
broadcaster.broadcast(BROADCAST_ALL).await.unwrap();

// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}
})
});

let counter = Arc::new(AtomicUsize::new(0));
let sum = Arc::new(AtomicUsize::new(0));

// Spawn all models.
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut sum_model = SumModel::new(sum.clone());

move || {
let dummy_address = Receiver::new(1).sender();
Expand All @@ -633,7 +714,11 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
block_on(async {
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
});
}
})
})
Expand All @@ -644,38 +729,127 @@ mod tests {
th.join().unwrap();
}

assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(
sum.load(Ordering::Relaxed),
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) // Twice the sum of all IDs + N_RECV times the special value
);
}

#[test]
fn broadcast_query_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;

let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address));
let sender = Box::new(ReplierSender::new(DoubleModel::double, address));

broadcaster.add(sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
let iter = block_on(broadcaster.broadcast(1)).unwrap();
let iter = block_on(broadcaster.broadcast(MESSAGE)).unwrap();
let sum = iter.fold(0, |acc, val| acc + val);

assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)}
sum
});

let counter = Arc::new(AtomicUsize::new(0));
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut double_model = DoubleModel::new();

move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();

let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}

assert_eq!(sum, N_RECV * MESSAGE * 2);
}

#[test]
fn broadcast_query_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.

let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|x| 3 * x,
DoubleModel::double,
address,
));

broadcaster.add(sender);
mailboxes.push(mailbox);
}

let th_broadcast = thread::spawn(move || {
block_on(async {
let mut sum = 0;

// Send messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}

// Broadcast the special value to all receivers.
sum += broadcaster
.broadcast(BROADCAST_ALL)
.await
.unwrap()
.fold(0, |acc, val| acc + val);

// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}

sum
})
});

let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut double_model = DoubleModel::new();

move || {
let dummy_address = Receiver::new(1).sender();
Expand All @@ -689,19 +863,36 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();

block_on(async {
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
});
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();

th_broadcast.join().unwrap();
let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}

assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(
sum,
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) * 2 * 3, // Twice the sum of all IDs + N_RECV times the special value, then doubled and tripled
);
}
}

Expand Down
Loading
Loading