Skip to content

Commit

Permalink
Merge pull request #221 from holochain/trycp-keep-admin-connections-open
Browse files Browse the repository at this point in the history
Keep admin connections open
  • Loading branch information
ThetaSinner authored Jul 17, 2024
2 parents 99273a1 + d7f9b76 commit 8e2cd6b
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 75 deletions.
2 changes: 1 addition & 1 deletion crates/trycp_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub enum MessageResponse {
Null,

/// Encoded response.
Bytes(Vec<u8>)
Bytes(Vec<u8>),
}

impl MessageResponse {
Expand Down
14 changes: 7 additions & 7 deletions crates/trycp_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ impl SignalRecv {
/// Trycp client.
pub struct TrycpClient {
ws: Ws,
pend: Arc<
std::sync::Mutex<HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>>>,
>,
pend:
Arc<std::sync::Mutex<HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>>>>,
recv_task: tokio::task::JoinHandle<()>,
}

Expand Down Expand Up @@ -79,7 +78,10 @@ impl TrycpClient {
while let Some(Ok(msg)) = stream.next().await {
let msg = match msg {
Message::Close(close_msg) => {
eprintln!("Received websocket close from TryCP server: {}", close_msg.map(|f| f.reason).unwrap_or("No reason".into()));
eprintln!(
"Received websocket close from TryCP server: {}",
close_msg.map(|f| f.reason).unwrap_or("No reason".into())
);
break;
}
Message::Ping(p) => {
Expand All @@ -89,9 +91,7 @@ impl TrycpClient {
Message::Pong(_) => {
continue;
}
Message::Binary(msg) => {
msg
}
Message::Binary(msg) => msg,
_ => {
panic!("Unexpected message from TryCP server: {:?}", msg);
}
Expand Down
108 changes: 70 additions & 38 deletions crates/trycp_server/src/admin_call.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use crate::{HolochainMessage, WsClientDuplex, PLAYERS};
use futures::lock::Mutex;
use futures::{SinkExt, StreamExt};
use once_cell::sync::Lazy;
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::time::error::Elapsed;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::{
self,
client::IntoClientRequest,
protocol::{frame::coding::CloseCode, CloseFrame, WebSocketConfig},
Message,
self, client::IntoClientRequest, protocol::WebSocketConfig, Message,
};
use tokio_tungstenite::WebSocketStream;

/// Cache of admin websocket connections, keyed by player ID.
///
/// `admin_call` inserts an entry the first time it is invoked for an ID that
/// has no connection yet, `shutdown` removes the entry for the player being
/// shut down, and `reset` takes the whole map and disconnects every stream.
///
/// The outer mutex guards the map; each stream additionally sits behind its
/// own `Arc<Mutex<..>>`. The map is created empty on first access.
pub(crate) static ADMIN_CONNECTIONS: Lazy<
futures::lock::Mutex<HashMap<String, Arc<futures::lock::Mutex<WebSocketStream<TcpStream>>>>>,
> = Lazy::new(Default::default);

#[derive(Debug, Snafu)]
pub(crate) enum AdminCallError {
Expand All @@ -24,44 +33,45 @@ pub(crate) enum AdminCallError {
pub(crate) async fn admin_call(id: String, message: Vec<u8>) -> Result<Vec<u8>, AdminCallError> {
println!("admin_interface_call id: {:?}", id);

let port = PLAYERS
.read()
.get(&id)
.map(|player| player.admin_port)
.context(PlayerNotConfigured { id })?;

let addr = format!("localhost:{port}");
let stream = tokio::net::TcpStream::connect(addr.clone())
let mut admin_connections = ADMIN_CONNECTIONS.lock().await;
if !admin_connections.contains_key(&id) {
let port = PLAYERS
.read()
.get(&id)
.map(|player| player.admin_port)
.context(PlayerNotConfigured { id: id.clone() })?;

let stream = tokio::net::TcpStream::connect(("localhost", port))
.await
.context(TcpConnect)?;

let uri = format!("ws://localhost:{}", port);
let mut request = uri.clone().into_client_request().expect("not a valid URI");
// needed for admin websocket connection to be accepted
request.headers_mut().insert(
"origin",
"trycp-admin".parse().expect("invalid origin header value"),
);
request.body();

println!("Establishing admin interface with {:?}", uri);

let (ws_stream, _) = tokio_tungstenite::client_async_with_config(
request,
stream,
Some(WebSocketConfig::default()),
)
.await
.context(TcpConnect)?;
let uri = format!("ws://{}", addr);
let mut request = uri.clone().into_client_request().expect("not a valid URI");
// needed for admin websocket connection to be accepted
request.headers_mut().insert(
"origin",
"trycp-admin".parse().expect("invalid origin header value"),
);
request.body();

println!("Establishing admin interface with {:?}", uri);

let (mut ws_stream, _) = tokio_tungstenite::client_async_with_config(
request,
stream,
Some(WebSocketConfig::default()),
)
.await
.context(WsConnect)?;
.context(WsConnect)?;

println!("Established admin interface");
println!("Established admin interface");

admin_connections.insert(id.clone(), Arc::new(futures::lock::Mutex::new(ws_stream)));
}

let mut ws_stream = admin_connections[&id].lock().await;

let call_result = call(&mut ws_stream, 0, message).await;
let _ = ws_stream
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: "fulfilled purpose".into(),
})))
.await;

Ok(call_result?)
}
Expand Down Expand Up @@ -147,3 +157,25 @@ async fn call(

Ok(response_data)
}

/// Error returned by the admin `disconnect` function.
#[derive(Debug, Snafu)]
pub(crate) enum AdminDisconnectError {
/// Sending the websocket close frame failed; `source` is the underlying
/// tungstenite error.
#[snafu(display("Couldn't complete closing handshake: {}", source))]
CloseHandshake { source: tungstenite::Error },
}

/// Sends a websocket close frame over an admin connection.
///
/// Locks `ws_stream`, then sends a `Close` message carrying
/// `CloseCode::Normal` and the reason `"fulfilled purpose"`.
///
/// # Errors
///
/// Returns [`AdminDisconnectError::CloseHandshake`] if sending the close
/// message fails.
pub(crate) async fn disconnect(
    ws_stream: Arc<Mutex<WebSocketStream<TcpStream>>>,
) -> Result<(), AdminDisconnectError> {
    let close_frame = CloseFrame {
        code: CloseCode::Normal,
        reason: "fulfilled purpose".into(),
    };

    // The guard is held only for the duration of the send.
    let mut stream = ws_stream.lock().await;
    stream
        .send(Message::Close(Some(close_frame)))
        .await
        .context(CloseHandshake)
}
14 changes: 7 additions & 7 deletions crates/trycp_server/src/app_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) struct Connection {
pending_requests: PendingRequests,
}

pub(crate) static CONNECTIONS: Lazy<
pub(crate) static APP_CONNECTIONS: Lazy<
futures::lock::Mutex<HashMap<u16, Arc<futures::lock::Mutex<Option<Connection>>>>>,
> = Lazy::new(Default::default);

Expand Down Expand Up @@ -61,7 +61,7 @@ pub(crate) async fn connect(
port: u16,
response_writer: Arc<futures::lock::Mutex<WsResponseWriter>>,
) -> Result<(), ConnectError> {
let connection_lock = Arc::clone(CONNECTIONS.lock().await.entry(port).or_default());
let connection_lock = Arc::clone(APP_CONNECTIONS.lock().await.entry(port).or_default());

let mut connection = connection_lock.lock().await;
if connection.is_some() {
Expand Down Expand Up @@ -196,15 +196,15 @@ pub(crate) async fn listen(
}

#[derive(Debug, Snafu)]
pub(crate) enum DisconnectError {
pub(crate) enum AppDisconnectError {
#[snafu(display("Couldn't complete closing handshake: {}", source))]
CloseHandshake { source: tungstenite::Error },
#[snafu(display("Couldn't listen on app interface: {}", source))]
Listen { source: ListenError },
}

pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), DisconnectError> {
let connection_lock = match CONNECTIONS.lock().await.get(&port) {
pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), AppDisconnectError> {
let connection_lock = match APP_CONNECTIONS.lock().await.get(&port) {
Some(connection_lock) => Arc::clone(connection_lock),
None => return Ok(()),
};
Expand All @@ -214,7 +214,7 @@ pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), DisconnectError>

pub(crate) async fn disconnect(
connection_lock: Arc<futures::lock::Mutex<Option<Connection>>>,
) -> Result<(), DisconnectError> {
) -> Result<(), AppDisconnectError> {
let mut connection_guard = connection_lock.lock().await;
let Connection {
mut request_writer,
Expand Down Expand Up @@ -251,7 +251,7 @@ pub(crate) enum CallError {

pub(crate) async fn call(request_id: u64, port: u16, message: Vec<u8>) -> Result<(), CallError> {
let connection_lock = Arc::clone(
CONNECTIONS
APP_CONNECTIONS
.lock()
.await
.get(&port)
Expand Down
12 changes: 6 additions & 6 deletions crates/trycp_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ async fn ws_message(
})
.await
.unwrap(),
Request::Shutdown { id, signal } => spawn_blocking(move || {
let resp = shutdown::shutdown(id, signal).map_err(|e| e.to_string());
serialize_resp(request_id, resp)
})
.await
.unwrap(),
Request::Shutdown { id, signal } => serialize_resp(
request_id,
shutdown::shutdown(id, signal)
.await
.map_err(|e| e.to_string()),
),
Request::Reset => spawn_blocking(move || {
serialize_resp(request_id, reset::reset().map_err(|e| e.to_string()))
})
Expand Down
18 changes: 14 additions & 4 deletions crates/trycp_server/src/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ pub(crate) enum ResetError {
}

pub(crate) fn reset() -> Result<(), ResetError> {
let (players, connections) = {
let (players, app_connections, admin_connections) = {
let mut players_guard = PLAYERS.write();
let mut connections_guard = futures::executor::block_on(app_interface::CONNECTIONS.lock());
let mut app_connections_guard =
futures::executor::block_on(app_interface::APP_CONNECTIONS.lock());
let mut admin_connections_guard =
futures::executor::block_on(crate::admin_call::ADMIN_CONNECTIONS.lock());
NEXT_ADMIN_PORT.store(FIRST_ADMIN_PORT, atomic::Ordering::SeqCst);
match std::fs::remove_dir_all(PLAYERS_DIR_PATH) {
Ok(()) => {}
Expand All @@ -29,11 +32,12 @@ pub(crate) fn reset() -> Result<(), ResetError> {
}
(
std::mem::take(&mut *players_guard),
std::mem::take(&mut *connections_guard),
std::mem::take(&mut *app_connections_guard),
std::mem::take(&mut *admin_connections_guard),
)
};

for (port, connection) in connections {
for (port, connection) in app_connections {
if let Err(e) = futures::executor::block_on(app_interface::disconnect(connection)) {
println!(
"warn: failed to disconnect app interface at port {}: {}",
Expand All @@ -42,6 +46,12 @@ pub(crate) fn reset() -> Result<(), ResetError> {
}
}

for (_, connection) in admin_connections {
if let Err(e) = futures::executor::block_on(crate::admin_call::disconnect(connection)) {
println!("warn: failed to disconnect admin interface: {}", e);
}
}

for (id, mut player) in players {
if let Err(e) = kill_player(player.processes.get_mut(), &id, Signal::SIGKILL) {
println!("warn: failed to kill player {:?}: {}", id, e);
Expand Down
29 changes: 19 additions & 10 deletions crates/trycp_server/src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::admin_call::ADMIN_CONNECTIONS;
use crate::{kill_player, player_config_exists, KillError, PLAYERS};
use nix::sys::signal::Signal;
use snafu::{ensure, Snafu};

use crate::{kill_player, player_config_exists, KillError, PLAYERS};
use tokio::task::spawn_blocking;

#[derive(Debug, Snafu)]
pub(crate) enum ShutdownError {
Expand All @@ -13,9 +14,11 @@ pub(crate) enum ShutdownError {
Kill { source: KillError },
}

pub(crate) fn shutdown(id: String, signal: Option<String>) -> Result<(), ShutdownError> {
pub(crate) async fn shutdown(id: String, signal: Option<String>) -> Result<(), ShutdownError> {
ensure!(player_config_exists(&id), PlayerNotConfigured { id });

ADMIN_CONNECTIONS.lock().await.remove(&id);

let signal = match signal.as_deref() {
Some("SIGTERM") | None => Signal::SIGTERM,
Some("SIGKILL") => Signal::SIGKILL,
Expand All @@ -27,15 +30,21 @@ pub(crate) fn shutdown(id: String, signal: Option<String>) -> Result<(), Shutdow
}
};

let players_guard = PLAYERS.read();
let processes_lock = match players_guard.get(&id) {
Some(player) => &player.processes,
None => return Ok(()),
};
spawn_blocking(move || -> Result<(), ShutdownError> {
let players_guard = PLAYERS.read();
let processes_lock = match players_guard.get(&id) {
Some(player) => &player.processes,
None => return Ok(()),
};

let mut player_cell = processes_lock.lock();

let mut player_cell = processes_lock.lock();
kill_player(&mut player_cell, &id, signal)?;

kill_player(&mut player_cell, &id, signal)?;
Ok(())
})
.await
.expect("Task to kill player should have completed")?;

Ok(())
}
2 changes: 0 additions & 2 deletions crates/trycp_server/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use crate::{
pub enum Error {
#[snafu(display("Could not find a configuration for player with ID {:?}", id))]
PlayerNotConfigured { id: String },
#[snafu(display("Could not create directory for lair-shim at {}: {}", path.display(), source))]
CreateDir { path: PathBuf, source: io::Error },
#[snafu(display("Could not create log file at {} for lair-keystore's stdout: {}", path.display(), source))]
CreateLairStdoutFile { path: PathBuf, source: io::Error },
#[snafu(display("Could not spawn lair-keystore: {}", source))]
Expand Down

0 comments on commit 8e2cd6b

Please sign in to comment.