Skip to content

Commit

Permalink
Clean up stale testruns & logging
Browse files Browse the repository at this point in the history
- log gw identity key
- better agent testrun logging
- log responses
- change response code for agents
  • Loading branch information
dynco-nym committed Oct 30, 2024
1 parent faad721 commit e918901
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nym-node-status-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[package]
name = "nym-node-status-api"
version = "0.1.2"
version = "0.1.3"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
Expand Down
6 changes: 0 additions & 6 deletions nym-node-status-api/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,13 @@ async fn main() -> Result<()> {
// not a valid windows path... but hey, it works...
println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path);

rerun_if_changed();
Ok(())
}

/// Reads the environment variable named `var`.
///
/// # Errors
/// Any lookup failure reported by `std::env::var` is replaced by a single
/// message telling the user which variable has to be set.
fn read_env_var(var: &str) -> Result<String> {
    match std::env::var(var) {
        Ok(value) => Ok(value),
        // The underlying cause is intentionally dropped; the hint is what matters here.
        Err(_) => Err(anyhow!("You need to set {} env var", var)),
    }
}

/// Prints the `cargo::rerun-if-changed` directives for the `migrations`
/// directory and the `src/db/queries` directory, one per line, to stdout.
fn rerun_if_changed() {
    for directive in [
        "cargo::rerun-if-changed=migrations",
        "cargo::rerun-if-changed=src/db/queries",
    ] {
        println!("{directive}");
    }
}

/// use `./enter_db.sh` to inspect DB
async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> {
let mut file = File::create("enter_db.sh").await?;
Expand Down
2 changes: 1 addition & 1 deletion nym-node-status-api/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ pub struct TestRunDto {
/// Lifecycle state of a testrun.
///
/// The variants are cast to integers (`as i64` / `as u32`) when they are bound
/// to, or compared against, the `status` column in the testrun queries.
pub(crate) enum TestRunStatus {
/// Set by `submit_testrun` after an agent has posted its results.
Complete = 2,
/// Set when a queued testrun is handed out to an agent.
InProgress = 1,
Pending = 0,
/// Initial state written by `try_queue_testrun`; stale `InProgress` runs are
/// also reset to this state by `update_testruns_older_than`.
Queued = 0,
}

#[derive(Debug, Clone)]
Expand Down
20 changes: 20 additions & 0 deletions nym-node-status-api/src/db/queries/gateways.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,28 @@ use crate::{
};
use futures_util::TryStreamExt;
use nym_validator_client::models::DescribedGateway;
use sqlx::{pool::PoolConnection, Sqlite};
use tracing::error;

/// Looks up the identity key of the gateway whose primary key is `gateway_pk`.
///
/// # Errors
/// Propagates any error from the query. `fetch_one` is used, so a missing
/// row is presumably reported as an error as well (confirm against sqlx docs).
pub(crate) async fn select_gateway_identity(
    conn: &mut PoolConnection<Sqlite>,
    gateway_pk: i64,
) -> anyhow::Result<String> {
    let identity_key = sqlx::query!(
        r#"SELECT
gateway_identity_key
FROM
gateways
WHERE
id = ?"#,
        gateway_pk
    )
    .fetch_one(conn.as_mut())
    .await
    .map(|row| row.gateway_identity_key)?;

    Ok(identity_key)
}

pub(crate) async fn insert_gateways(
pool: &DbPool,
gateways: Vec<GatewayRecord>,
Expand Down
2 changes: 1 addition & 1 deletion nym-node-status-api/src/db/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod summary;
pub(crate) mod testruns;

pub(crate) use gateways::{
ensure_gateways_still_bonded, get_all_gateways, insert_gateways,
ensure_gateways_still_bonded, get_all_gateways, insert_gateways, select_gateway_identity,
write_blacklisted_gateways_to_db,
};
pub(crate) use misc::insert_summaries;
Expand Down
54 changes: 46 additions & 8 deletions nym-node-status-api/src/db/queries/testruns.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::db::DbPool;
use crate::http::models::TestrunAssignment;
use crate::{
db::models::{TestRunDto, TestRunStatus},
testruns::now_utc,
};
use anyhow::Context;
use chrono::Duration;
use sqlx::{pool::PoolConnection, Sqlite};

pub(crate) async fn get_testrun_by_id(
pub(crate) async fn get_in_progress_testrun_by_id(
conn: &mut PoolConnection<Sqlite>,
testrun_id: i64,
) -> anyhow::Result<TestRunDto> {
Expand All @@ -20,20 +22,56 @@ pub(crate) async fn get_testrun_by_id(
ip_address as "ip_address!",
log as "log!"
FROM testruns
WHERE id = ?
WHERE
id = ?
AND
status = ?
ORDER BY timestamp_utc"#,
testrun_id
testrun_id,
TestRunStatus::InProgress as i64,
)
.fetch_one(conn.as_mut())
.await
.context(format!("Couldn't retrieve testrun {testrun_id}"))
}

/// Puts stale in-progress testruns back into the queue.
///
/// Every row of `testruns` whose status is `InProgress` and whose
/// `timestamp_utc` is earlier than `now_utc() - age` gets its status set to
/// `Queued`.
///
/// Returns the number of rows that were updated.
///
/// # Errors
/// Fails if no connection can be acquired from `db` or if the `UPDATE` fails.
pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result<u64> {
    let mut conn = db.acquire().await?;
    let threshold = now_utc() - age;
    // The query compares against the integer value produced by `timestamp()`.
    let threshold_secs = threshold.timestamp();

    let outcome = sqlx::query!(
        r#"UPDATE
testruns
SET
status = ?
WHERE
status = ?
AND
timestamp_utc < ?
"#,
        TestRunStatus::Queued as i64,
        TestRunStatus::InProgress as i64,
        threshold_secs
    )
    .execute(conn.as_mut())
    .await?;

    match outcome.rows_affected() {
        // Nothing was stale: stay quiet.
        0 => Ok(0),
        refreshed => {
            tracing::debug!(
                "Refreshed {} stale testruns, scheduled before {} but not yet finished",
                refreshed,
                threshold
            );
            Ok(refreshed)
        }
    }
}

pub(crate) async fn get_oldest_testrun_and_make_it_pending(
// TODO dz accept mut reference, repeat in all similar functions
conn: PoolConnection<Sqlite>,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Option<TestrunAssignment>> {
let mut conn = conn;
let assignment = sqlx::query_as!(
TestrunAssignment,
r#"UPDATE testruns
Expand All @@ -51,9 +89,9 @@ pub(crate) async fn get_oldest_testrun_and_make_it_pending(
gateway_id as "gateway_pk_id!"
"#,
TestRunStatus::InProgress as i64,
TestRunStatus::Pending as i64,
TestRunStatus::Queued as i64,
)
.fetch_optional(&mut *conn)
.fetch_optional(conn.as_mut())
.await?;

Ok(assignment)
Expand Down
10 changes: 8 additions & 2 deletions nym-node-status-api/src/http/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use anyhow::anyhow;
use axum::{response::Redirect, Router};
use tokio::net::ToSocketAddrs;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tower_http::{
cors::CorsLayer,
trace::{DefaultOnResponse, TraceLayer},
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

Expand Down Expand Up @@ -58,7 +61,10 @@ impl RouterBuilder {
// CORS layer needs to wrap other API layers
.layer(setup_cors())
// logger should be outermost layer
.layer(TraceLayer::new_for_http())
.layer(
TraceLayer::new_for_http()
.on_response(DefaultOnResponse::new().level(tracing::Level::DEBUG)),
)
}
}

Expand Down
47 changes: 32 additions & 15 deletions nym-node-status-api/src/http/api/testruns.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use axum::extract::DefaultBodyLimit;
use axum::Json;
use axum::{
extract::{Path, State},
Expand All @@ -23,31 +24,40 @@ pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(request_testrun))
.route("/:testrun_id", axum::routing::post(submit_testrun))
.layer(DefaultBodyLimit::max(1024 * 1024 * 5))
}

#[tracing::instrument(level = "debug", skip_all)]
async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<TestrunAssignment>> {
// TODO dz log agent's key
// TODO dz log agent's network probe version
tracing::debug!("Agent X requested testrun");
tracing::debug!("Agent requested testrun");

let db = state.db_pool();
let conn = db
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;

return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(conn).await {
return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await {
Ok(res) => {
if let Some(testrun) = res {
let gw_identity =
db::queries::select_gateway_identity(&mut conn, testrun.gateway_pk_id)
.await
.map_err(|_| {
// should never happen:
HttpError::internal_with_logging("No gateway found for testrun")
})?;
// TODO dz consider adding a column to testruns table with agent's public key
tracing::debug!(
"🏃‍ Assigned testrun row_id {} to agent X",
&testrun.testrun_id
"🏃‍ Assigned testrun row_id {} gateway {} to agent",
&testrun.testrun_id,
gw_identity
);
Ok(Json(testrun))
} else {
Err(HttpError::not_found("No testruns available"))
Err(HttpError::no_available_testruns())
}
}
Err(err) => Err(HttpError::internal_with_logging(err)),
Expand All @@ -61,25 +71,32 @@ async fn submit_testrun(
State(state): State<AppState>,
body: String,
) -> HttpResult<StatusCode> {
tracing::debug!(
"Agent submitted testrun {}. Total length: {}",
testrun_id,
body.len(),
);
// TODO dz store testrun results

let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;

let testrun = queries::testruns::get_testrun_by_id(&mut conn, testrun_id)
let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id)
.await
.map_err(|e| {
tracing::error!("{e}");
HttpError::not_found(testrun_id)
})?;

let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id)
.await
.map_err(|_| {
// should never happen:
HttpError::internal_with_logging("No gateway found for testrun")
})?;
tracing::debug!(
"Agent submitted testrun {} for gateway {} ({} bytes)",
testrun_id,
gw_identity,
body.len(),
);

// TODO dz this should be part of a single transaction: commit after everything is done
queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete)
.await
Expand All @@ -99,7 +116,7 @@ async fn submit_testrun(
tracing::info!(
"✅ Testrun row_id {} for gateway {} complete",
testrun.id,
testrun.gateway_id
gw_identity
);

Ok(StatusCode::CREATED)
Expand Down
10 changes: 8 additions & 2 deletions nym-node-status-api/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ pub(crate) struct HttpError {
}

impl HttpError {
pub(crate) fn invalid_input(message: String) -> Self {
pub(crate) fn invalid_input(msg: impl Display) -> Self {
Self {
message,
message: serde_json::json!({"message": msg.to_string()}).to_string(),
status: axum::http::StatusCode::BAD_REQUEST,
}
}
Expand All @@ -27,6 +27,12 @@ impl HttpError {
}
}

/// Error used by `request_testrun` when there is no testrun to hand out.
///
/// The message is the JSON object `{"message": "No available testruns"}`
/// and the status is `SERVICE_UNAVAILABLE`.
pub(crate) fn no_available_testruns() -> Self {
    let status = axum::http::StatusCode::SERVICE_UNAVAILABLE;
    let body = serde_json::json!({"message": "No available testruns"});

    Self {
        message: body.to_string(),
        status,
    }
}
pub(crate) fn not_found(msg: impl Display) -> Self {
Self {
message: serde_json::json!({"message": msg.to_string()}).to_string(),
Expand Down
16 changes: 13 additions & 3 deletions nym-node-status-api/src/testruns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ pub(crate) use queue::now_utc;
pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
tokio::spawn(async move {
loop {
tracing::info!("Spawning testruns...");
if let Err(e) = refresh_stale_testruns(&pool, refresh_interval).await {
tracing::error!("{e}");
}

if let Err(e) = run(&pool).await {
tracing::error!("Cron job failed: {}", e);
tracing::error!("Assigning testruns failed: {}", e);
}
tracing::debug!("Sleeping for {}s...", refresh_interval.as_secs());
tokio::time::sleep(refresh_interval).await;
Expand All @@ -24,9 +26,9 @@ pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {

// TODO dz make number of max agents configurable

// TODO dz periodically clean up stale pending testruns
#[instrument(level = "debug", name = "testrun_queue", skip_all)]
async fn run(pool: &DbPool) -> anyhow::Result<()> {
tracing::info!("Spawning testruns...");
if pool.is_closed() {
tracing::debug!("DB pool closed, returning early");
return Ok(());
Expand Down Expand Up @@ -74,3 +76,11 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> {

Ok(())
}

/// Re-queues testruns that have been in progress for longer than
/// `refresh_interval`, by delegating to `update_testruns_older_than`.
///
/// # Errors
/// Fails if `refresh_interval` cannot be converted to a `chrono::Duration`
/// or if the underlying database update fails.
#[instrument(level = "debug", skip_all)]
async fn refresh_stale_testruns(pool: &DbPool, refresh_interval: Duration) -> anyhow::Result<()> {
    let max_age = chrono::Duration::from_std(refresh_interval)?;

    // The number of refreshed rows is not needed by the caller.
    crate::db::queries::testruns::update_testruns_older_than(pool, max_age)
        .await
        .map(|_refreshed| ())
}
4 changes: 2 additions & 2 deletions nym-node-status-api/src/testruns/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub(crate) async fn try_queue_testrun(
//
// save test run
//
let status = TestRunStatus::Pending as u32;
let status = TestRunStatus::Queued as u32;
let log = format!(
"Test for {identity_key} requested at {} UTC\n\n",
timestamp_pretty
Expand All @@ -103,7 +103,7 @@ pub(crate) async fn try_queue_testrun(
Ok(TestRun {
id: id as u32,
identity_key,
status: format!("{}", TestRunStatus::Pending),
status: format!("{}", TestRunStatus::Queued),
log,
})
}
Expand Down

0 comments on commit e918901

Please sign in to comment.