Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add error for tx prover proxy #1038

Merged
merged 10 commits into from
Jan 2, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/tx-prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ serde_qs = { version = "0.13" }
tokio = { version = "1.38", optional = true, features = ["full"] }
tokio-stream = { version = "0.1", optional = true, features = [ "net" ]}
toml = { version = "0.8" }
thiserror = { workspace = true }
tonic-health = { version = "0.12" }
tonic-web = { version = "0.12", optional = true }
tracing = { version = "0.1", optional = true }
Expand Down
15 changes: 12 additions & 3 deletions bin/tx-prover/src/commands/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use pingora_proxy::http_proxy_service;
use tracing::warn;

use crate::{
error::TxProverServiceError,
proxy::{LoadBalancer, LoadBalancerState},
utils::MIDEN_TX_PROVER,
};
Expand All @@ -30,6 +31,12 @@ impl StartProxy {
///
/// This method will first read the config file to get the parameters for the proxy. It will
/// then start a proxy with each worker passed as command argument as a backend.
///
/// Errors:
/// - If the config file cannot be read.
/// - If the backend cannot be created.
/// - If the Pingora configuration fails.
/// - If the server cannot be started.
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
#[tracing::instrument(target = MIDEN_TX_PROVER, name = "proxy:execute")]
pub async fn execute(&self) -> Result<(), String> {
let mut server = Server::new(Some(Opt::default())).map_err(|err| err.to_string())?;
Expand All @@ -40,8 +47,8 @@ impl StartProxy {
let workers = self
.workers
.iter()
.map(|worker| Backend::new(worker).map_err(|err| err.to_string()))
.collect::<Result<Vec<Backend>, String>>()?;
.map(|worker| Backend::new(worker).map_err(TxProverServiceError::BackendCreationFailed))
.collect::<Result<Vec<Backend>, TxProverServiceError>>()?;

if workers.is_empty() {
warn!("Starting the proxy without any workers");
Expand All @@ -58,7 +65,9 @@ impl StartProxy {
let proxy_host = proxy_config.host;
let proxy_port = proxy_config.port.to_string();
lb.add_tcp(format!("{}:{}", proxy_host, proxy_port).as_str());
let logic = lb.app_logic_mut().ok_or("Failed to get app logic")?;
let logic = lb
.app_logic_mut()
.ok_or(TxProverServiceError::PingoraConfigFailed("app logic not found".to_string()))?;
let mut http_server_options = HttpServerOptions::default();

// Enable HTTP/2 for plaintext
Expand Down
23 changes: 23 additions & 0 deletions bin/tx-prover/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use axum::http::uri::InvalidUri;
use thiserror::Error;

// TX PROVER ERROR
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
// ================================================================================================

#[derive(Debug, Error)]
pub enum TxProverServiceError {
#[error("invalid uri")]
InvalidURI(#[source] InvalidUri),
#[error("failed to connect to worker")]
ConnectionFailed(#[source] tonic::transport::Error),
#[error("failed to create backend for worker")]
BackendCreationFailed(#[source] Box<pingora::Error>),
#[error("failed to setup pingora")]
PingoraConfigFailed(String),
PhilippGackstatter marked this conversation as resolved.
Show resolved Hide resolved
}

impl From<TxProverServiceError> for String {
fn from(err: TxProverServiceError) -> Self {
err.to_string()
}
}
1 change: 1 addition & 0 deletions bin/tx-prover/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod api;
pub mod commands;
pub mod error;
mod proxy;
mod utils;
use commands::Cli;
Expand Down
14 changes: 9 additions & 5 deletions bin/tx-prover/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
update_workers::{Action, UpdateWorkers},
ProxyConfig,
},
error::TxProverServiceError,
utils::{
create_queue_full_response, create_response_with_error_message,
create_too_many_requests_response, create_workers_updated_response, MIDEN_TX_PROVER,
Expand All @@ -36,7 +37,7 @@ mod worker;
/// Localhost address
const LOCALHOST_ADDR: &str = "127.0.0.1";

// LOAD BALANCER
// LOAD BALANCER STATE
// ================================================================================================

/// Load balancer that uses a round robin strategy
Expand All @@ -54,11 +55,14 @@ pub struct LoadBalancerState {

impl LoadBalancerState {
/// Create a new load balancer
///
/// Errors:
/// - If the worker cannot be created.
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
#[tracing::instrument(name = "proxy:new_load_balancer", skip(initial_workers))]
pub async fn new(
initial_workers: Vec<Backend>,
config: &ProxyConfig,
) -> core::result::Result<Self, String> {
) -> core::result::Result<Self, TxProverServiceError> {
let mut workers: Vec<Worker> = Vec::with_capacity(initial_workers.len());

let connection_timeout = Duration::from_secs(config.connection_timeout_secs);
Expand Down Expand Up @@ -120,7 +124,7 @@ impl LoadBalancerState {
pub async fn update_workers(
&self,
update_workers: UpdateWorkers,
) -> std::result::Result<(), String> {
) -> std::result::Result<(), TxProverServiceError> {
let mut workers = self.workers.write().await;
info!("Current workers: {:?}", workers);

Expand All @@ -129,7 +133,7 @@ impl LoadBalancerState {
.iter()
.map(|worker| Backend::new(worker))
.collect::<Result<Vec<Backend>, _>>()
.map_err(|err| format!("Failed to create backend: {}", err))?;
.map_err(TxProverServiceError::BackendCreationFailed)?;

let mut native_workers = Vec::new();

Expand Down Expand Up @@ -326,7 +330,7 @@ impl RequestContext {
}
}

// LOAD BALANCER WRAPPER
// LOAD BALANCER
// ================================================================================================

/// Wrapper around the load balancer that implements the ProxyHttp trait
Expand Down
9 changes: 7 additions & 2 deletions bin/tx-prover/src/proxy/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tonic_health::pb::{
};
use tracing::error;

use crate::utils::create_health_check_client;
use crate::{error::TxProverServiceError, utils::create_health_check_client};

// WORKER
// ================================================================================================
Expand All @@ -24,11 +24,16 @@ pub struct Worker {
}

impl Worker {
/// Creates a new worker and a gRPC health check client for the given worker address.
///
/// Errors:
/// - Returns [TxProverServiceError::InvalidURI] if the worker address is invalid.
/// - Returns [TxProverServiceError::ConnectionFailed] if the connection to the worker fails.
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
pub async fn new(
worker: Backend,
connection_timeout: Duration,
total_timeout: Duration,
) -> Result<Self, String> {
) -> Result<Self, TxProverServiceError> {
let health_check_client =
create_health_check_client(worker.addr.to_string(), connection_timeout, total_timeout)
.await?;
Expand Down
17 changes: 11 additions & 6 deletions bin/tx-prover/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use tonic::transport::Channel;
use tonic_health::pb::health_client::HealthClient;
use tracing_subscriber::{layer::SubscriberExt, Registry};

use crate::error::TxProverServiceError;

pub const MIDEN_TX_PROVER: &str = "miden-tx-prover";

const RESOURCE_EXHAUSTED_CODE: u16 = 8;
Expand Down Expand Up @@ -158,18 +160,21 @@ pub async fn create_response_with_error_message(

/// Create a gRPC [HealthClient] for the given worker address.
///
/// It will panic if the worker URI is invalid.
/// Errors:
/// - [TxProverServiceError::InvalidURI] if the worker address is invalid.
/// - [TxProverServiceError::ConnectionFailed] if the connection to the worker fails.
bobbinth marked this conversation as resolved.
Show resolved Hide resolved
pub async fn create_health_check_client(
address: String,
connection_timeout: Duration,
total_timeout: Duration,
) -> Result<HealthClient<Channel>, String> {
Channel::from_shared(format!("http://{}", address))
.map_err(|err| format!("Invalid format for worker URI: {}", err))?
) -> Result<HealthClient<Channel>, TxProverServiceError> {
let channel = Channel::from_shared(format!("http://{}", address))
.map_err(TxProverServiceError::InvalidURI)?
.connect_timeout(connection_timeout)
.timeout(total_timeout)
.connect()
.await
.map(HealthClient::new)
.map_err(|err| format!("Failed to create health check client for worker: {}", err))
.map_err(TxProverServiceError::ConnectionFailed)?;

Ok(HealthClient::new(channel))
}
Loading