Skip to content

Commit

Permalink
wip: add implementation for default HttpProxy methods
Browse files Browse the repository at this point in the history
  • Loading branch information
SantiagoPittella committed Dec 11, 2024
1 parent 597cadb commit 63d790e
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/tx-prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pingora-limits = "0.4"
[dependencies]
async-trait = "0.1"
axum = {version = "0.7", optional = true }
bytes = "1.0"
clap = { version = "4.5", features = ["derive"] }
figment = { version = "0.10", features = ["toml", "env"] }
miden-lib = { workspace = true, default-features = false }
Expand Down
144 changes: 144 additions & 0 deletions bin/tx-prover/src/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::{collections::VecDeque, future::Future, pin::Pin, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use once_cell::sync::Lazy;
use pingora::{
http::ResponseHeader,
lb::Backend,
prelude::*,
protocols::Digest,
server::ShutdownWatch,
services::background::BackgroundService,
upstreams::peer::{Peer, ALPN},
Expand Down Expand Up @@ -641,6 +644,147 @@ impl ProxyHttp for LoadBalancerWrapper {
self.0.add_available_worker(worker).await;
}
}

// The following methods are a copy of the default implementation defined in the trait, but
// with tracing instrumentation.

async fn early_request_filter(
&self,
_session: &mut Session,
ctx: &mut Self::CTX,
) -> Result<()> {
let span = info_span!(parent: &ctx.parent_span, "proxy:early_request_filter");
let _guard = span.enter();
Ok(())
}

async fn connected_to_upstream(
&self,
_session: &mut Session,
_reused: bool,
_peer: &HttpPeer,
#[cfg(unix)] _fd: std::os::unix::io::RawFd,
#[cfg(windows)] _sock: std::os::windows::io::RawSocket,
_digest: Option<&Digest>,
_ctx: &mut Self::CTX,
) -> Result<()> {
let span = info_span!(parent: &_ctx.parent_span, "proxy:connected_to_upstream");
let _guard = span.enter();

Ok(())
}

async fn request_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<()> {
let span = info_span!(parent: &_ctx.parent_span, "proxy:request_body_filter");
let _guard = span.enter();
Ok(())
}

fn upstream_response_filter(
&self,
_session: &mut Session,
_upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) {
let span = info_span!(parent: &_ctx.parent_span, "proxy:upstream_response_filter");
let _guard = span.enter();
}

async fn response_filter(
&self,
_session: &mut Session,
_upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
let span = info_span!(parent: &_ctx.parent_span, "proxy:response_filter");
let _guard = span.enter();
Ok(())
}

fn upstream_response_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) {
let span = info_span!(parent: &_ctx.parent_span, "proxy:upstream_response_body_filter");
let _guard = span.enter();
}

fn response_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<Option<Duration>>
where
Self::CTX: Send + Sync,
{
let span = info_span!(parent: &_ctx.parent_span, "proxy:response_body_filter");
let _guard = span.enter();
Ok(None)
}

async fn fail_to_proxy(&self, session: &mut Session, e: &Error, _ctx: &mut Self::CTX) -> u16
where
Self::CTX: Send + Sync,
{
let span = info_span!(parent: &_ctx.parent_span, "proxy:fail_to_proxy");
let _guard = span.enter();

let server_session = session.as_mut();
let code = match e.etype() {
HTTPStatus(code) => *code,
_ => {
match e.esource() {
ErrorSource::Upstream => 502,
ErrorSource::Downstream => {
match e.etype() {
WriteError | ReadError | ConnectionClosed => {
/* conn already dead */
0
},
_ => 400,
}
},
ErrorSource::Internal | ErrorSource::Unset => 500,
}
},
};
if code > 0 {
server_session.respond_error(code).await
}
code
}

fn error_while_proxy(
&self,
peer: &HttpPeer,
session: &mut Session,
e: Box<Error>,
_ctx: &mut Self::CTX,
client_reused: bool,
) -> Box<Error> {
let span = info_span!(parent: &_ctx.parent_span, "proxy:error_while_proxy");
let _guard = span.enter();

let mut e = e.more_context(format!("Peer: {}", peer));
// only reused client connections where retry buffer is not truncated
e.retry
.decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
e
}
}

/// Implement the BackgroundService trait for the LoadBalancer
Expand Down

0 comments on commit 63d790e

Please sign in to comment.