Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep admin connections open #221

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/trycp_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub enum MessageResponse {
Null,

/// Encoded response.
Bytes(Vec<u8>)
Bytes(Vec<u8>),
}

impl MessageResponse {
Expand Down
14 changes: 7 additions & 7 deletions crates/trycp_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ impl SignalRecv {
/// Trycp client.
pub struct TrycpClient {
ws: Ws,
pend: Arc<
std::sync::Mutex<HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>>>,
>,
pend:
Arc<std::sync::Mutex<HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>>>>,
recv_task: tokio::task::JoinHandle<()>,
}

Expand Down Expand Up @@ -79,7 +78,10 @@ impl TrycpClient {
while let Some(Ok(msg)) = stream.next().await {
let msg = match msg {
Message::Close(close_msg) => {
eprintln!("Received websocket close from TryCP server: {}", close_msg.map(|f| f.reason).unwrap_or("No reason".into()));
eprintln!(
"Received websocket close from TryCP server: {}",
close_msg.map(|f| f.reason).unwrap_or("No reason".into())
);
break;
}
Message::Ping(p) => {
Expand All @@ -89,9 +91,7 @@ impl TrycpClient {
Message::Pong(_) => {
continue;
}
Message::Binary(msg) => {
msg
}
Message::Binary(msg) => msg,
_ => {
panic!("Unexpected message from TryCP server: {:?}", msg);
}
Expand Down
108 changes: 70 additions & 38 deletions crates/trycp_server/src/admin_call.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use crate::{HolochainMessage, WsClientDuplex, PLAYERS};
use futures::lock::Mutex;
use futures::{SinkExt, StreamExt};
use once_cell::sync::Lazy;
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::time::error::Elapsed;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::{
self,
client::IntoClientRequest,
protocol::{frame::coding::CloseCode, CloseFrame, WebSocketConfig},
Message,
self, client::IntoClientRequest, protocol::WebSocketConfig, Message,
};
use tokio_tungstenite::WebSocketStream;

pub(crate) static ADMIN_CONNECTIONS: Lazy<
futures::lock::Mutex<HashMap<String, Arc<futures::lock::Mutex<WebSocketStream<TcpStream>>>>>,
> = Lazy::new(Default::default);

#[derive(Debug, Snafu)]
pub(crate) enum AdminCallError {
Expand All @@ -24,44 +33,45 @@ pub(crate) enum AdminCallError {
pub(crate) async fn admin_call(id: String, message: Vec<u8>) -> Result<Vec<u8>, AdminCallError> {
println!("admin_interface_call id: {:?}", id);

let port = PLAYERS
.read()
.get(&id)
.map(|player| player.admin_port)
.context(PlayerNotConfigured { id })?;

let addr = format!("localhost:{port}");
let stream = tokio::net::TcpStream::connect(addr.clone())
let mut admin_connections = ADMIN_CONNECTIONS.lock().await;
if !admin_connections.contains_key(&id) {
let port = PLAYERS
.read()
.get(&id)
.map(|player| player.admin_port)
.context(PlayerNotConfigured { id: id.clone() })?;

let stream = tokio::net::TcpStream::connect(("localhost", port))
.await
.context(TcpConnect)?;

let uri = format!("ws://localhost:{}", port);
let mut request = uri.clone().into_client_request().expect("not a valid URI");
// needed for admin websocket connection to be accepted
request.headers_mut().insert(
"origin",
"trycp-admin".parse().expect("invalid origin header value"),
);
request.body();

println!("Establishing admin interface with {:?}", uri);

let (ws_stream, _) = tokio_tungstenite::client_async_with_config(
request,
stream,
Some(WebSocketConfig::default()),
)
.await
.context(TcpConnect)?;
let uri = format!("ws://{}", addr);
let mut request = uri.clone().into_client_request().expect("not a valid URI");
// needed for admin websocket connection to be accepted
request.headers_mut().insert(
"origin",
"trycp-admin".parse().expect("invalid origin header value"),
);
request.body();

println!("Establishing admin interface with {:?}", uri);

let (mut ws_stream, _) = tokio_tungstenite::client_async_with_config(
request,
stream,
Some(WebSocketConfig::default()),
)
.await
.context(WsConnect)?;
.context(WsConnect)?;

println!("Established admin interface");
println!("Established admin interface");

admin_connections.insert(id.clone(), Arc::new(futures::lock::Mutex::new(ws_stream)));
}

let mut ws_stream = admin_connections[&id].lock().await;

let call_result = call(&mut ws_stream, 0, message).await;
let _ = ws_stream
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: "fulfilled purpose".into(),
})))
.await;

Ok(call_result?)
}
Expand Down Expand Up @@ -147,3 +157,25 @@ async fn call(

Ok(response_data)
}

/// Errors that can occur while disconnecting an admin websocket connection.
#[derive(Debug, Snafu)]
pub(crate) enum AdminDisconnectError {
/// Sending the websocket close frame failed; `source` is the underlying
/// tungstenite error.
#[snafu(display("Couldn't complete closing handshake: {}", source))]
CloseHandshake { source: tungstenite::Error },
}

/// Sends a websocket close frame on the given admin connection.
///
/// The stream's mutex is locked for the duration of the send. The frame uses
/// `CloseCode::Normal` with the reason `"fulfilled purpose"`.
///
/// # Errors
///
/// Returns [`AdminDisconnectError::CloseHandshake`] if sending the close
/// frame fails.
pub(crate) async fn disconnect(
    ws_stream: Arc<Mutex<WebSocketStream<TcpStream>>>,
) -> Result<(), AdminDisconnectError> {
    let close_frame = CloseFrame {
        code: CloseCode::Normal,
        reason: "fulfilled purpose".into(),
    };

    // Hold the per-connection lock only while the close frame is written.
    let mut connection = ws_stream.lock().await;
    connection
        .send(Message::Close(Some(close_frame)))
        .await
        .context(CloseHandshake)
}
14 changes: 7 additions & 7 deletions crates/trycp_server/src/app_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) struct Connection {
pending_requests: PendingRequests,
}

pub(crate) static CONNECTIONS: Lazy<
pub(crate) static APP_CONNECTIONS: Lazy<
futures::lock::Mutex<HashMap<u16, Arc<futures::lock::Mutex<Option<Connection>>>>>,
> = Lazy::new(Default::default);

Expand Down Expand Up @@ -61,7 +61,7 @@ pub(crate) async fn connect(
port: u16,
response_writer: Arc<futures::lock::Mutex<WsResponseWriter>>,
) -> Result<(), ConnectError> {
let connection_lock = Arc::clone(CONNECTIONS.lock().await.entry(port).or_default());
let connection_lock = Arc::clone(APP_CONNECTIONS.lock().await.entry(port).or_default());

let mut connection = connection_lock.lock().await;
if connection.is_some() {
Expand Down Expand Up @@ -196,15 +196,15 @@ pub(crate) async fn listen(
}

#[derive(Debug, Snafu)]
pub(crate) enum DisconnectError {
pub(crate) enum AppDisconnectError {
#[snafu(display("Couldn't complete closing handshake: {}", source))]
CloseHandshake { source: tungstenite::Error },
#[snafu(display("Couldn't listen on app interface: {}", source))]
Listen { source: ListenError },
}

pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), DisconnectError> {
let connection_lock = match CONNECTIONS.lock().await.get(&port) {
pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), AppDisconnectError> {
let connection_lock = match APP_CONNECTIONS.lock().await.get(&port) {
Some(connection_lock) => Arc::clone(connection_lock),
None => return Ok(()),
};
Expand All @@ -214,7 +214,7 @@ pub(crate) async fn disconnect_by_port(port: u16) -> Result<(), DisconnectError>

pub(crate) async fn disconnect(
connection_lock: Arc<futures::lock::Mutex<Option<Connection>>>,
) -> Result<(), DisconnectError> {
) -> Result<(), AppDisconnectError> {
let mut connection_guard = connection_lock.lock().await;
let Connection {
mut request_writer,
Expand Down Expand Up @@ -251,7 +251,7 @@ pub(crate) enum CallError {

pub(crate) async fn call(request_id: u64, port: u16, message: Vec<u8>) -> Result<(), CallError> {
let connection_lock = Arc::clone(
CONNECTIONS
APP_CONNECTIONS
.lock()
.await
.get(&port)
Expand Down
12 changes: 6 additions & 6 deletions crates/trycp_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ async fn ws_message(
})
.await
.unwrap(),
Request::Shutdown { id, signal } => spawn_blocking(move || {
let resp = shutdown::shutdown(id, signal).map_err(|e| e.to_string());
serialize_resp(request_id, resp)
})
.await
.unwrap(),
Request::Shutdown { id, signal } => serialize_resp(
request_id,
shutdown::shutdown(id, signal)
.await
.map_err(|e| e.to_string()),
),
Request::Reset => spawn_blocking(move || {
serialize_resp(request_id, reset::reset().map_err(|e| e.to_string()))
})
Expand Down
18 changes: 14 additions & 4 deletions crates/trycp_server/src/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ pub(crate) enum ResetError {
}

pub(crate) fn reset() -> Result<(), ResetError> {
let (players, connections) = {
let (players, app_connections, admin_connections) = {
let mut players_guard = PLAYERS.write();
let mut connections_guard = futures::executor::block_on(app_interface::CONNECTIONS.lock());
let mut app_connections_guard =
futures::executor::block_on(app_interface::APP_CONNECTIONS.lock());
let mut admin_connections_guard =
futures::executor::block_on(crate::admin_call::ADMIN_CONNECTIONS.lock());
NEXT_ADMIN_PORT.store(FIRST_ADMIN_PORT, atomic::Ordering::SeqCst);
match std::fs::remove_dir_all(PLAYERS_DIR_PATH) {
Ok(()) => {}
Expand All @@ -29,11 +32,12 @@ pub(crate) fn reset() -> Result<(), ResetError> {
}
(
std::mem::take(&mut *players_guard),
std::mem::take(&mut *connections_guard),
std::mem::take(&mut *app_connections_guard),
std::mem::take(&mut *admin_connections_guard),
)
};

for (port, connection) in connections {
for (port, connection) in app_connections {
if let Err(e) = futures::executor::block_on(app_interface::disconnect(connection)) {
println!(
"warn: failed to disconnect app interface at port {}: {}",
Expand All @@ -42,6 +46,12 @@ pub(crate) fn reset() -> Result<(), ResetError> {
}
}

for (_, connection) in admin_connections {
if let Err(e) = futures::executor::block_on(crate::admin_call::disconnect(connection)) {
println!("warn: failed to disconnect admin interface: {}", e);
}
}

for (id, mut player) in players {
if let Err(e) = kill_player(player.processes.get_mut(), &id, Signal::SIGKILL) {
println!("warn: failed to kill player {:?}: {}", id, e);
Expand Down
29 changes: 19 additions & 10 deletions crates/trycp_server/src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::admin_call::ADMIN_CONNECTIONS;
use crate::{kill_player, player_config_exists, KillError, PLAYERS};
use nix::sys::signal::Signal;
use snafu::{ensure, Snafu};

use crate::{kill_player, player_config_exists, KillError, PLAYERS};
use tokio::task::spawn_blocking;

#[derive(Debug, Snafu)]
pub(crate) enum ShutdownError {
Expand All @@ -13,9 +14,11 @@ pub(crate) enum ShutdownError {
Kill { source: KillError },
}

pub(crate) fn shutdown(id: String, signal: Option<String>) -> Result<(), ShutdownError> {
pub(crate) async fn shutdown(id: String, signal: Option<String>) -> Result<(), ShutdownError> {
ensure!(player_config_exists(&id), PlayerNotConfigured { id });

ADMIN_CONNECTIONS.lock().await.remove(&id);

let signal = match signal.as_deref() {
Some("SIGTERM") | None => Signal::SIGTERM,
Some("SIGKILL") => Signal::SIGKILL,
Expand All @@ -27,15 +30,21 @@ pub(crate) fn shutdown(id: String, signal: Option<String>) -> Result<(), Shutdow
}
};

let players_guard = PLAYERS.read();
let processes_lock = match players_guard.get(&id) {
Some(player) => &player.processes,
None => return Ok(()),
};
spawn_blocking(move || -> Result<(), ShutdownError> {
let players_guard = PLAYERS.read();
let processes_lock = match players_guard.get(&id) {
Some(player) => &player.processes,
None => return Ok(()),
};

let mut player_cell = processes_lock.lock();

let mut player_cell = processes_lock.lock();
kill_player(&mut player_cell, &id, signal)?;

kill_player(&mut player_cell, &id, signal)?;
Ok(())
})
.await
.expect("Task to kill player should have completed")?;

Ok(())
}
2 changes: 0 additions & 2 deletions crates/trycp_server/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use crate::{
pub enum Error {
#[snafu(display("Could not find a configuration for player with ID {:?}", id))]
PlayerNotConfigured { id: String },
#[snafu(display("Could not create directory for lair-shim at {}: {}", path.display(), source))]
CreateDir { path: PathBuf, source: io::Error },
#[snafu(display("Could not create log file at {} for lair-keystore's stdout: {}", path.display(), source))]
CreateLairStdoutFile { path: PathBuf, source: io::Error },
#[snafu(display("Could not spawn lair-keystore: {}", source))]
Expand Down