Skip to content

Commit

Permalink
Add GlommioSleep struct and associated sleep methods
Browse files Browse the repository at this point in the history
  • Loading branch information
nand-nor committed Jan 3, 2025
1 parent d88a9eb commit cc799bc
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 23 deletions.
10 changes: 2 additions & 8 deletions examples/priority/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,8 @@ impl HelloWorldActor {
}

async fn event_loop(mut self) -> Result<(), ActorError<HelloWorldEvent>> {
loop {
match self.receiver.recv().await {
Ok((event, priority)) => self.process(event, priority).await,
Err(e) => {
tracing::warn!("Channel error {e:}");
break;
}
}
while let Ok((event, priority)) = self.receiver.recv().await {
self.process(event, priority).await
}
Ok(())
}
Expand Down
11 changes: 4 additions & 7 deletions examples/simple/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,10 @@ impl HelloWorldActor {

async fn event_loop(mut self) -> Result<(), ActorError<HelloWorldEvent>> {
self.state = ActorState::Running;
loop {
match self.receiver.recv_async().await {
Ok(event) => self.process(event).await,
Err(e) => {
tracing::warn!("Channel error {e:}");
break;
}
while let Ok(event) = self.receiver.recv_async().await {
self.process(event).await;
if self.state == ActorState::Stopping {
break;
}
}
self.state = ActorState::Stopped;
Expand Down
4 changes: 2 additions & 2 deletions examples/supervisor_core/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::FutureExt;
use glommactor::{
handle::SupervisedActorHandle, spawn_exec_actor_with_shutdown,
spawn_exec_handle_fut_with_shutdown, Actor, ActorError, ActorId, ActorState, Event,
SupervisedActor, Supervision, SupervisorHandle, SupervisorMessage,
GlommioSleep, SupervisedActor, Supervision, SupervisorHandle, SupervisorMessage,
};
use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares};
use std::time::Duration;
Expand Down Expand Up @@ -235,7 +235,7 @@ fn main() -> Result<(), ActorError<HelloWorldEvent>> {
tracing::error!("Failed to send say hello {e:}");
}
// wait a few seconds
glommio::timer::sleep(Duration::from_secs(5)).await;
GlommioSleep::sleep(Duration::from_secs(5)).await;
tracing::info!("Supervisor suspending actor... (demonstrating restart!)");
supervised_handle.suspend_actor(id).await.ok();
};
Expand Down
42 changes: 38 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ pub mod channel;
mod error;
pub mod handle;
mod supervisor;

use std::future::Future;

pub use actor::{Actor, ActorState, Event, PriorityActor, PriorityEvent, SupervisedActor};
pub use channel::{ChannelRx, ChannelTx, PriorityChannelRx, PriorityRx};
pub use error::ActorError;
pub use handle::{ActorHandle, Handle, PriorityHandle, SupervisedActorHandle, SupervisedHandle};
pub use supervisor::{Supervision, Supervisor, SupervisorHandle, SupervisorMessage};

pub type ActorId = u16;

pub type PriorityActorHandle<T> = handle::ActorHandle<
Expand All @@ -36,6 +34,42 @@ pub fn new_priority_actor_with_handle<T: Event + Send, A: Actor<T> + Sized + Unp
handle::ActorHandle::new(constructor, tx, rx)
}

pub struct GlommioSleep {
pub inner: glommio::timer::Timer,
}

impl futures::Future for GlommioSleep {
type Output = ();

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match std::pin::Pin::new(&mut self.inner).poll(cx) {
std::task::Poll::Ready(_) => std::task::Poll::Ready(()),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}

unsafe impl Send for GlommioSleep {}
unsafe impl Sync for GlommioSleep {}

impl GlommioSleep {
pub fn sleep_until(deadline: std::time::Instant) -> impl futures::Future<Output = ()> {
let duration = deadline.saturating_duration_since(std::time::Instant::now());
Box::pin(GlommioSleep {
inner: glommio::timer::Timer::new(duration),
})
}

pub fn sleep(duration: std::time::Duration) -> impl futures::Future<Output = ()> {
Box::pin(GlommioSleep {
inner: glommio::timer::Timer::new(duration),
})
}
}

/// # Panics
///
/// Will panic if spawning actor onto tq fails
Expand Down Expand Up @@ -214,15 +248,15 @@ pub fn spawn_exec_handle_futs_with_shutdown<
>(
placement: Placement,
name: &'static str,
mut task_vec: Fuse<F>, //Vec<glommio::task::JoinHandle<()>>,
mut task_vec: Fuse<F>,
mut receiver: async_broadcast::Receiver<()>,
) -> Result<ExecutorJoinHandle<()>, GlommioError<()>> {
LocalExecutorBuilder::new(placement)
.name(name)
.spawn(move || async move {
futures::select! {
_ = receiver.recv().fuse() => {
tracing::info!("Shutdown notice received by rt-supervisor");
tracing::info!("Shutdown notice received");
}
_ = task_vec => {}
};
Expand Down
7 changes: 5 additions & 2 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use crate::{
handle::{ActorHandle, Handle},
Actor, ActorError, ActorId, ActorState, Event,
Actor, ActorError, ActorId, ActorState, Event, GlommioSleep,
};
use futures::FutureExt;

Expand Down Expand Up @@ -354,7 +354,10 @@ impl SupervisorHandle {
let (tx, rx) = flume::bounded(1);
let msg = SupervisorAction::Heartbeat { id, reply: tx };
self.handle.send(msg).await?;
let mut fused_timer = glommio::timer::Timer::new(std::time::Duration::from_secs(5)).fuse();
let mut fused_timer = GlommioSleep {
inner: glommio::timer::Timer::new(std::time::Duration::from_secs(5)),
}
.fuse();
futures::select! {
_ = fused_timer => {
Err(ActorError::HeartbeatTimeout(id))
Expand Down

0 comments on commit cc799bc

Please sign in to comment.