Skip to content

Commit

Permalink
Merge pull request #6 from nand-nor/add/tests
Browse files Browse the repository at this point in the history
Add test to CI
  • Loading branch information
nand-nor authored Dec 31, 2024
2 parents 7fe457c + 3516e37 commit 1c27016
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 8 deletions.
1 change: 1 addition & 0 deletions .github/workflows/xtask.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ jobs:
- name: compile check
run: |
cargo clippy -- -D warnings
cargo test
cargo xtask run
44 changes: 41 additions & 3 deletions examples/simple/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
//! Simple demonstration of the Actor trait and corresponding handle
//! as implemented for a HelloWorldActor
use glommactor::{handle::ActorHandle, Actor, ActorError, ActorState, Event};
use glommactor::{
handle::{ActorHandle, Handle},
Actor, ActorError, ActorState, Event,
};
use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares};
use std::time::Duration;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;

pub type Reply = flume::Sender<()>;
pub type Reply<T> = flume::Sender<T>;

#[derive(Clone, Debug)]
pub enum HelloWorldEvent {
SayHello { reply: Reply },
SayHello { reply: Reply<()> },
Stop,
Start,
Shutdown,
State { reply: Reply<ActorState> },
}

impl Event for HelloWorldEvent {}
Expand All @@ -36,6 +40,10 @@ struct HandleWrapper {
handle: ActorHandle<HelloWorldEvent>,
}

impl Handle for HandleWrapper {
type State = ActorState;
}

impl Clone for HandleWrapper {
fn clone(&self) -> Self {
Self {
Expand Down Expand Up @@ -70,6 +78,18 @@ impl HandleWrapper {
let _ = self.handle.send(msg).await;
Ok(())
}

async fn state(&self) -> Result<<Self as Handle>::State, ActorError<HelloWorldEvent>> {
let (tx, rx) = flume::bounded(1);

let msg = HelloWorldEvent::State { reply: tx };
let _ = self.handle.send(msg).await;
Ok(rx.recv_async().await.map_err(|e| {
let msg = format!("Send cancelled {e:}");
tracing::error!(msg);
ActorError::ActorError(msg)
})?)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -134,6 +154,9 @@ impl HelloWorldActor {
let (_sender, receiver) = flume::unbounded();
self.receiver = receiver;
}
HelloWorldEvent::State { reply } => {
reply.send(self.state.clone()).ok();
}
}
tracing::debug!("Processed");
}
Expand Down Expand Up @@ -192,9 +215,24 @@ fn main() -> Result<(), ActorError<HelloWorldEvent>> {
handle_wrapper.say_hello().await.ok();
tracing::info!("Sent say hello request");

if let Ok(state) = handle_wrapper.state().await {
tracing::info!("Actor state is {:?}", state);
}

handle_wrapper.stop().await.ok();
tracing::info!("Sent stop request");

if let Ok(state) = handle_wrapper.state().await {
tracing::info!("Actor state is {:?}", state);
}

// Expect this to fail due to how the actor is using state to
// drop certain requests (see line 132)
handle_wrapper
.say_hello()
.await
.expect_err("Actor still responded to say_hello despite being stopped");

// without this call, because we cloned the handle above, the program would never terminate
handle_wrapper.shutdown().await.ok();
tracing::info!("Sent shutdown request");
Expand Down
18 changes: 13 additions & 5 deletions src/handle.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use crate::{Actor, ActorError, ActorId, ActorState, Event, SupervisorMessage};

pub trait Handle {}
pub trait Handle {
type State;
}

pub trait State {}

impl<T: Event + Send> Handle for ActorHandle<T> {}
impl<T: Event + Send> Handle for ActorHandle<T> {
type State = Box<dyn State>;
}

#[derive(Clone)]
pub struct ActorHandle<T: Event + Send> {
Expand Down Expand Up @@ -43,7 +49,9 @@ impl<T: Event + Send> Clone for SupervisedActorHandle<T> {
}
}

impl<T: Event + Send> Handle for SupervisedActorHandle<T> {}
impl<T: Event + Send> Handle for SupervisedActorHandle<T> {
type State = ActorState;
}

#[async_trait::async_trait]
pub trait SupervisedHandle: Handle {
Expand All @@ -52,7 +60,7 @@ pub trait SupervisedHandle: Handle {
async fn send_shutdown(&self) -> Result<(), Self::Error>;
async fn send_start(&self) -> Result<(), Self::Error>;
async fn subscribe_direct(&self) -> Self::Rx;
async fn actor_state(&self) -> Result<ActorState, Self::Error>;
async fn actor_state(&self) -> Result<<Self as Handle>::State, Self::Error>;
}

#[async_trait::async_trait]
Expand All @@ -68,7 +76,7 @@ impl<T: Event + Send> SupervisedHandle for SupervisedActorHandle<T> {
async fn subscribe_direct(&self) -> Self::Rx {
self.handle_subscribe_direct().await
}
async fn actor_state(&self) -> Result<ActorState, Self::Error> {
async fn actor_state(&self) -> Result<<Self as Handle>::State, Self::Error> {
self.get_state().await
}
}
Expand Down
87 changes: 87 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,90 @@ pub use error::ActorError;
pub use supervisor::{Supervision, Supervisor, SupervisorHandle, SupervisorMessage};

pub type ActorId = u16;

#[cfg(test)]
mod tests {
use super::handle::ActorHandle;
use super::*;

#[test]
fn single_event() {
pub type Reply<T> = flume::Sender<T>;
pub enum HelloEvent {
SayHello { reply: Reply<()> },
}
impl Event for HelloEvent {}
struct HelloActor {
receiver: flume::Receiver<HelloEvent>,
}

impl HelloActor {
fn new(receiver: flume::Receiver<HelloEvent>) -> Self {
Self { receiver }
}
async fn say_hello(&mut self) {
println!("Hello hi hey!!");
}

async fn get_single_event(mut self) -> Result<(), ActorError<HelloEvent>> {
let HelloEvent::SayHello { reply } = self
.receiver
.recv_async()
.await
.expect("Failed to receive event");
self.say_hello().await;
reply.send(()).expect("Failed to send reply");

Ok(())
}
}

#[async_trait::async_trait]
impl Actor<HelloEvent> for HelloActor
where
HelloEvent: Event + Send,
{
type Rx = futures::channel::mpsc::Receiver<HelloEvent>;
type Error = ActorError<HelloEvent>;
type Result = Result<(), Self::Error>;
async fn run(self) -> Self::Result {
self.get_single_event().await
}
}

impl ActorHandle<HelloEvent> {
async fn say_hello(&self) -> Result<(), ActorError<HelloEvent>> {
let (tx, rx) = flume::bounded(1);
let msg = HelloEvent::SayHello { reply: tx };
self.send(msg)
.await
.expect("Faied to send from actor handle");
rx.recv_async()
.await
.expect("Failed to recv from actor handle");
Ok(())
}
}

let (actor, handle) = ActorHandle::new(HelloActor::new);

let handle = glommio::LocalExecutorBuilder::new(glommio::Placement::Fixed(0))
.name(&format!("{}{}", "test-handle", 0))
.spawn(move || async move {
let tq = glommio::executor().create_task_queue(
glommio::Shares::default(),
glommio::Latency::NotImportant,
"test",
);

let task = glommio::spawn_local_into(actor.run(), tq)
.map(|t| t.detach())
.expect("Failed to spawn detached task");
handle.say_hello().await.expect("Failed to say hello");
task.await;
})
.expect("Failed to execute");

handle.join().expect("Failed to join glommio join handle");
}
}

0 comments on commit 1c27016

Please sign in to comment.