diff --git a/crates/trycp_api/src/lib.rs b/crates/trycp_api/src/lib.rs index 14f8f2b9..a50cef5a 100644 --- a/crates/trycp_api/src/lib.rs +++ b/crates/trycp_api/src/lib.rs @@ -117,7 +117,7 @@ pub enum MessageResponse { Null, /// Encoded response. - Bytes(Vec) + Bytes(Vec), } impl MessageResponse { diff --git a/crates/trycp_client/src/lib.rs b/crates/trycp_client/src/lib.rs index b0fa5b70..f632327c 100644 --- a/crates/trycp_client/src/lib.rs +++ b/crates/trycp_client/src/lib.rs @@ -38,9 +38,8 @@ impl SignalRecv { /// Trycp client. pub struct TrycpClient { ws: Ws, - pend: Arc< - std::sync::Mutex>>>, - >, + pend: + Arc>>>>, recv_task: tokio::task::JoinHandle<()>, } @@ -79,7 +78,10 @@ impl TrycpClient { while let Some(Ok(msg)) = stream.next().await { let msg = match msg { Message::Close(close_msg) => { - eprintln!("Received websocket close from TryCP server: {}", close_msg.map(|f| f.reason).unwrap_or("No reason".into())); + eprintln!( + "Received websocket close from TryCP server: {}", + close_msg.map(|f| f.reason).unwrap_or("No reason".into()) + ); break; } Message::Ping(p) => { @@ -89,9 +91,7 @@ impl TrycpClient { Message::Pong(_) => { continue; } - Message::Binary(msg) => { - msg - } + Message::Binary(msg) => msg, _ => { panic!("Unexpected message from TryCP server: {:?}", msg); } diff --git a/crates/trycp_server/src/admin_call.rs b/crates/trycp_server/src/admin_call.rs index e950eff1..2103cb85 100644 --- a/crates/trycp_server/src/admin_call.rs +++ b/crates/trycp_server/src/admin_call.rs @@ -1,13 +1,22 @@ use crate::{HolochainMessage, WsClientDuplex, PLAYERS}; +use futures::lock::Mutex; use futures::{SinkExt, StreamExt}; +use once_cell::sync::Lazy; use snafu::{OptionExt, ResultExt, Snafu}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::net::TcpStream; use tokio::time::error::Elapsed; +use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; +use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::{ - self, - client::IntoClientRequest, - protocol::{frame::coding::CloseCode, CloseFrame, WebSocketConfig}, - Message, + self, client::IntoClientRequest, protocol::WebSocketConfig, Message, }; +use tokio_tungstenite::WebSocketStream; + +pub(crate) static ADMIN_CONNECTIONS: Lazy< + futures::lock::Mutex>>>>, +> = Lazy::new(Default::default); #[derive(Debug, Snafu)] pub(crate) enum AdminCallError { @@ -24,44 +33,45 @@ pub(crate) enum AdminCallError { pub(crate) async fn admin_call(id: String, message: Vec) -> Result, AdminCallError> { println!("admin_interface_call id: {:?}", id); - let port = PLAYERS - .read() - .get(&id) - .map(|player| player.admin_port) - .context(PlayerNotConfigured { id })?; - - let addr = format!("localhost:{port}"); - let stream = tokio::net::TcpStream::connect(addr.clone()) + let mut admin_connections = ADMIN_CONNECTIONS.lock().await; + if !admin_connections.contains_key(&id) { + let port = PLAYERS + .read() + .get(&id) + .map(|player| player.admin_port) + .context(PlayerNotConfigured { id: id.clone() })?; + + let stream = tokio::net::TcpStream::connect(("localhost", port)) + .await + .context(TcpConnect)?; + + let uri = format!("ws://localhost:{}", port); + let mut request = uri.clone().into_client_request().expect("not a valid URI"); + // needed for admin websocket connection to be accepted + request.headers_mut().insert( + "origin", + "trycp-admin".parse().expect("invalid origin header value"), + ); + request.body(); + + println!("Establishing admin interface with {:?}", uri); + + let (ws_stream, _) = tokio_tungstenite::client_async_with_config( + request, + stream, + Some(WebSocketConfig::default()), + ) .await - .context(TcpConnect)?; - let uri = format!("ws://{}", addr); - let mut request = uri.clone().into_client_request().expect("not a valid URI"); - // needed for admin websocket connection to be accepted - request.headers_mut().insert( - "origin", - "trycp-admin".parse().expect("invalid origin header value"), - ); - request.body(); - - println!("Establishing admin interface with {:?}", uri); - - let (mut ws_stream, _) = tokio_tungstenite::client_async_with_config( - request, - stream, - Some(WebSocketConfig::default()), - ) - .await - .context(WsConnect)?; + .context(WsConnect)?; - println!("Established admin interface"); + println!("Established admin interface"); + + admin_connections.insert(id.clone(), Arc::new(futures::lock::Mutex::new(ws_stream))); + } + + let mut ws_stream = admin_connections[&id].lock().await; let call_result = call(&mut ws_stream, 0, message).await; - let _ = ws_stream - .send(Message::Close(Some(CloseFrame { - code: CloseCode::Normal, - reason: "fulfilled purpose".into(), - }))) - .await; Ok(call_result?) } @@ -147,3 +157,25 @@ async fn call( Ok(response_data) } + +#[derive(Debug, Snafu)] +pub(crate) enum AdminDisconnectError { + #[snafu(display("Couldn't complete closing handshake: {}", source))] + CloseHandshake { source: tungstenite::Error }, +} + +pub(crate) async fn disconnect( + ws_stream: Arc>>, +) -> Result<(), AdminDisconnectError> { + let _ = ws_stream + .lock() + .await + .send(Message::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: "fulfilled purpose".into(), + }))) + .await + .context(CloseHandshake)?; + + Ok(()) +} diff --git a/crates/trycp_server/src/app_interface.rs b/crates/trycp_server/src/app_interface.rs index 3e971c23..72fb0b69 100644 --- a/crates/trycp_server/src/app_interface.rs +++ b/crates/trycp_server/src/app_interface.rs @@ -20,7 +20,7 @@ pub(crate) struct Connection { pending_requests: PendingRequests, } -pub(crate) static CONNECTIONS: Lazy< +pub(crate) static APP_CONNECTIONS: Lazy< futures::lock::Mutex>>>>, > = Lazy::new(Default::default); @@ -61,7 +61,7 @@ pub(crate) async fn connect( port: u16, response_writer: Arc>, ) -> Result<(), ConnectError> { - let connection_lock = Arc::clone(CONNECTIONS.lock().await.entry(port).or_default()); + let connection_lock = Arc::clone(APP_CONNECTIONS.lock().await.entry(port).or_default()); let mut connection = connection_lock.lock().await; if connection.is_some() { @@ -196,15 +196,15 @@ pub(crate) async fn listen( } #[derive(Debug, Snafu)] -pub(crate) enum DisconnectError { +pub(crate) enum AppDisconnectError { #[snafu(display("Couldn't complete closing handshake: {}", source))] CloseHandshake { source: tungstenite::Error }, #[snafu(display("Couldn't listen on app interface: {}", source))] Listen { source: ListenError }, } -pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), DisconnectError> { - let connection_lock = match CONNECTIONS.lock().await.get(&port) { +pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), AppDisconnectError> { + let connection_lock = match APP_CONNECTIONS.lock().await.get(&port) { Some(connection_lock) => Arc::clone(connection_lock), None => return Ok(()), }; @@ -214,7 +214,7 @@ pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), DisconnectError> pub(crate) async fn disconnect( connection_lock: Arc>>, -) -> Result<(), DisconnectError> { +) -> Result<(), AppDisconnectError> { let mut connection_guard = connection_lock.lock().await; let Connection { mut request_writer, @@ -251,7 +251,7 @@ pub(crate) enum CallError { pub(crate) async fn call(request_id: u64, port: u16, message: Vec) -> Result<(), CallError> { let connection_lock = Arc::clone( - CONNECTIONS + APP_CONNECTIONS .lock() .await .get(&port) diff --git a/crates/trycp_server/src/main.rs b/crates/trycp_server/src/main.rs index 1c1f5a62..a660df69 100644 --- a/crates/trycp_server/src/main.rs +++ b/crates/trycp_server/src/main.rs @@ -217,12 +217,12 @@ async fn ws_message( }) .await .unwrap(), - Request::Shutdown { id, signal } => spawn_blocking(move || { - let resp = shutdown::shutdown(id, signal).map_err(|e| e.to_string()); - serialize_resp(request_id, resp) - }) - .await - .unwrap(), + Request::Shutdown { id, signal } => serialize_resp( + request_id, + shutdown::shutdown(id, signal) + .await + .map_err(|e| e.to_string()), + ), Request::Reset => spawn_blocking(move || { serialize_resp(request_id, reset::reset().map_err(|e| e.to_string())) }) diff --git a/crates/trycp_server/src/reset.rs b/crates/trycp_server/src/reset.rs index e3c1914c..a9d37ef4 100644 --- a/crates/trycp_server/src/reset.rs +++ b/crates/trycp_server/src/reset.rs @@ -18,9 +18,12 @@ pub(crate) enum ResetError { } pub(crate) fn reset() -> Result<(), ResetError> { - let (players, connections) = { + let (players, app_connections, admin_connections) = { let mut players_guard = PLAYERS.write(); - let mut connections_guard = futures::executor::block_on(app_interface::CONNECTIONS.lock()); + let mut app_connections_guard = + futures::executor::block_on(app_interface::APP_CONNECTIONS.lock()); + let mut admin_connections_guard = + futures::executor::block_on(crate::admin_call::ADMIN_CONNECTIONS.lock()); NEXT_ADMIN_PORT.store(FIRST_ADMIN_PORT, atomic::Ordering::SeqCst); match std::fs::remove_dir_all(PLAYERS_DIR_PATH) { Ok(()) => {} @@ -29,11 +32,12 @@ pub(crate) fn reset() -> Result<(), ResetError> { } ( std::mem::take(&mut *players_guard), - std::mem::take(&mut *connections_guard), + std::mem::take(&mut *app_connections_guard), + std::mem::take(&mut *admin_connections_guard), ) }; - for (port, connection) in connections { + for (port, connection) in app_connections { if let Err(e) = futures::executor::block_on(app_interface::disconnect(connection)) { println!( "warn: failed to disconnect app interface at port {}: {}", @@ -42,6 +46,12 @@ pub(crate) fn reset() -> Result<(), ResetError> { } } + for (_, connection) in admin_connections { + if let Err(e) = futures::executor::block_on(crate::admin_call::disconnect(connection)) { + println!("warn: failed to disconnect admin interface: {}", e); + } + } + for (id, mut player) in players { if let Err(e) = kill_player(player.processes.get_mut(), &id, Signal::SIGKILL) { println!("warn: failed to kill player {:?}: {}", id, e); diff --git a/crates/trycp_server/src/shutdown.rs b/crates/trycp_server/src/shutdown.rs index 312386c1..c03f4147 100644 --- a/crates/trycp_server/src/shutdown.rs +++ b/crates/trycp_server/src/shutdown.rs @@ -1,7 +1,8 @@ +use crate::admin_call::ADMIN_CONNECTIONS; +use crate::{kill_player, player_config_exists, KillError, PLAYERS}; use nix::sys::signal::Signal; use snafu::{ensure, Snafu}; - -use crate::{kill_player, player_config_exists, KillError, PLAYERS}; +use tokio::task::spawn_blocking; #[derive(Debug, Snafu)] pub(crate) enum ShutdownError { @@ -13,9 +14,11 @@ pub(crate) enum ShutdownError { Kill { source: KillError }, } -pub(crate) fn shutdown(id: String, signal: Option) -> Result<(), ShutdownError> { +pub(crate) async fn shutdown(id: String, signal: Option) -> Result<(), ShutdownError> { ensure!(player_config_exists(&id), PlayerNotConfigured { id }); + ADMIN_CONNECTIONS.lock().await.remove(&id); + let signal = match signal.as_deref() { Some("SIGTERM") | None => Signal::SIGTERM, Some("SIGKILL") => Signal::SIGKILL, @@ -27,15 +30,21 @@ pub(crate) fn shutdown(id: String, signal: Option) -> Result<(), Shutdow } }; - let players_guard = PLAYERS.read(); - let processes_lock = match players_guard.get(&id) { - Some(player) => &player.processes, - None => return Ok(()), - }; + spawn_blocking(move || -> Result<(), ShutdownError> { + let players_guard = PLAYERS.read(); + let processes_lock = match players_guard.get(&id) { + Some(player) => &player.processes, + None => return Ok(()), + }; + + let mut player_cell = processes_lock.lock(); - let mut player_cell = processes_lock.lock(); + kill_player(&mut player_cell, &id, signal)?; - kill_player(&mut player_cell, &id, signal)?; + Ok(()) + }) + .await + .expect("Task to kill player should have completed")?; Ok(()) } diff --git a/crates/trycp_server/src/startup.rs b/crates/trycp_server/src/startup.rs index 3d7e1637..c1b38a95 100644 --- a/crates/trycp_server/src/startup.rs +++ b/crates/trycp_server/src/startup.rs @@ -16,8 +16,6 @@ use crate::{ pub enum Error { #[snafu(display("Could not find a configuration for player with ID {:?}", id))] PlayerNotConfigured { id: String }, - #[snafu(display("Could not create directory for lair-shim at {}: {}", path.display(), source))] - CreateDir { path: PathBuf, source: io::Error }, #[snafu(display("Could not create log file at {} for lair-keystore's stdout: {}", path.display(), source))] CreateLairStdoutFile { path: PathBuf, source: io::Error }, #[snafu(display("Could not spawn lair-keystore: {}", source))]