Skip to content

Commit

Permalink
Add first opentelemetry metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Ongy committed Jan 3, 2025
1 parent ace8389 commit 79824d2
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 0 deletions.
72 changes: 72 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt", "loc
url = "2.5.4"
urlencoding = "2.1.3"
uuid = { version = "1.11.0", features = ["v7", "serde"] }
prometheus = "0.13.4"
opentelemetry = "0.27.1"
opentelemetry_sdk = "0.27.1"
opentelemetry-prometheus = "0.27.0"

[target.'cfg(not(target_family = "unix"))'.dependencies]
crossterm = { version = "0.28.1" }
Expand Down
22 changes: 22 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod embedded_certificate;
mod metrics;
mod protocols;
mod restrictions;
#[cfg(test)]
Expand All @@ -22,6 +23,7 @@ use clap::Parser;
use hyper::header::HOST;
use hyper::http::{HeaderName, HeaderValue};
use log::{debug, warn};
use opentelemetry::global;
use parking_lot::{Mutex, RwLock};
use std::collections::BTreeMap;
use std::fmt::Debug;
Expand Down Expand Up @@ -76,6 +78,15 @@ struct Wstunnel {
default_value = "INFO"
)]
log_lvl: String,

/// Set the listen address for the prometheus metrics exporter.
#[arg(
long,
global = true,
verbatim_doc_comment,
default_value = None,
)]
metrics_provider_address: Option<String>,
}

#[derive(clap::Subcommand, Debug)]
Expand Down Expand Up @@ -739,6 +750,17 @@ async fn main() -> anyhow::Result<()> {
warn!("Failed to set soft filelimit to hard file limit: {}", err)
}

if let Some(addr) = args.metrics_provider_address {
match metrics::setup_metrics_provider(addr.as_str()).await {
Ok(provider) => {
let _ = global::set_meter_provider(provider);
}
Err(err) => {
panic!("Failed to setup metrics server: {err:?}")
}
}
}

match args.commands {
Commands::Client(args) => {
let (tls_certificate, tls_key) = if let (Some(cert), Some(key)) =
Expand Down
71 changes: 71 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use bytes::Bytes;
use http_body_util::Full;
use hyper::service::service_fn;
use hyper::{body, Request, Response, Version};
use hyper_util::rt::TokioExecutor;

use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, TextEncoder};
use tokio::net::TcpListener;
use tracing::{error, info, warn};

pub async fn setup_metrics_provider(addr: &str) -> anyhow::Result<SdkMeterProvider> {
let registry = prometheus::Registry::new();

// configure OpenTelemetry to use this registry
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()?;

// set up a meter to create instruments
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
let listener = TcpListener::bind(addr).await?;
info!("Started metrics server on {}", addr);

tokio::spawn(async move {
loop {
let (stream, _) = match listener.accept().await {
Ok(ret) => ret,
Err(err) => {
warn!("Error while accepting connection on metrics port {:?}", err);
continue;
}
};

let stream = hyper_util::rt::TokioIo::new(stream);
let conn = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let fut = conn
.serve_connection(
stream,
service_fn(|req: Request<body::Incoming>| {
// Create handler local registry for ownership
let registry = registry.clone();
async move {
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut result = Vec::new();
if let Err(err) = encoder.encode(&metric_families, &mut result) {
error!("Failed to encode prometheus metrics: {:?}", err);
return Err("failed to create metrics export")
}

if req.version() == Version::HTTP_11 {
Ok(Response::new(Full::<Bytes>::from(result)))
} else {
// Note: it's usually better to return a Response
// with an appropriate StatusCode instead of an Err.
Err("not HTTP/1.1, abort connection")
}
}
}),
)
.await;

if let Err(err) = fut {
warn!("Failed to handle metrics connection: {:?}", err)
}
}
});

return Ok(provider);
}
21 changes: 21 additions & 0 deletions src/tunnel/server/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::anyhow;
use futures_util::FutureExt;
use http_body_util::Either;
use opentelemetry::metrics::Counter;
use opentelemetry::{global, KeyValue};
use std::fmt;
use std::fmt::{Debug, Formatter};

Expand Down Expand Up @@ -64,15 +66,27 @@ pub struct WsServerConfig {
pub http_proxy: Option<Url>,
}

pub struct WsServerMetrics {
pub connections: Counter<u64>,
}

#[derive(Clone)]
pub struct WsServer {
pub config: Arc<WsServerConfig>,
pub metrics: Arc<WsServerMetrics>,
}

impl WsServer {
pub fn new(config: WsServerConfig) -> Self {
let meter = global::meter_provider().meter("wstunnel");
Self {
config: Arc::new(config),
metrics: Arc::new(WsServerMetrics {
connections: meter
.u64_counter("connections_created")
.with_description("Counts the connections created. Attributes allow to split by remote host")
.build(),
}),
}
}

Expand Down Expand Up @@ -142,6 +156,13 @@ impl WsServer {
}
};

self.metrics.connections.add(
1,
&[
KeyValue::new("remote_host", format!("{}", remote.host)),
KeyValue::new("remote_port", i64::from(remote.port)),
],
);
let req_protocol = remote.protocol.clone();
let inject_cookie = req_protocol.is_dynamic_reverse_tunnel();
let tunnel = match self.exec_tunnel(restriction, remote, client_addr).await {
Expand Down

0 comments on commit 79824d2

Please sign in to comment.