Skip to content

Commit

Permalink
Merge pull request #8 from nand-nor/add/benchmarks
Browse files Browse the repository at this point in the history
Add/benchmarks
  • Loading branch information
nand-nor authored Jan 3, 2025
2 parents 91e27b7 + e569a3b commit f8f1918
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 33 deletions.
34 changes: 31 additions & 3 deletions .github/workflows/xtask.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,36 @@ jobs:

- uses: actions/checkout@v3

- name: compile check
- name: compile & run check
run: |
cargo xtask run
test:
name: Tests
runs-on: ubuntu-latest

steps:
- uses: dtolnay/rust-toolchain@stable
with:
target: x86_64-unknown-linux-gnu

- uses: actions/checkout@v3

- name: test check
run: |
cargo clippy -- -D warnings
cargo test
cargo xtask run
clippy:
name: Clippy
runs-on: ubuntu-latest

steps:
- uses: dtolnay/rust-toolchain@stable
with:
target: x86_64-unknown-linux-gnu

- uses: actions/checkout@v3

- name: clippy check
run: |
cargo clippy --all-features -- -D warnings
13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ edition = "2021"
[dependencies]
thiserror = "2.0.9"
futures = "0.3.31"
glommio = { version = "0.9.1", git="https://github.com/nand-nor/glommio", rev="bf0c68d"}
glommio = { version = "0.9.0" }
flume = {version= "0.11.1", features=["async"]}
async-trait = "0.1.83"
async-trait = "0.1.84"
tracing = {version = "0.1.41"}
async-priority-channel = "0.2.0"
async-broadcast = "0.7.2"

[dev-dependencies]
tracing-subscriber = { version="0.3.0"}
bencher = "0.1.5"
act-zero = {version = "0.4.0", git="https://github.com/nand-nor/act-zero", rev="eb01aec", features=["glommio"], default-features=false }

[[example]]
name = "simple"
Expand All @@ -27,4 +29,9 @@ name = "supervised_simple"
name = "supervisor_core"

[[example]]
name = "priority"
name = "priority"

[[bench]]
name = "actors"
path = "benches/actors.rs"
harness = false
29 changes: 25 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
# glommactor
An simple actor framework for the glommio runtime (for learning purposes only)! And someday
maybe other runtimes if the design allows for it.
An simple actor framework for the glommio runtime. This crate is for learning purposes only. It is heavily inspired by [act-zero](https://www.github.com/Diggsey/act-zero) and Alice Ryhl's
[tokio actors](https://ryhl.io/blog/actors-with-tokio/). Honorable mention to Erlang's Actor model as well.

Heavily inspired by [act-zero](https://www.github.com/Diggsey/act-zero) and Alice Ryhl's
[tokio actors](https://ryhl.io/blog/actors-with-tokio/).
# Comparison to Other Actor Frameworks

The key difference between glommactor and other actor frameworks is that it is
intended to run within [glommio, a thread-per-core runtime](https://www.github.com/DataDogHQ/glommio). Almost all of the (reasonably usable) actor crates with async support I found are not runtime agnostic; most assume tokio with a handful supporting async-std. The standout exception here is the [act-zero crate](https://www.github.com/Diggsey/act-zero), which in general is an excellent runtime-agnostic actor framework. However I found that act-zero does not support all my desired use cases. This of course is solveable, but the crate also seems to not really be maintained anymore. So I decided to build something simple to support my use cases while learning more about actors.

# Usecases

The main usecase I wanted to build for is allowing messaging/actor interaction across cores. So the goal is to have an actor that is pinned to a specific core, where that actor can be interacted with from a handle within tasks executing on separate cores, as well as on the actor-pinned core. This is mainly intended for command and control but could potentially also be used for data processing in limited scenarios (since this arch is intended for zero-sharing between cores).

# Benchmarks

To get some simple comparison benchmarks I implemented glommio support in a [fork of the act-zero crate](https://github.com/nand-nor/act-zero/tree/add/glommio) and have used that for the benchmarks listed below.

The OS is WSL (5.15.167.4-microsoft-standard-WSL2 #1 SMP x86_64) using cores 0, 1, 2, and 3 of a 12th Gen Intel(R) Core(TM) i9-12900K CPU:
```
Running benches/actors.rs (target/release/deps/actors-b03984d68620e47f)
running 2 tests
test test_actzero ... bench: 2,342,768 ns/iter (+/- 831,846)
test test_glom ... bench: 2,474,159 ns/iter (+/- 845,877)
test result: ok. 0 passed; 0 failed; 0 ignored; 2 measured
```
282 changes: 282 additions & 0 deletions benches/actors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
//! Benchmarks comparing act_zero and glommactor
#[macro_use]
extern crate bencher;

use act_zero::runtimes::glommio::spawn_actor_with_tq;
use act_zero::*;
use bencher::Bencher;
use glommactor::{
handle::{ActorHandle, Handle},
spawn_exec_actor, spawn_exec_handle_fut, Actor, ActorError, Event,
};
use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares};
use std::time::Duration;

pub type Reply<T> = flume::Sender<T>;

#[derive(Clone, Debug)]
pub enum HelloWorldEvent {
SayHello { reply: Reply<()> },
State { reply: Reply<HelloState> },
}

#[derive(Clone, Debug)]
pub enum HelloState {
Stopped,
Started,
Running,
}

impl Event for HelloWorldEvent {}

struct HelloWorldActor {
receiver: flume::Receiver<HelloWorldEvent>,
state: HelloState,
}

impl HelloWorldActor {
fn new(receiver: flume::Receiver<HelloWorldEvent>) -> Self {
Self {
receiver,
state: HelloState::Started,
}
}
}

// impls needed for act_zero
impl act_zero::Actor for HelloWorldActor {}

impl act_zero::IntoActorResult for HelloState {
type Output = HelloState;

fn into_actor_result(self) -> ActorResult<Self::Output> {
Ok(Produces::Value(self))
}
}

struct HandleWrapper {
handle: ActorHandle<
HelloWorldEvent,
flume::Sender<HelloWorldEvent>,
flume::Receiver<HelloWorldEvent>,
>,
}

#[async_trait::async_trait]
impl Handle<HelloWorldEvent> for HandleWrapper {
type State = HelloState;
type Result = <ActorHandle<
HelloWorldEvent,
flume::Sender<HelloWorldEvent>,
flume::Receiver<HelloWorldEvent>,
> as Handle<HelloWorldEvent>>::Result;
async fn send(&self, event: HelloWorldEvent) -> Self::Result {
self.handle.send(event).await
}
}

impl Clone for HandleWrapper {
fn clone(&self) -> Self {
Self {
handle: self.handle.clone(),
}
}
}

impl HandleWrapper {
async fn say_hello(&self) -> Result<(), ActorError<HelloWorldEvent>> {
let (tx, rx) = flume::bounded(1);
let msg = HelloWorldEvent::SayHello { reply: tx };
self.handle.send(msg).await.ok();

rx.recv_async().await.map_err(|e| {
let msg = format!("Send cancelled {e:}");
tracing::error!(msg);
ActorError::ActorError(msg)
})?;

Ok(())
}

async fn state(
&self,
) -> Result<<Self as Handle<HelloWorldEvent>>::State, ActorError<HelloWorldEvent>> {
let (tx, rx) = flume::bounded(1);

let msg = HelloWorldEvent::State { reply: tx };
let _ = self.handle.send(msg).await;
rx.recv_async().await.map_err(|e| {
let msg = format!("Send cancelled {e:}");
tracing::error!(msg);
ActorError::ActorError(msg)
})
}
}

#[async_trait::async_trait]
impl Actor<HelloWorldEvent> for HelloWorldActor
where
HelloWorldEvent: Event + Send,
{
type Rx = futures::channel::mpsc::Receiver<HelloWorldEvent>;
type Error = ActorError<HelloWorldEvent>;
type Result = Result<(), Self::Error>;
async fn run(self) -> Self::Result {
self.event_loop().await
}
}

impl HelloWorldActor {
async fn say_hello(&mut self) {
tracing::info!("Hello, world!");
}

async fn state(&mut self) -> HelloState {
self.state.clone()
}

async fn event_loop(mut self) -> Result<(), ActorError<HelloWorldEvent>> {
self.state = HelloState::Running;
while let Ok(event) = self.receiver.recv_async().await {
self.process(event).await
}
Ok(())
}

async fn process(&mut self, event: HelloWorldEvent) {
match event {
HelloWorldEvent::SayHello { reply } => {
{
self.say_hello().await;
reply.send(())
}
.ok();
}
HelloWorldEvent::State { reply } => {
reply.send(self.state().await).ok();
}
}
}
}

fn glommactor_main() -> Result<(), ActorError<HelloWorldEvent>> {
let mut handle_vec = vec![];

let (tx, rx) = flume::unbounded();
// create actor and handle before running in local executor tasks
let (actor, handle) = ActorHandle::new(HelloWorldActor::new, tx, rx);
let handle_wrapper = HandleWrapper { handle };

// pin actor to core 0
handle_vec.push(
spawn_exec_actor(
actor,
10,
Latency::Matters(Duration::from_millis(1)),
Placement::Fixed(5),
"rt-actor",
"tq-actor",
)
.expect("Unable to spawn actor onto runtime"),
);

// define a future for the handle spawner function to execute
let fut = async move {
handle_wrapper.say_hello().await.ok();
let _state = handle_wrapper.state().await.expect("Failed to get state");
};

// pins future where handle to actor is operating to core 1
handle_vec.push(
spawn_exec_handle_fut(
10,
Latency::Matters(Duration::from_millis(10)),
Placement::Fixed(6),
"rt-handle",
"tq-handle",
fut,
)
.expect("Unable to spawn actor onto runtime"),
);

for handle in handle_vec {
handle.join().unwrap();
}

Ok(())
}

fn act_zero_main() -> Result<(), act_zero::ActorError> {
let mut handle_vec = vec![];

handle_vec.push(
LocalExecutorBuilder::new(Placement::Fixed(2))
.name(&format!("{}{}", "actor", 0))
.spawn(move || async move {
let tq: glommio::TaskQueueHandle = executor().create_task_queue(
Shares::Static(10),
Latency::Matters(std::time::Duration::from_millis(10)),
"other-actor-tq",
);

let (_tx, rx) = flume::unbounded();
let addr = spawn_actor_with_tq(
HelloWorldActor {
receiver: rx,
state: HelloState::Stopped,
},
tq,
);
call!(addr.say_hello()).await.unwrap();
let _state = call!(addr.state()).await.unwrap();

/* Add some other work to the task queue
let (addr_tx, addr_rx) = flume::unbounded();
addr_tx.send_async(addr.clone()).await.expect("Failed");
// add some other work to the task queue
let fut = async move {
let fut_addr = addr_rx.recv_async().await.expect("Failed");
call!(fut_addr.say_hello()).await.unwrap();
let _state = call!(fut_addr.state()).await.unwrap();
};
let t = glommio::spawn_local_into(fut, tq).map(|t| t.detach()).expect("Failed");
t.await;
*/
})
.unwrap(),
);

handle_vec.push(
LocalExecutorBuilder::new(Placement::Fixed(3))
.name(&format!("{}{}", "busy-work", 1))
.spawn(move || async move {
// busy work to simulate action on 2 cores
for i in 0..1000 {
let _x = i + 1;
}
})
.unwrap(),
);

for handle in handle_vec {
handle.join().unwrap();
}

Ok(())
}

fn test_glom(bencher: &mut Bencher) {
bencher.iter(|| glommactor_main().expect("Failure to bench using glommactor_main"));
}

fn test_actzero(bencher: &mut Bencher) {
bencher.iter(|| act_zero_main().expect("Failure to bench using act_zero_main"));
}

benchmark_group!(test_glommactor, test_glom);

benchmark_group!(test_act_zero, test_actzero);

benchmark_main!(test_glommactor, test_act_zero);
Loading

0 comments on commit f8f1918

Please sign in to comment.