Skip to content

Commit

Permalink
Various gRPC improvements
Browse files Browse the repository at this point in the history
This patch in particular allows asynchronous gRPC access to the scheduler
and to the monitoring functions.
  • Loading branch information
sbarral committed Jan 9, 2025
1 parent 4340774 commit 3cd04cb
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 277 deletions.
44 changes: 36 additions & 8 deletions nexosim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,27 @@ description = """
A high performance asychronous compute framework for system simulation.
"""
categories = ["simulation", "aerospace", "science"]
keywords = ["simulation", "discrete-event", "systems", "cyberphysical", "real-time"]
keywords = [
"simulation",
"discrete-event",
"systems",
"cyberphysical",
"real-time",
]

[features]
# gRPC service.
grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"]
grpc = [
"dep:bytes",
"dep:ciborium",
"dep:prost",
"dep:prost-types",
"dep:serde",
"dep:tonic",
"dep:tokio",
"dep:tonic",
"tai-time/serde",
]
tracing = ["dep:tracing", "dep:tracing-subscriber"]

# DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking.
Expand Down Expand Up @@ -54,15 +70,24 @@ ciborium = { version = "0.2.2", optional = true }
prost = { version = "0.13", optional = true }
prost-types = { version = "0.13", optional = true }
serde = { version = "1", optional = true }
tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true }
tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true }
tracing = { version= "0.1.40", default-features = false, features=["std"], optional = true }
tracing-subscriber = { version= "0.3.18", optional = true }
tokio = { version = "1.0", features = [
"net",
"rt-multi-thread",
], optional = true }
tonic = { version = "0.12", default-features = false, features = [
"codegen",
"prost",
"server",
], optional = true }
tracing = { version = "0.1.40", default-features = false, features = [
"std",
], optional = true }
tracing-subscriber = { version = "0.3.18", optional = true }

[dev-dependencies]
futures-util = "0.3"
futures-executor = "0.3"
tracing-subscriber = { version= "0.3.18", features=["env-filter"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[target.'cfg(nexosim_loom)'.dev-dependencies]
loom = "0.7"
Expand All @@ -74,7 +99,10 @@ tonic-build = { version = "0.12" }
[lints.rust]
# `nexosim_loom` flag: run loom-based tests.
# `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions.
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(nexosim_loom)', 'cfg(nexosim_grpc_codegen)'] }
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(nexosim_loom)',
'cfg(nexosim_grpc_codegen)',
] }

[package.metadata.docs.rs]
all-features = true
Expand Down
41 changes: 20 additions & 21 deletions nexosim/src/grpc/api/simulation.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@ import "google/protobuf/empty.proto";

enum ErrorCode {
INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1;
SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3;
SIMULATION_MESSAGE_LOSS = 4;
SIMULATION_NO_RECIPIENT = 5;
SIMULATION_PANIC = 6;
SIMULATION_TIMEOUT = 7;
SIMULATION_OUT_OF_SYNC = 8;
SIMULATION_BAD_QUERY = 9;
SIMULATION_TIME_OUT_OF_RANGE = 10;
MISSING_ARGUMENT = 20;
INVALID_TIME = 30;
INVALID_PERIOD = 31;
INVALID_DEADLINE = 32;
INVALID_MESSAGE = 33;
INVALID_KEY = 34;
SOURCE_NOT_FOUND = 40;
SINK_NOT_FOUND = 41;
MISSING_ARGUMENT = 1;
INVALID_TIME = 2;
INVALID_PERIOD = 3;
INVALID_DEADLINE = 4;
INVALID_MESSAGE = 5;
INVALID_KEY = 6;
INITIALIZER_PANIC = 10;
SIMULATION_NOT_STARTED = 11;
SIMULATION_TERMINATED = 12;
SIMULATION_DEADLOCK = 13;
SIMULATION_MESSAGE_LOSS = 14;
SIMULATION_NO_RECIPIENT = 15;
SIMULATION_PANIC = 16;
SIMULATION_TIMEOUT = 17;
SIMULATION_OUT_OF_SYNC = 18;
SIMULATION_BAD_QUERY = 19;
SIMULATION_TIME_OUT_OF_RANGE = 20;
SOURCE_NOT_FOUND = 30;
SINK_NOT_FOUND = 31;
}

message Error {
Expand All @@ -39,9 +40,7 @@ message EventKey {
uint64 subkey2 = 2;
}

message InitRequest {
bytes cfg = 2;
}
message InitRequest { bytes cfg = 2; }
message InitReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;
Expand Down
63 changes: 33 additions & 30 deletions nexosim/src/grpc/codegen/simulation.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,24 +335,25 @@ pub mod any_request {
#[repr(i32)]
pub enum ErrorCode {
InternalError = 0,
SimulationNotStarted = 1,
SimulationTerminated = 2,
SimulationDeadlock = 3,
SimulationMessageLoss = 4,
SimulationNoRecipient = 5,
SimulationPanic = 6,
SimulationTimeout = 7,
SimulationOutOfSync = 8,
SimulationBadQuery = 9,
SimulationTimeOutOfRange = 10,
MissingArgument = 20,
InvalidTime = 30,
InvalidPeriod = 31,
InvalidDeadline = 32,
InvalidMessage = 33,
InvalidKey = 34,
SourceNotFound = 40,
SinkNotFound = 41,
MissingArgument = 1,
InvalidTime = 2,
InvalidPeriod = 3,
InvalidDeadline = 4,
InvalidMessage = 5,
InvalidKey = 6,
InitializerPanic = 10,
SimulationNotStarted = 11,
SimulationTerminated = 12,
SimulationDeadlock = 13,
SimulationMessageLoss = 14,
SimulationNoRecipient = 15,
SimulationPanic = 16,
SimulationTimeout = 17,
SimulationOutOfSync = 18,
SimulationBadQuery = 19,
SimulationTimeOutOfRange = 20,
SourceNotFound = 30,
SinkNotFound = 31,
}
impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -362,6 +363,13 @@ impl ErrorCode {
pub fn as_str_name(&self) -> &'static str {
match self {
Self::InternalError => "INTERNAL_ERROR",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::InitializerPanic => "INITIALIZER_PANIC",
Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
Self::SimulationTerminated => "SIMULATION_TERMINATED",
Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
Expand All @@ -372,12 +380,6 @@ impl ErrorCode {
Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
Self::SimulationBadQuery => "SIMULATION_BAD_QUERY",
Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::SourceNotFound => "SOURCE_NOT_FOUND",
Self::SinkNotFound => "SINK_NOT_FOUND",
}
Expand All @@ -386,6 +388,13 @@ impl ErrorCode {
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"INTERNAL_ERROR" => Some(Self::InternalError),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_DEADLINE" => Some(Self::InvalidDeadline),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"INITIALIZER_PANIC" => Some(Self::InitializerPanic),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
Expand All @@ -396,12 +405,6 @@ impl ErrorCode {
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
"SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_DEADLINE" => Some(Self::InvalidDeadline),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),
"SINK_NOT_FOUND" => Some(Self::SinkNotFound),
_ => None,
Expand Down
4 changes: 2 additions & 2 deletions nexosim/src/grpc/key_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ pub(crate) struct KeyRegistry {
impl KeyRegistry {
/// Inserts an `ActionKey` into the registry.
///
/// The provided expiration deadline is the latest time at which the key may
/// still be active.
/// The provided expiration deadline is the latest time at which the key is
/// guaranteed to be extractable.
pub(crate) fn insert_key(
&mut self,
action_key: ActionKey,
Expand Down
36 changes: 26 additions & 10 deletions nexosim/src/grpc/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! gRPC simulation service.
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;

Expand All @@ -13,7 +14,7 @@ use crate::simulation::{Simulation, SimulationError};
use super::codegen::simulation::*;
use super::key_registry::KeyRegistry;
use super::services::InitService;
use super::services::{ControllerService, MonitorService};
use super::services::{ControllerService, MonitorService, SchedulerService};

/// Runs a gRPC simulation server.
///
Expand All @@ -37,7 +38,8 @@ fn run_service(
service: GrpcSimulationService,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the controller and monitor services can be used
// Use 2 threads so that the even if the controller service is blocked due
// to ongoing simulation execution, other services can still be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
Expand All @@ -58,6 +60,7 @@ struct GrpcSimulationService {
init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>,
monitor_service: Mutex<MonitorService>,
scheduler_service: Mutex<SchedulerService>,
}

impl GrpcSimulationService {
Expand All @@ -76,6 +79,7 @@ impl GrpcSimulationService {
init_service: Mutex::new(InitService::new(sim_gen)),
controller_service: Mutex::new(ControllerService::NotStarted),
monitor_service: Mutex::new(MonitorService::NotStarted),
scheduler_service: Mutex::new(SchedulerService::NotStarted),
}
}

Expand All @@ -93,6 +97,11 @@ impl GrpcSimulationService {
fn monitor(&self) -> MutexGuard<'_, MonitorService> {
self.monitor_service.lock().unwrap()
}

/// Locks the scheduler and returns the mutex guard.
fn scheduler(&self) -> MutexGuard<'_, SchedulerService> {
self.scheduler_service.lock().unwrap()
}
}

#[tonic::async_trait]
Expand All @@ -103,15 +112,22 @@ impl simulation_server::Simulation for GrpcSimulationService {
let (reply, bench) = self.initializer().init(request);

if let Some((simulation, scheduler, endpoint_registry)) = bench {
let event_source_registry = Arc::new(endpoint_registry.event_source_registry);
let query_source_registry = endpoint_registry.query_source_registry;
let event_sink_registry = endpoint_registry.event_sink_registry;

*self.controller() = ControllerService::Started {
simulation,
scheduler,
event_source_registry: endpoint_registry.event_source_registry,
query_source_registry: endpoint_registry.query_source_registry,
key_registry: KeyRegistry::default(),
event_source_registry: event_source_registry.clone(),
query_source_registry,
};
*self.monitor() = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry,
event_sink_registry,
};
*self.scheduler() = SchedulerService::Started {
scheduler,
event_source_registry,
key_registry: KeyRegistry::default(),
};
}

Expand All @@ -120,7 +136,7 @@ impl simulation_server::Simulation for GrpcSimulationService {
async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> {
let request = request.into_inner();

Ok(Response::new(self.controller().time(request)))
Ok(Response::new(self.scheduler().time(request)))
}
async fn step(&self, request: Request<StepRequest>) -> Result<Response<StepReply>, Status> {
let request = request.into_inner();
Expand All @@ -141,15 +157,15 @@ impl simulation_server::Simulation for GrpcSimulationService {
) -> Result<Response<ScheduleEventReply>, Status> {
let request = request.into_inner();

Ok(Response::new(self.controller().schedule_event(request)))
Ok(Response::new(self.scheduler().schedule_event(request)))
}
async fn cancel_event(
&self,
request: Request<CancelEventRequest>,
) -> Result<Response<CancelEventReply>, Status> {
let request = request.into_inner();

Ok(Response::new(self.controller().cancel_event(request)))
Ok(Response::new(self.scheduler().cancel_event(request)))
}
async fn process_event(
&self,
Expand Down
2 changes: 2 additions & 0 deletions nexosim/src/grpc/services.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod controller_service;
mod init_service;
mod monitor_service;
mod scheduler_service;

use std::time::Duration;

Expand All @@ -13,6 +14,7 @@ use crate::simulation::{ExecutionError, SchedulingError, SimulationError};
pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService;
pub(crate) use monitor_service::MonitorService;
pub(crate) use scheduler_service::SchedulerService;

/// Transforms an error code and a message into a Protobuf error.
fn to_error(code: ErrorCode, message: impl Into<String>) -> Error {
Expand Down
Loading

0 comments on commit 3cd04cb

Please sign in to comment.