diff --git a/Cargo.toml b/Cargo.toml index c925092..bf19e1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,15 +7,17 @@ edition = "2021" [dependencies] thiserror = "2.0.9" futures = "0.3.31" -glommio = { version = "0.9.1", git="https://github.com/nand-nor/glommio", rev="bf0c68d"} +glommio = { version = "0.9.0" } flume = {version= "0.11.1", features=["async"]} -async-trait = "0.1.83" +async-trait = "0.1.84" tracing = {version = "0.1.41"} async-priority-channel = "0.2.0" async-broadcast = "0.7.2" [dev-dependencies] tracing-subscriber = { version="0.3.0"} +bencher = "0.1.5" +act-zero = {version = "0.4.0", git="https://github.com/nand-nor/act-zero", rev="eb01aec", features=["glommio"], default-features=false } [[example]] name = "simple" @@ -27,4 +29,9 @@ name = "supervised_simple" name = "supervisor_core" [[example]] -name = "priority" \ No newline at end of file +name = "priority" + +[[bench]] +name = "actors" +path = "benches/actors.rs" +harness = false \ No newline at end of file diff --git a/README.md b/README.md index a4c2612..c412265 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,27 @@ # glommactor -An simple actor framework for the glommio runtime (for learning purposes only)! And someday -maybe other runtimes if the design allows for it. +An simple actor framework for the glommio runtime. This crate is for learning purposes only. It is heavily inspired by [act-zero](https://www.github.com/Diggsey/act-zero) and Alice Ryhl's +[tokio actors](https://ryhl.io/blog/actors-with-tokio/). Honorable mention to Erlang's Actor model as well. -Heavily inspired by [act-zero](https://www.github.com/Diggsey/act-zero) and Alice Ryhl's -[tokio actors](https://ryhl.io/blog/actors-with-tokio/). \ No newline at end of file +# Comparison to Other Actor Frameworks + +The key difference between glommactor and other actor frameworks is that it is +intended to run within [glommio, a thread-per-core runtime](https://www.github.com/DataDogHQ/glommio). Almost all of the (reasonably usable) actor crates with async support I found are not runtime agnostic; most assume tokio with a handful supporting async-std. The standout exception here is the [act-zero crate](https://www.github.com/Diggsey/act-zero), which in general is an excellent runtime-agnostic actor framework. However I found that act-zero does not support all my desired use cases. This of course is solveable, but the crate also seems to not really be maintained anymore. So I decided to build something simple to support my use cases while learning more about actors. + +# Usecases + +The main usecase I wanted to build for is allowing messaging/actor interaction across cores. So the goal is to have an actor that is pinned to a specific core, where that actor can be interacted with from a handle within tasks executing on separate cores, as well as on the actor-pinned core. This is mainly intended for command and control but could potentially also be used for data processing in limited scenarios (since this arch is intended for zero-sharing between cores). + +# Benchmarks + +To get some simple comparison benchmarks I implemented glommio support in a [fork of the act-zero crate](https://github.com/nand-nor/act-zero/tree/add/glommio) and have used that for the benchmarks listed below. + +The OS is WSL (5.15.167.4-microsoft-standard-WSL2 #1 SMP x86_64) using cores 0, 1, 2, and 3 of a 12th Gen Intel(R) Core(TM) i9-12900K CPU: +``` + Running benches/actors.rs (target/release/deps/actors-b03984d68620e47f) + +running 2 tests +test test_actzero ... bench: 2,342,768 ns/iter (+/- 831,846) +test test_glom ... bench: 2,474,159 ns/iter (+/- 845,877) + +test result: ok. 0 passed; 0 failed; 0 ignored; 2 measured +``` \ No newline at end of file diff --git a/benches/actors.rs b/benches/actors.rs new file mode 100644 index 0000000..918c860 --- /dev/null +++ b/benches/actors.rs @@ -0,0 +1,282 @@ +//! Benchmarks comparing act_zero and glommactor +#[macro_use] +extern crate bencher; + +use act_zero::runtimes::glommio::spawn_actor_with_tq; +use act_zero::*; +use bencher::Bencher; +use glommactor::{ + handle::{ActorHandle, Handle}, + spawn_exec_actor, spawn_exec_handle_fut, Actor, ActorError, Event, +}; +use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares}; +use std::time::Duration; + +pub type Reply = flume::Sender; + +#[derive(Clone, Debug)] +pub enum HelloWorldEvent { + SayHello { reply: Reply<()> }, + State { reply: Reply }, +} + +#[derive(Clone, Debug)] +pub enum HelloState { + Stopped, + Started, + Running, +} + +impl Event for HelloWorldEvent {} + +struct HelloWorldActor { + receiver: flume::Receiver, + state: HelloState, +} + +impl HelloWorldActor { + fn new(receiver: flume::Receiver) -> Self { + Self { + receiver, + state: HelloState::Started, + } + } +} + +// impls needed for act_zero +impl act_zero::Actor for HelloWorldActor {} + +impl act_zero::IntoActorResult for HelloState { + type Output = HelloState; + + fn into_actor_result(self) -> ActorResult { + Ok(Produces::Value(self)) + } +} + +struct HandleWrapper { + handle: ActorHandle< + HelloWorldEvent, + flume::Sender, + flume::Receiver, + >, +} + +#[async_trait::async_trait] +impl Handle for HandleWrapper { + type State = HelloState; + type Result = , + flume::Receiver, + > as Handle>::Result; + async fn send(&self, event: HelloWorldEvent) -> Self::Result { + self.handle.send(event).await + } +} + +impl Clone for HandleWrapper { + fn clone(&self) -> Self { + Self { + handle: self.handle.clone(), + } + } +} + +impl HandleWrapper { + async fn say_hello(&self) -> Result<(), ActorError> { + let (tx, rx) = flume::bounded(1); + let msg = HelloWorldEvent::SayHello { reply: tx }; + self.handle.send(msg).await.ok(); + + rx.recv_async().await.map_err(|e| { + let msg = format!("Send cancelled {e:}"); + tracing::error!(msg); + ActorError::ActorError(msg) + })?; + + Ok(()) + } + + async fn state( + &self, + ) -> Result<>::State, ActorError> { + let (tx, rx) = flume::bounded(1); + + let msg = HelloWorldEvent::State { reply: tx }; + let _ = self.handle.send(msg).await; + rx.recv_async().await.map_err(|e| { + let msg = format!("Send cancelled {e:}"); + tracing::error!(msg); + ActorError::ActorError(msg) + }) + } +} + +#[async_trait::async_trait] +impl Actor for HelloWorldActor +where + HelloWorldEvent: Event + Send, +{ + type Rx = futures::channel::mpsc::Receiver; + type Error = ActorError; + type Result = Result<(), Self::Error>; + async fn run(self) -> Self::Result { + self.event_loop().await + } +} + +impl HelloWorldActor { + async fn say_hello(&mut self) { + tracing::info!("Hello, world!"); + } + + async fn state(&mut self) -> HelloState { + self.state.clone() + } + + async fn event_loop(mut self) -> Result<(), ActorError> { + self.state = HelloState::Running; + while let Ok(event) = self.receiver.recv_async().await { + self.process(event).await + } + Ok(()) + } + + async fn process(&mut self, event: HelloWorldEvent) { + match event { + HelloWorldEvent::SayHello { reply } => { + { + self.say_hello().await; + reply.send(()) + } + .ok(); + } + HelloWorldEvent::State { reply } => { + reply.send(self.state().await).ok(); + } + } + } +} + +fn glommactor_main() -> Result<(), ActorError> { + let mut handle_vec = vec![]; + + let (tx, rx) = flume::unbounded(); + // create actor and handle before running in local executor tasks + let (actor, handle) = ActorHandle::new(HelloWorldActor::new, tx, rx); + let handle_wrapper = HandleWrapper { handle }; + + // pin actor to core 0 + handle_vec.push( + spawn_exec_actor( + actor, + 10, + Latency::Matters(Duration::from_millis(1)), + Placement::Fixed(5), + "rt-actor", + "tq-actor", + ) + .expect("Unable to spawn actor onto runtime"), + ); + + // define a future for the handle spawner function to execute + let fut = async move { + handle_wrapper.say_hello().await.ok(); + let _state = handle_wrapper.state().await.expect("Failed to get state"); + }; + + // pins future where handle to actor is operating to core 1 + handle_vec.push( + spawn_exec_handle_fut( + 10, + Latency::Matters(Duration::from_millis(10)), + Placement::Fixed(6), + "rt-handle", + "tq-handle", + fut, + ) + .expect("Unable to spawn actor onto runtime"), + ); + + for handle in handle_vec { + handle.join().unwrap(); + } + + Ok(()) +} + +fn act_zero_main() -> Result<(), act_zero::ActorError> { + let mut handle_vec = vec![]; + + handle_vec.push( + LocalExecutorBuilder::new(Placement::Fixed(2)) + .name(&format!("{}{}", "actor", 0)) + .spawn(move || async move { + let tq: glommio::TaskQueueHandle = executor().create_task_queue( + Shares::Static(10), + Latency::Matters(std::time::Duration::from_millis(10)), + "other-actor-tq", + ); + + let (_tx, rx) = flume::unbounded(); + let addr = spawn_actor_with_tq( + HelloWorldActor { + receiver: rx, + state: HelloState::Stopped, + }, + tq, + ); + call!(addr.say_hello()).await.unwrap(); + let _state = call!(addr.state()).await.unwrap(); + + /* Add some other work to the task queue + let (addr_tx, addr_rx) = flume::unbounded(); + addr_tx.send_async(addr.clone()).await.expect("Failed"); + + // add some other work to the task queue + let fut = async move { + let fut_addr = addr_rx.recv_async().await.expect("Failed"); + call!(fut_addr.say_hello()).await.unwrap(); + let _state = call!(fut_addr.state()).await.unwrap(); + }; + + let t = glommio::spawn_local_into(fut, tq).map(|t| t.detach()).expect("Failed"); + t.await; + */ + }) + .unwrap(), + ); + + handle_vec.push( + LocalExecutorBuilder::new(Placement::Fixed(3)) + .name(&format!("{}{}", "busy-work", 1)) + .spawn(move || async move { + // busy work to simulate action on 2 cores + for i in 0..1000 { + let _x = i + 1; + } + }) + .unwrap(), + ); + + for handle in handle_vec { + handle.join().unwrap(); + } + + Ok(()) +} + +fn test_glom(bencher: &mut Bencher) { + bencher.iter(|| glommactor_main().expect("Failure to bench using glommactor_main")); +} + +fn test_actzero(bencher: &mut Bencher) { + bencher.iter(|| act_zero_main().expect("Failure to bench using act_zero_main")); +} + +benchmark_group!(test_glommactor, test_glom); + +benchmark_group!(test_act_zero, test_actzero); + +benchmark_main!(test_glommactor, test_act_zero);