diff --git a/.github/workflows/xtask.yml b/.github/workflows/xtask.yml index 34ec5a6..ce9eb84 100644 --- a/.github/workflows/xtask.yml +++ b/.github/workflows/xtask.yml @@ -24,4 +24,5 @@ jobs: - name: compile check run: | cargo clippy -- -D warnings + cargo test cargo xtask run \ No newline at end of file diff --git a/examples/simple/main.rs b/examples/simple/main.rs index 35ad2be..ba3d566 100644 --- a/examples/simple/main.rs +++ b/examples/simple/main.rs @@ -1,19 +1,23 @@ //! Simple demonstration of the Actor trait and corresponding handle //! as implemented for a HelloWorldActor -use glommactor::{handle::ActorHandle, Actor, ActorError, ActorState, Event}; +use glommactor::{ + handle::{ActorHandle, Handle}, + Actor, ActorError, ActorState, Event, +}; use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares}; use std::time::Duration; use tracing::Level; use tracing_subscriber::FmtSubscriber; -pub type Reply = flume::Sender<()>; +pub type Reply = flume::Sender; #[derive(Clone, Debug)] pub enum HelloWorldEvent { - SayHello { reply: Reply }, + SayHello { reply: Reply<()> }, Stop, Start, Shutdown, + State { reply: Reply }, } impl Event for HelloWorldEvent {} @@ -36,6 +40,10 @@ struct HandleWrapper { handle: ActorHandle, } +impl Handle for HandleWrapper { + type State = ActorState; +} + impl Clone for HandleWrapper { fn clone(&self) -> Self { Self { @@ -70,6 +78,18 @@ impl HandleWrapper { let _ = self.handle.send(msg).await; Ok(()) } + + async fn state(&self) -> Result<::State, ActorError> { + let (tx, rx) = flume::bounded(1); + + let msg = HelloWorldEvent::State { reply: tx }; + let _ = self.handle.send(msg).await; + Ok(rx.recv_async().await.map_err(|e| { + let msg = format!("Send cancelled {e:}"); + tracing::error!(msg); + ActorError::ActorError(msg) + })?) + } } #[async_trait::async_trait] @@ -134,6 +154,9 @@ impl HelloWorldActor { let (_sender, receiver) = flume::unbounded(); self.receiver = receiver; } + HelloWorldEvent::State { reply } => { + reply.send(self.state.clone()).ok(); + } } tracing::debug!("Processed"); } @@ -192,9 +215,24 @@ fn main() -> Result<(), ActorError> { handle_wrapper.say_hello().await.ok(); tracing::info!("Sent say hello request"); + if let Ok(state) = handle_wrapper.state().await { + tracing::info!("Actor state is {:?}", state); + } + handle_wrapper.stop().await.ok(); tracing::info!("Sent stop request"); + if let Ok(state) = handle_wrapper.state().await { + tracing::info!("Actor state is {:?}", state); + } + + // Expect this to fail due to how the actor is using state to + // drop certain requests (see line 132) + handle_wrapper + .say_hello() + .await + .expect_err("Actor still responded to say_hello despite being stopped"); + // without this call, because we cloned the handle above, the program would never terminate handle_wrapper.shutdown().await.ok(); tracing::info!("Sent shutdown request"); diff --git a/src/handle.rs b/src/handle.rs index ab822f4..619e85b 100644 --- a/src/handle.rs +++ b/src/handle.rs @@ -1,8 +1,14 @@ use crate::{Actor, ActorError, ActorId, ActorState, Event, SupervisorMessage}; -pub trait Handle {} +pub trait Handle { + type State; +} + +pub trait State {} -impl Handle for ActorHandle {} +impl Handle for ActorHandle { + type State = Box; +} #[derive(Clone)] pub struct ActorHandle { @@ -43,7 +49,9 @@ impl Clone for SupervisedActorHandle { } } -impl Handle for SupervisedActorHandle {} +impl Handle for SupervisedActorHandle { + type State = ActorState; +} #[async_trait::async_trait] pub trait SupervisedHandle: Handle { @@ -52,7 +60,7 @@ pub trait SupervisedHandle: Handle { async fn send_shutdown(&self) -> Result<(), Self::Error>; async fn send_start(&self) -> Result<(), Self::Error>; async fn subscribe_direct(&self) -> Self::Rx; - async fn actor_state(&self) -> Result; + async fn actor_state(&self) -> Result<::State, Self::Error>; } #[async_trait::async_trait] @@ -68,7 +76,7 @@ impl SupervisedHandle for SupervisedActorHandle { async fn subscribe_direct(&self) -> Self::Rx { self.handle_subscribe_direct().await } - async fn actor_state(&self) -> Result { + async fn actor_state(&self) -> Result<::State, Self::Error> { self.get_state().await } } diff --git a/src/lib.rs b/src/lib.rs index b622055..b121615 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,3 +10,90 @@ pub use error::ActorError; pub use supervisor::{Supervision, Supervisor, SupervisorHandle, SupervisorMessage}; pub type ActorId = u16; + +#[cfg(test)] +mod tests { + use super::handle::ActorHandle; + use super::*; + + #[test] + fn single_event() { + pub type Reply = flume::Sender; + pub enum HelloEvent { + SayHello { reply: Reply<()> }, + } + impl Event for HelloEvent {} + struct HelloActor { + receiver: flume::Receiver, + } + + impl HelloActor { + fn new(receiver: flume::Receiver) -> Self { + Self { receiver } + } + async fn say_hello(&mut self) { + println!("Hello hi hey!!"); + } + + async fn get_single_event(mut self) -> Result<(), ActorError> { + let HelloEvent::SayHello { reply } = self + .receiver + .recv_async() + .await + .expect("Failed to receive event"); + self.say_hello().await; + reply.send(()).expect("Failed to send reply"); + + Ok(()) + } + } + + #[async_trait::async_trait] + impl Actor for HelloActor + where + HelloEvent: Event + Send, + { + type Rx = futures::channel::mpsc::Receiver; + type Error = ActorError; + type Result = Result<(), Self::Error>; + async fn run(self) -> Self::Result { + self.get_single_event().await + } + } + + impl ActorHandle { + async fn say_hello(&self) -> Result<(), ActorError> { + let (tx, rx) = flume::bounded(1); + let msg = HelloEvent::SayHello { reply: tx }; + self.send(msg) + .await + .expect("Faied to send from actor handle"); + rx.recv_async() + .await + .expect("Failed to recv from actor handle"); + Ok(()) + } + } + + let (actor, handle) = ActorHandle::new(HelloActor::new); + + let handle = glommio::LocalExecutorBuilder::new(glommio::Placement::Fixed(0)) + .name(&format!("{}{}", "test-handle", 0)) + .spawn(move || async move { + let tq = glommio::executor().create_task_queue( + glommio::Shares::default(), + glommio::Latency::NotImportant, + "test", + ); + + let task = glommio::spawn_local_into(actor.run(), tq) + .map(|t| t.detach()) + .expect("Failed to spawn detached task"); + handle.say_hello().await.expect("Failed to say hello"); + task.await; + }) + .expect("Failed to execute"); + + handle.join().expect("Failed to join glommio join handle"); + } +}