diff --git a/nexosim/Cargo.toml b/nexosim/Cargo.toml index e7cd495..6813d52 100644 --- a/nexosim/Cargo.toml +++ b/nexosim/Cargo.toml @@ -20,11 +20,27 @@ description = """ A high performance asychronous compute framework for system simulation. """ categories = ["simulation", "aerospace", "science"] -keywords = ["simulation", "discrete-event", "systems", "cyberphysical", "real-time"] +keywords = [ + "simulation", + "discrete-event", + "systems", + "cyberphysical", + "real-time", +] [features] # gRPC service. -grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"] +grpc = [ + "dep:bytes", + "dep:ciborium", + "dep:prost", + "dep:prost-types", + "dep:serde", + "dep:tonic", + "dep:tokio", + "dep:tonic", + "tai-time/serde", +] tracing = ["dep:tracing", "dep:tracing-subscriber"] # DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking. @@ -54,15 +70,24 @@ ciborium = { version = "0.2.2", optional = true } prost = { version = "0.13", optional = true } prost-types = { version = "0.13", optional = true } serde = { version = "1", optional = true } -tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true } -tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true } -tracing = { version= "0.1.40", default-features = false, features=["std"], optional = true } -tracing-subscriber = { version= "0.3.18", optional = true } +tokio = { version = "1.0", features = [ + "net", + "rt-multi-thread", +], optional = true } +tonic = { version = "0.12", default-features = false, features = [ + "codegen", + "prost", + "server", +], optional = true } +tracing = { version = "0.1.40", default-features = false, features = [ + "std", +], optional = true } +tracing-subscriber = { version = "0.3.18", optional = true } [dev-dependencies] futures-util = "0.3" futures-executor = "0.3" -tracing-subscriber = { version= "0.3.18", features=["env-filter"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } [target.'cfg(nexosim_loom)'.dev-dependencies] loom = "0.7" @@ -74,7 +99,10 @@ tonic-build = { version = "0.12" } [lints.rust] # `nexosim_loom` flag: run loom-based tests. # `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions. -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(nexosim_loom)', 'cfg(nexosim_grpc_codegen)'] } +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(nexosim_loom)', + 'cfg(nexosim_grpc_codegen)', +] } [package.metadata.docs.rs] all-features = true diff --git a/nexosim/src/grpc/api/simulation.proto b/nexosim/src/grpc/api/simulation.proto index b40a998..1841f63 100644 --- a/nexosim/src/grpc/api/simulation.proto +++ b/nexosim/src/grpc/api/simulation.proto @@ -9,24 +9,25 @@ import "google/protobuf/empty.proto"; enum ErrorCode { INTERNAL_ERROR = 0; - SIMULATION_NOT_STARTED = 1; - SIMULATION_TERMINATED = 2; - SIMULATION_DEADLOCK = 3; - SIMULATION_MESSAGE_LOSS = 4; - SIMULATION_NO_RECIPIENT = 5; - SIMULATION_PANIC = 6; - SIMULATION_TIMEOUT = 7; - SIMULATION_OUT_OF_SYNC = 8; - SIMULATION_BAD_QUERY = 9; - SIMULATION_TIME_OUT_OF_RANGE = 10; - MISSING_ARGUMENT = 20; - INVALID_TIME = 30; - INVALID_PERIOD = 31; - INVALID_DEADLINE = 32; - INVALID_MESSAGE = 33; - INVALID_KEY = 34; - SOURCE_NOT_FOUND = 40; - SINK_NOT_FOUND = 41; + MISSING_ARGUMENT = 1; + INVALID_TIME = 2; + INVALID_PERIOD = 3; + INVALID_DEADLINE = 4; + INVALID_MESSAGE = 5; + INVALID_KEY = 6; + INITIALIZER_PANIC = 10; + SIMULATION_NOT_STARTED = 11; + SIMULATION_TERMINATED = 12; + SIMULATION_DEADLOCK = 13; + SIMULATION_MESSAGE_LOSS = 14; + SIMULATION_NO_RECIPIENT = 15; + SIMULATION_PANIC = 16; + SIMULATION_TIMEOUT = 17; + SIMULATION_OUT_OF_SYNC = 18; + SIMULATION_BAD_QUERY = 19; + SIMULATION_TIME_OUT_OF_RANGE = 20; + SOURCE_NOT_FOUND = 30; + SINK_NOT_FOUND = 31; } message Error { @@ -39,9 +40,7 @@ message EventKey { uint64 subkey2 = 2; } -message InitRequest { - bytes cfg = 2; -} +message InitRequest { bytes cfg = 2; } message InitReply { oneof result { // Always returns exactly 1 variant. google.protobuf.Empty empty = 1; diff --git a/nexosim/src/grpc/codegen/simulation.v1.rs b/nexosim/src/grpc/codegen/simulation.v1.rs index 8346678..bbbd6a5 100644 --- a/nexosim/src/grpc/codegen/simulation.v1.rs +++ b/nexosim/src/grpc/codegen/simulation.v1.rs @@ -335,24 +335,25 @@ pub mod any_request { #[repr(i32)] pub enum ErrorCode { InternalError = 0, - SimulationNotStarted = 1, - SimulationTerminated = 2, - SimulationDeadlock = 3, - SimulationMessageLoss = 4, - SimulationNoRecipient = 5, - SimulationPanic = 6, - SimulationTimeout = 7, - SimulationOutOfSync = 8, - SimulationBadQuery = 9, - SimulationTimeOutOfRange = 10, - MissingArgument = 20, - InvalidTime = 30, - InvalidPeriod = 31, - InvalidDeadline = 32, - InvalidMessage = 33, - InvalidKey = 34, - SourceNotFound = 40, - SinkNotFound = 41, + MissingArgument = 1, + InvalidTime = 2, + InvalidPeriod = 3, + InvalidDeadline = 4, + InvalidMessage = 5, + InvalidKey = 6, + InitializerPanic = 10, + SimulationNotStarted = 11, + SimulationTerminated = 12, + SimulationDeadlock = 13, + SimulationMessageLoss = 14, + SimulationNoRecipient = 15, + SimulationPanic = 16, + SimulationTimeout = 17, + SimulationOutOfSync = 18, + SimulationBadQuery = 19, + SimulationTimeOutOfRange = 20, + SourceNotFound = 30, + SinkNotFound = 31, } impl ErrorCode { /// String value of the enum field names used in the ProtoBuf definition. @@ -362,6 +363,13 @@ impl ErrorCode { pub fn as_str_name(&self) -> &'static str { match self { Self::InternalError => "INTERNAL_ERROR", + Self::MissingArgument => "MISSING_ARGUMENT", + Self::InvalidTime => "INVALID_TIME", + Self::InvalidPeriod => "INVALID_PERIOD", + Self::InvalidDeadline => "INVALID_DEADLINE", + Self::InvalidMessage => "INVALID_MESSAGE", + Self::InvalidKey => "INVALID_KEY", + Self::InitializerPanic => "INITIALIZER_PANIC", Self::SimulationNotStarted => "SIMULATION_NOT_STARTED", Self::SimulationTerminated => "SIMULATION_TERMINATED", Self::SimulationDeadlock => "SIMULATION_DEADLOCK", @@ -372,12 +380,6 @@ impl ErrorCode { Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", Self::SimulationBadQuery => "SIMULATION_BAD_QUERY", Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", - Self::MissingArgument => "MISSING_ARGUMENT", - Self::InvalidTime => "INVALID_TIME", - Self::InvalidPeriod => "INVALID_PERIOD", - Self::InvalidDeadline => "INVALID_DEADLINE", - Self::InvalidMessage => "INVALID_MESSAGE", - Self::InvalidKey => "INVALID_KEY", Self::SourceNotFound => "SOURCE_NOT_FOUND", Self::SinkNotFound => "SINK_NOT_FOUND", } @@ -386,6 +388,13 @@ impl ErrorCode { pub fn from_str_name(value: &str) -> ::core::option::Option { match value { "INTERNAL_ERROR" => Some(Self::InternalError), + "MISSING_ARGUMENT" => Some(Self::MissingArgument), + "INVALID_TIME" => Some(Self::InvalidTime), + "INVALID_PERIOD" => Some(Self::InvalidPeriod), + "INVALID_DEADLINE" => Some(Self::InvalidDeadline), + "INVALID_MESSAGE" => Some(Self::InvalidMessage), + "INVALID_KEY" => Some(Self::InvalidKey), + "INITIALIZER_PANIC" => Some(Self::InitializerPanic), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), @@ -396,12 +405,6 @@ impl ErrorCode { "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), "SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery), "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), - "MISSING_ARGUMENT" => Some(Self::MissingArgument), - "INVALID_TIME" => Some(Self::InvalidTime), - "INVALID_PERIOD" => Some(Self::InvalidPeriod), - "INVALID_DEADLINE" => Some(Self::InvalidDeadline), - "INVALID_MESSAGE" => Some(Self::InvalidMessage), - "INVALID_KEY" => Some(Self::InvalidKey), "SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), "SINK_NOT_FOUND" => Some(Self::SinkNotFound), _ => None, diff --git a/nexosim/src/grpc/key_registry.rs b/nexosim/src/grpc/key_registry.rs index 0c6678e..8247a2d 100644 --- a/nexosim/src/grpc/key_registry.rs +++ b/nexosim/src/grpc/key_registry.rs @@ -13,8 +13,8 @@ pub(crate) struct KeyRegistry { impl KeyRegistry { /// Inserts an `ActionKey` into the registry. /// - /// The provided expiration deadline is the latest time at which the key may - /// still be active. + /// The provided expiration deadline is the latest time at which the key is + /// guaranteed to be extractable. pub(crate) fn insert_key( &mut self, action_key: ActionKey, diff --git a/nexosim/src/grpc/run.rs b/nexosim/src/grpc/run.rs index d74f255..a5c2d02 100644 --- a/nexosim/src/grpc/run.rs +++ b/nexosim/src/grpc/run.rs @@ -1,6 +1,7 @@ //! gRPC simulation service. use std::net::SocketAddr; +use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; @@ -13,7 +14,7 @@ use crate::simulation::{Simulation, SimulationError}; use super::codegen::simulation::*; use super::key_registry::KeyRegistry; use super::services::InitService; -use super::services::{ControllerService, MonitorService}; +use super::services::{ControllerService, MonitorService, SchedulerService}; /// Runs a gRPC simulation server. /// @@ -37,7 +38,8 @@ fn run_service( service: GrpcSimulationService, addr: SocketAddr, ) -> Result<(), Box> { - // Use 2 threads so that the controller and monitor services can be used + // Use 2 threads so that the even if the controller service is blocked due + // to ongoing simulation execution, other services can still be used // concurrently. let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) @@ -58,6 +60,7 @@ struct GrpcSimulationService { init_service: Mutex, controller_service: Mutex, monitor_service: Mutex, + scheduler_service: Mutex, } impl GrpcSimulationService { @@ -76,6 +79,7 @@ impl GrpcSimulationService { init_service: Mutex::new(InitService::new(sim_gen)), controller_service: Mutex::new(ControllerService::NotStarted), monitor_service: Mutex::new(MonitorService::NotStarted), + scheduler_service: Mutex::new(SchedulerService::NotStarted), } } @@ -93,6 +97,11 @@ impl GrpcSimulationService { fn monitor(&self) -> MutexGuard<'_, MonitorService> { self.monitor_service.lock().unwrap() } + + /// Locks the scheduler and returns the mutex guard. + fn scheduler(&self) -> MutexGuard<'_, SchedulerService> { + self.scheduler_service.lock().unwrap() + } } #[tonic::async_trait] @@ -103,15 +112,22 @@ impl simulation_server::Simulation for GrpcSimulationService { let (reply, bench) = self.initializer().init(request); if let Some((simulation, scheduler, endpoint_registry)) = bench { + let event_source_registry = Arc::new(endpoint_registry.event_source_registry); + let query_source_registry = endpoint_registry.query_source_registry; + let event_sink_registry = endpoint_registry.event_sink_registry; + *self.controller() = ControllerService::Started { simulation, - scheduler, - event_source_registry: endpoint_registry.event_source_registry, - query_source_registry: endpoint_registry.query_source_registry, - key_registry: KeyRegistry::default(), + event_source_registry: event_source_registry.clone(), + query_source_registry, }; *self.monitor() = MonitorService::Started { - event_sink_registry: endpoint_registry.event_sink_registry, + event_sink_registry, + }; + *self.scheduler() = SchedulerService::Started { + scheduler, + event_source_registry, + key_registry: KeyRegistry::default(), }; } @@ -120,7 +136,7 @@ impl simulation_server::Simulation for GrpcSimulationService { async fn time(&self, request: Request) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.controller().time(request))) + Ok(Response::new(self.scheduler().time(request))) } async fn step(&self, request: Request) -> Result, Status> { let request = request.into_inner(); @@ -141,7 +157,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.controller().schedule_event(request))) + Ok(Response::new(self.scheduler().schedule_event(request))) } async fn cancel_event( &self, @@ -149,7 +165,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.controller().cancel_event(request))) + Ok(Response::new(self.scheduler().cancel_event(request))) } async fn process_event( &self, diff --git a/nexosim/src/grpc/services.rs b/nexosim/src/grpc/services.rs index 4907401..05b5e9e 100644 --- a/nexosim/src/grpc/services.rs +++ b/nexosim/src/grpc/services.rs @@ -1,6 +1,7 @@ mod controller_service; mod init_service; mod monitor_service; +mod scheduler_service; use std::time::Duration; @@ -13,6 +14,7 @@ use crate::simulation::{ExecutionError, SchedulingError, SimulationError}; pub(crate) use controller_service::ControllerService; pub(crate) use init_service::InitService; pub(crate) use monitor_service::MonitorService; +pub(crate) use scheduler_service::SchedulerService; /// Transforms an error code and a message into a Protobuf error. fn to_error(code: ErrorCode, message: impl Into) -> Error { diff --git a/nexosim/src/grpc/services/controller_service.rs b/nexosim/src/grpc/services/controller_service.rs index 32f5ac7..ae93e0e 100644 --- a/nexosim/src/grpc/services/controller_service.rs +++ b/nexosim/src/grpc/services/controller_service.rs @@ -1,59 +1,32 @@ use std::fmt; +use std::sync::Arc; use prost_types::Timestamp; -use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId}; use crate::registry::{EventSourceRegistry, QuerySourceRegistry}; -use crate::simulation::{Scheduler, Simulation}; +use crate::simulation::Simulation; use super::super::codegen::simulation::*; use super::{ - map_execution_error, map_scheduling_error, monotonic_to_timestamp, - simulation_not_started_error, timestamp_to_monotonic, to_error, to_positive_duration, - to_strictly_positive_duration, + map_execution_error, monotonic_to_timestamp, simulation_not_started_error, + timestamp_to_monotonic, to_error, to_positive_duration, }; -/// Protobuf-based simulation manager. +/// Protobuf-based simulation controller. /// -/// A `ControllerService` enables the management of the lifecycle of a -/// simulation. -/// -/// Its methods map the various RPC simulation control service methods defined -/// in `simulation.proto`. +/// A `ControllerService` controls the execution of the simulation. Note that +/// all its methods block until execution completes. #[allow(clippy::large_enum_variant)] pub(crate) enum ControllerService { NotStarted, Started { simulation: Simulation, - scheduler: Scheduler, - event_source_registry: EventSourceRegistry, + event_source_registry: Arc, query_source_registry: QuerySourceRegistry, - key_registry: KeyRegistry, }, } impl ControllerService { - /// Returns the current simulation time. - pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply { - let reply = match self { - Self::Started { simulation, .. } => { - if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { - time_reply::Result::Time(timestamp) - } else { - time_reply::Result::Error(to_error( - ErrorCode::SimulationTimeOutOfRange, - "the final simulation time is out of range", - )) - } - } - Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()), - }; - - TimeReply { - result: Some(reply), - } - } - /// Advances simulation time to that of the next scheduled event, processing /// that event as well as all other events scheduled for the same time. /// @@ -144,155 +117,6 @@ impl ControllerService { } } - /// Schedules an event at a future time. - pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply { - let reply = match self { - Self::Started { - simulation, - scheduler, - event_source_registry, - key_registry, - .. - } => move || -> Result, Error> { - let source_name = &request.source_name; - let event = &request.event; - let with_key = request.with_key; - let period = request - .period - .map(|period| { - to_strictly_positive_duration(period).ok_or(to_error( - ErrorCode::InvalidPeriod, - "the specified event period is not strictly positive", - )) - }) - .transpose()?; - - let deadline = request.deadline.ok_or(to_error( - ErrorCode::MissingArgument, - "missing deadline argument", - ))?; - - let deadline = match deadline { - schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time) - .ok_or(to_error( - ErrorCode::InvalidTime, - "out-of-range nanosecond field", - ))?, - schedule_event_request::Deadline::Duration(duration) => { - let duration = to_strictly_positive_duration(duration).ok_or(to_error( - ErrorCode::InvalidDeadline, - "the specified scheduling deadline is not in the future", - ))?; - - simulation.time() + duration - } - }; - - let source = event_source_registry.get_mut(source_name).ok_or(to_error( - ErrorCode::SourceNotFound, - "no event source is registered with the name '{}'".to_string(), - ))?; - - let (action, action_key) = match (with_key, period) { - (false, None) => source.event(event).map(|action| (action, None)), - (false, Some(period)) => source - .periodic_event(period, event) - .map(|action| (action, None)), - (true, None) => source - .keyed_event(event) - .map(|(action, key)| (action, Some(key))), - (true, Some(period)) => source - .keyed_periodic_event(period, event) - .map(|(action, key)| (action, Some(key))), - } - .map_err(|e| { - to_error( - ErrorCode::InvalidMessage, - format!( - "the event could not be deserialized as type '{}': {}", - source.event_type_name(), - e - ), - ) - })?; - - let key_id = action_key.map(|action_key| { - // Free stale keys from the registry. - key_registry.remove_expired_keys(simulation.time()); - - if period.is_some() { - key_registry.insert_eternal_key(action_key) - } else { - key_registry.insert_key(action_key, deadline) - } - }); - - scheduler - .schedule(deadline, action) - .map_err(map_scheduling_error)?; - - Ok(key_id) - }(), - Self::NotStarted => Err(simulation_not_started_error()), - }; - - ScheduleEventReply { - result: Some(match reply { - Ok(Some(key_id)) => { - let (subkey1, subkey2) = key_id.into_raw_parts(); - schedule_event_reply::Result::Key(EventKey { - subkey1: subkey1 - .try_into() - .expect("action key index is too large to be serialized"), - subkey2, - }) - } - Ok(None) => schedule_event_reply::Result::Empty(()), - Err(error) => schedule_event_reply::Result::Error(error), - }), - } - } - - /// Cancels a keyed event. - pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply { - let reply = match self { - Self::Started { - simulation, - key_registry, - .. - } => move || -> Result<(), Error> { - let key = request - .key - .ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?; - let subkey1: usize = key - .subkey1 - .try_into() - .map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?; - let subkey2 = key.subkey2; - - let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2); - - key_registry.remove_expired_keys(simulation.time()); - let key = key_registry.extract_key(key_id).ok_or(to_error( - ErrorCode::InvalidKey, - "invalid or expired event key", - ))?; - - key.cancel(); - - Ok(()) - }(), - Self::NotStarted => Err(simulation_not_started_error()), - }; - - CancelEventReply { - result: Some(match reply { - Ok(()) => cancel_event_reply::Result::Empty(()), - Err(error) => cancel_event_reply::Result::Error(error), - }), - } - } - /// Broadcasts an event from an event source immediately, blocking until /// completion. /// @@ -307,7 +131,7 @@ impl ControllerService { let source_name = &request.source_name; let event = &request.event; - let source = event_source_registry.get_mut(source_name).ok_or(to_error( + let source = event_source_registry.get(source_name).ok_or(to_error( ErrorCode::SourceNotFound, "no source is registered with the name '{}'".to_string(), ))?; @@ -350,7 +174,7 @@ impl ControllerService { let source_name = &request.source_name; let request = &request.request; - let source = query_source_registry.get_mut(source_name).ok_or(to_error( + let source = query_source_registry.get(source_name).ok_or(to_error( ErrorCode::SourceNotFound, "no source is registered with the name '{}'".to_string(), ))?; diff --git a/nexosim/src/grpc/services/init_service.rs b/nexosim/src/grpc/services/init_service.rs index 9eac3f4..7fca63b 100644 --- a/nexosim/src/grpc/services/init_service.rs +++ b/nexosim/src/grpc/services/init_service.rs @@ -1,3 +1,5 @@ +use std::panic::{self, AssertUnwindSafe}; + use ciborium; use serde::de::DeserializeOwned; @@ -16,8 +18,6 @@ type SimGen = Box Result + /// /// An `InitService` creates a new simulation bench based on a serialized /// initialization configuration. -/// -/// It maps the `Init` method defined in `simulation.proto`. pub(crate) struct InitService { sim_gen: SimGen, } @@ -51,17 +51,39 @@ impl InitService { &mut self, request: InitRequest, ) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) { - let reply = (self.sim_gen)(&request.cfg) - .map_err(|e| { - to_error( - ErrorCode::InvalidMessage, + let reply = panic::catch_unwind(AssertUnwindSafe(|| (self.sim_gen)(&request.cfg))) + .map_err(|payload| { + let panic_msg: Option<&str> = if let Some(s) = payload.downcast_ref::<&str>() { + Some(s) + } else if let Some(s) = payload.downcast_ref::() { + Some(s) + } else { + None + }; + + let error_msg = if let Some(panic_msg) = panic_msg { format!( - "the initialization configuration could not be deserialized: {}", - e - ), - ) + "the simulation initializer has panicked with the message `{}`", + panic_msg + ) + } else { + String::from("the simulation initializer has panicked") + }; + + to_error(ErrorCode::InitializerPanic, error_msg) }) - .and_then(|init_result| init_result.map_err(map_simulation_error)); + .and_then(|res| { + res.map_err(|e| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the initializer configuration could not be deserialized: {}", + e + ), + ) + }) + .and_then(|init_result| init_result.map_err(map_simulation_error)) + }); let (reply, bench) = match reply { Ok((simulation, registry)) => { diff --git a/nexosim/src/grpc/services/monitor_service.rs b/nexosim/src/grpc/services/monitor_service.rs index b6f1fc1..964715b 100644 --- a/nexosim/src/grpc/services/monitor_service.rs +++ b/nexosim/src/grpc/services/monitor_service.rs @@ -9,9 +9,6 @@ use super::{simulation_not_started_error, to_error}; /// /// A `MonitorService` enables the monitoring of the event sinks of a /// [`Simulation`](crate::simulation::Simulation). -/// -/// Its methods map the various RPC monitoring service methods defined in -/// `simulation.proto`. pub(crate) enum MonitorService { Started { event_sink_registry: EventSinkRegistry, diff --git a/nexosim/src/grpc/services/scheduler_service.rs b/nexosim/src/grpc/services/scheduler_service.rs new file mode 100644 index 0000000..7c818cf --- /dev/null +++ b/nexosim/src/grpc/services/scheduler_service.rs @@ -0,0 +1,200 @@ +use std::fmt; +use std::sync::Arc; + +use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId}; +use crate::registry::EventSourceRegistry; +use crate::simulation::Scheduler; + +use super::super::codegen::simulation::*; +use super::{ + map_scheduling_error, monotonic_to_timestamp, simulation_not_started_error, + timestamp_to_monotonic, to_error, to_strictly_positive_duration, +}; + +/// Protobuf-based simulation scheduler. +/// +/// A `SchedulerService` enables the scheduling of simulation events. +#[allow(clippy::large_enum_variant)] +pub(crate) enum SchedulerService { + NotStarted, + Started { + scheduler: Scheduler, + event_source_registry: Arc, + key_registry: KeyRegistry, + }, +} + +impl SchedulerService { + /// Returns the current simulation time. + pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply { + let reply = match self { + Self::Started { scheduler, .. } => { + if let Some(timestamp) = monotonic_to_timestamp(scheduler.time()) { + time_reply::Result::Time(timestamp) + } else { + time_reply::Result::Error(to_error( + ErrorCode::SimulationTimeOutOfRange, + "the final simulation time is out of range", + )) + } + } + Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()), + }; + + TimeReply { + result: Some(reply), + } + } + + /// Schedules an event at a future time. + pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply { + let reply = match self { + Self::Started { + scheduler, + event_source_registry, + key_registry, + } => move || -> Result, Error> { + let source_name = &request.source_name; + let event = &request.event; + let with_key = request.with_key; + let period = request + .period + .map(|period| { + to_strictly_positive_duration(period).ok_or(to_error( + ErrorCode::InvalidPeriod, + "the specified event period is not strictly positive", + )) + }) + .transpose()?; + + let source = event_source_registry.get(source_name).ok_or(to_error( + ErrorCode::SourceNotFound, + "no event source is registered with the name '{}'".to_string(), + ))?; + + let (action, action_key) = match (with_key, period) { + (false, None) => source.event(event).map(|action| (action, None)), + (false, Some(period)) => source + .periodic_event(period, event) + .map(|action| (action, None)), + (true, None) => source + .keyed_event(event) + .map(|(action, key)| (action, Some(key))), + (true, Some(period)) => source + .keyed_periodic_event(period, event) + .map(|(action, key)| (action, Some(key))), + } + .map_err(|e| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the event could not be deserialized as type '{}': {}", + source.event_type_name(), + e + ), + ) + })?; + + let deadline = request.deadline.ok_or(to_error( + ErrorCode::MissingArgument, + "missing deadline argument", + ))?; + + let deadline = match deadline { + schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time) + .ok_or(to_error( + ErrorCode::InvalidTime, + "out-of-range nanosecond field", + ))?, + schedule_event_request::Deadline::Duration(duration) => { + let duration = to_strictly_positive_duration(duration).ok_or(to_error( + ErrorCode::InvalidDeadline, + "the specified scheduling deadline is not in the future", + ))?; + + scheduler.time() + duration + } + }; + + let key_id = action_key.map(|action_key| { + key_registry.remove_expired_keys(scheduler.time()); + + if period.is_some() { + key_registry.insert_eternal_key(action_key) + } else { + key_registry.insert_key(action_key, deadline) + } + }); + + scheduler + .schedule(deadline, action) + .map_err(map_scheduling_error)?; + + Ok(key_id) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + ScheduleEventReply { + result: Some(match reply { + Ok(Some(key_id)) => { + let (subkey1, subkey2) = key_id.into_raw_parts(); + schedule_event_reply::Result::Key(EventKey { + subkey1: subkey1 + .try_into() + .expect("action key index is too large to be serialized"), + subkey2, + }) + } + Ok(None) => schedule_event_reply::Result::Empty(()), + Err(error) => schedule_event_reply::Result::Error(error), + }), + } + } + + /// Cancels a keyed event. + pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply { + let reply = match self { + Self::Started { + scheduler, + key_registry, + .. + } => move || -> Result<(), Error> { + let key = request + .key + .ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?; + let subkey1: usize = key + .subkey1 + .try_into() + .map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?; + let subkey2 = key.subkey2; + + let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2); + + key_registry.remove_expired_keys(scheduler.time()); + let key = key_registry.extract_key(key_id).ok_or(to_error( + ErrorCode::InvalidKey, + "invalid or expired event key", + ))?; + + key.cancel(); + + Ok(()) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + CancelEventReply { + result: Some(match reply { + Ok(()) => cancel_event_reply::Result::Empty(()), + Err(error) => cancel_event_reply::Result::Error(error), + }), + } + } +} + +impl fmt::Debug for SchedulerService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SchedulerService").finish_non_exhaustive() + } +} diff --git a/nexosim/src/ports/source/broadcaster.rs b/nexosim/src/ports/source/broadcaster.rs index da46d98..1a1bdac 100644 --- a/nexosim/src/ports/source/broadcaster.rs +++ b/nexosim/src/ports/source/broadcaster.rs @@ -728,6 +728,8 @@ mod tests { #[cfg(all(test, nexosim_loom))] mod tests { + use std::sync::Mutex; + use futures_channel::mpsc; use futures_util::StreamExt; @@ -743,14 +745,14 @@ mod tests { struct TestEvent { // The receiver is actually used only once in tests, so it is moved out // of the `Option` on first use. - receiver: Option>>, + receiver: Mutex>>>, } impl Sender<(), R> for TestEvent { fn send( - &mut self, + &self, _arg: &(), ) -> Option> + Send>>> { - let receiver = self.receiver.take().unwrap(); + let receiver = self.receiver.lock().unwrap().take().unwrap(); Some(Box::pin(async move { let mut stream = Box::pin(receiver.filter_map(|item| async { item })); @@ -779,7 +781,7 @@ mod tests { ( TestEvent { - receiver: Some(receiver), + receiver: Mutex::new(Some(receiver)), }, TestEventWaker { sender }, ) diff --git a/nexosim/src/registry/event_source_registry.rs b/nexosim/src/registry/event_source_registry.rs index 86258ec..848e361 100644 --- a/nexosim/src/registry/event_source_registry.rs +++ b/nexosim/src/registry/event_source_registry.rs @@ -42,8 +42,8 @@ impl EventSourceRegistry { /// Returns a mutable reference to the specified event source if it is in /// the registry. - pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> { - self.0.get_mut(name).map(|s| s.as_mut()) + pub(crate) fn get(&self, name: &str) -> Option<&dyn EventSourceAny> { + self.0.get(name).map(|s| s.as_ref()) } } @@ -54,7 +54,7 @@ impl fmt::Debug for EventSourceRegistry { } /// A type-erased `EventSource` that operates on CBOR-encoded serialized events. -pub(crate) trait EventSourceAny: Send + 'static { +pub(crate) trait EventSourceAny: Send + Sync + 'static { /// Returns an action which, when processed, broadcasts an event to all /// connected input ports. /// diff --git a/nexosim/src/registry/query_source_registry.rs b/nexosim/src/registry/query_source_registry.rs index 97f9fbc..eeed7c5 100644 --- a/nexosim/src/registry/query_source_registry.rs +++ b/nexosim/src/registry/query_source_registry.rs @@ -43,8 +43,8 @@ impl QuerySourceRegistry { /// Returns a mutable reference to the specified query source if it is in /// the registry. - pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> { - self.0.get_mut(name).map(|s| s.as_mut()) + pub(crate) fn get(&self, name: &str) -> Option<&dyn QuerySourceAny> { + self.0.get(name).map(|s| s.as_ref()) } } @@ -56,7 +56,7 @@ impl fmt::Debug for QuerySourceRegistry { /// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries /// and returns CBOR-encoded replies. -pub(crate) trait QuerySourceAny: Send + 'static { +pub(crate) trait QuerySourceAny: Send + Sync + 'static { /// Returns an action which, when processed, broadcasts a query to all /// connected replier ports. /// diff --git a/nexosim/src/simulation/scheduler.rs b/nexosim/src/simulation/scheduler.rs index 9ea00bf..e938d4c 100644 --- a/nexosim/src/simulation/scheduler.rs +++ b/nexosim/src/simulation/scheduler.rs @@ -33,6 +33,9 @@ impl Scheduler { /// Returns the current simulation time. /// + /// Beware that, if the scheduler runs in a separate thread as the + /// simulation, the time may change concurrently. + /// /// # Examples /// /// ```