Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update hyper and axum #137

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.env
/target
/Cargo.lock
.direnv
Expand Down
10 changes: 6 additions & 4 deletions cargo-doc-ngrok/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ description = "A cargo subcommand to build and serve documentation via ngrok"
repository = "https://github.com/ngrok/ngrok-rust"

[dependencies]
axum = "0.6.1"
awaitdrop = "0.1.2"
axum = "0.7.4"
bstr = "1.4.0"
cargo_metadata = "0.15.2"
clap = { version = "4.0.29", features = ["derive"] }
futures = "0.3.25"
http = "0.2.8"
hyper = { version = "0.14.23", features = ["server"] }
hyper-staticfile = "0.9.2"
http = "1.0.0"
hyper = { version = "1.1.0", features = ["server"] }
hyper-staticfile = "0.10.0"
hyper-util = { version = "0.1.3", features = ["server", "tokio", "server-auto", "http1"] }
ngrok = { path = "../ngrok", version = "0.14.0-pre.1", features = ["hyper"] }
tokio = { version = "1.23.0", features = ["full"] }
watchexec = "2.3.0"
Expand Down
57 changes: 44 additions & 13 deletions cargo-doc-ngrok/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use std::{
error::Error,
io,
path::PathBuf,
process::Stdio,
sync::Arc,
};

use axum::BoxError;
use clap::{
Args,
Parser,
Subcommand,
};
use hyper::service::make_service_fn;
use futures::TryStreamExt;
use hyper::service::service_fn;
use hyper_util::{
rt::TokioExecutor,
server,
};
use ngrok::prelude::*;
use watchexec::{
action::{
Expand Down Expand Up @@ -56,7 +61,7 @@ struct DocNgrok {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
async fn main() -> Result<(), BoxError> {
let Cmd::DocNgrok(args) = Cargo::parse().cmd;

std::process::Command::new("cargo")
Expand All @@ -82,31 +87,57 @@ async fn main() -> Result<(), Box<dyn Error>> {
.connect()
.await?;

let mut tunnel_cfg = sess.http_endpoint();
let mut listen_cfg = sess.http_endpoint();
if let Some(domain) = args.domain {
tunnel_cfg.domain(domain);
listen_cfg.domain(domain);
}

let tunnel = tunnel_cfg.listen().await?;
let mut listener = listen_cfg.listen().await?;

let service = service_fn(move |req| {
let stat = hyper_staticfile::Static::new(&doc_dir);
stat.serve(req)
});

println!(
"serving docs on: {}/{}/",
tunnel.url(),
listener.url(),
default_package.replace('-', "_")
);

let srv = hyper::server::Server::builder(tunnel).serve(make_service_fn(move |_| {
let stat = hyper_staticfile::Static::new(&doc_dir);
async move { Result::<_, String>::Ok(stat) }
}));
let server = async move {
let (dropref, waiter) = awaitdrop::awaitdrop();

// Continuously accept new connections.
while let Some(conn) = listener.try_next().await? {
let service = service.clone();
let dropref = dropref.clone();
// Spawn a task to handle the connection. That way we can multiple connections
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe "That way we can handle multiple"

// concurrently.
tokio::spawn(async move {
if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new())
.serve_connection(conn, service)
.await
{
eprintln!("failed to serve connection: {err:#}");
}
drop(dropref);
});
}

// Wait until all children have finished, not just the listener.
drop(dropref);
waiter.await;

Ok::<(), BoxError>(())
};

if args.watch {
let we = make_watcher(args.doc_args, root_dir, target_dir)?;
tokio::spawn(srv);

we.main().await??;
} else {
srv.await?;
server.await?;
}

Ok(())
Expand Down
18 changes: 15 additions & 3 deletions ngrok/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ tracing = "0.1.37"
futures-rustls = { version = "0.25.1" }
tokio-util = { version = "0.7.4", features = ["compat"] }
futures = "0.3.25"
hyper = { version = "0.14.23" }
axum = { version = "0.6.1", features = ["tokio"], optional = true }
hyper-0-14 = { package = "hyper", version = "0.14" }
hyper = { version = "1.1.0", optional = true }
axum = { version = "0.7.4", features = ["tokio"], optional = true }
rustls-pemfile = "2.0.0"
async-trait = "0.1.59"
bytes = "1.3.0"
Expand All @@ -43,11 +44,20 @@ url = "2.4.0"
rustls-native-certs = "0.7.0"
proxy-protocol = "0.5.0"
pin-project = "1.1.3"
axum-core = "0.4.3"
futures-util = "0.3.30"

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.45.0", features = ["Win32_Foundation"] }

[dev-dependencies]
hyper = "1.1.0"
hyper-util = { version = "0.1.3", features = [
"tokio",
"server",
"http1",
"http2",
] }
tokio = { version = "1.23.0", features = ["full"] }
anyhow = "1.0.66"
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
Expand All @@ -56,10 +66,12 @@ flate2 = "1.0.25"
tracing-test = "0.2.3"
rand = "0.8.5"
paste = "1.0.11"
tokio-tungstenite = { version = "0.18.0", features = [
tokio-tungstenite = { version = "0.21.0", features = [
"rustls",
"rustls-tls-webpki-roots",
] }
tower = "0.4.13"
axum = { version = "0.7.4", features = ["tokio"] }

[[example]]
name = "tls"
Expand Down
79 changes: 57 additions & 22 deletions ngrok/examples/axum.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
use std::net::SocketAddr;
use std::{
convert::Infallible,
net::SocketAddr,
};

use axum::{
extract::ConnectInfo,
routing::get,
Router,
};
use ngrok::{
prelude::*,
tunnel::HttpTunnel,
use axum_core::BoxError;
use futures::stream::TryStreamExt;
use hyper::{
body::Incoming,
Request,
};
use hyper_util::{
rt::TokioExecutor,
server,
};
use ngrok::prelude::*;
use tower::{
Service,
ServiceExt,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() -> Result<(), BoxError> {
// build our application with a single route
let app = Router::new().route(
"/",
Expand All @@ -22,21 +36,7 @@ async fn main() -> anyhow::Result<()> {
),
);

// run it with hyper on localhost:8000
// axum::Server::bind(&"0.0.0.0:8000".parse().unwrap())
// Or with an ngrok tunnel
axum::Server::builder(start_tunnel().await?)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await
.unwrap();

Ok(())
}

// const CA_CERT: &[u8] = include_bytes!("ca.crt");

async fn start_tunnel() -> anyhow::Result<HttpTunnel> {
let tun = ngrok::Session::builder()
let mut listener = ngrok::Session::builder()
.authtoken_from_env()
.connect()
.await?
Expand Down Expand Up @@ -74,9 +74,35 @@ async fn start_tunnel() -> anyhow::Result<HttpTunnel> {
.listen()
.await?;

println!("Tunnel started on URL: {:?}", tun.url());
println!("Listener started on URL: {:?}", listener.url());

let mut make_service = app.into_make_service_with_connect_info::<SocketAddr>();

Ok(tun)
let server = async move {
while let Some(conn) = listener.try_next().await? {
let remote_addr = conn.remote_addr();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block shows up a bit, could it be made into a re-usable component for ourselves and users, or does that get too messy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so this is what I originally tried to do and burned most of monday on.

We could

  • Support hyper's Service trait directly, and expose a generic method that takes one of those. Unfortunately, this means that we can't use axum's Connected trait.
  • Support tower's Service trait. This lets us support axum and its Connected trait better, but ties us to the tower ecosystem. hyper seems like a better bet imo than the higher-level tower APIs, at least for now.

The core issue with hyper's Service is that it expects a Request, which is already interpreted at the protocol level and doesn't include things like the remote addr. Meanwhile, axum's Connected expects that context to be plumbed in through the tower::Service side-channel by seeding it with the original connection stream first before handing the remote-addr-seeded service off to the hyper API.

It's all a mess and I think I'd prefer to support the lowest-common-denominator for now, which seems to be hyper. I'm frustrated that they removed the Accept trait and Server struct and rebased their Service to operate on Requests instead of the connection without providing an alternative. Ideally, axum::serve would've filled that gap, but it's explicitly dependent on TcpListener, so it's a no-go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eek, makes sense to just be raw hyper and everything else DIY, it's sad they dropped that. I do wonder if these accept loops could be moved to a helper in our sdk so the examples and users would have less to do directly.

let tower_service = unwrap_infallible(make_service.call(remote_addr).await);

tokio::spawn(async move {
let hyper_service =
hyper::service::service_fn(move |request: Request<Incoming>| {
tower_service.clone().oneshot(request)
});

if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new())
.serve_connection_with_upgrades(conn, hyper_service)
.await
{
eprintln!("failed to serve connection: {err:#}");
}
});
}
Ok::<(), BoxError>(())
};

server.await?;

Ok(())
}

#[allow(dead_code)]
Expand All @@ -103,3 +129,12 @@ fn create_policy() -> Result<Policy, InvalidPolicy> {
)
.to_owned())
}

// const CA_CERT: &[u8] = include_bytes!("ca.crt");

fn unwrap_infallible<T>(result: Result<T, Infallible>) -> T {
match result {
Ok(value) => value,
Err(err) => match err {},
}
}
75 changes: 55 additions & 20 deletions ngrok/examples/labeled.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
use std::net::SocketAddr;
use std::{
convert::Infallible,
error::Error,
net::SocketAddr,
};

use axum::{
extract::ConnectInfo,
routing::get,
BoxError,
Router,
};
use ngrok::{
prelude::*,
tunnel::LabeledTunnel,
use futures::TryStreamExt;
use hyper::{
body::Incoming,
Request,
};
use hyper_util::{
rt::TokioExecutor,
server,
};
use ngrok::prelude::*;
use tower::{
Service,
ServiceExt,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// build our application with a single route
let app = Router::new().route(
"/",
Expand All @@ -22,32 +37,52 @@ async fn main() -> anyhow::Result<()> {
),
);

// run it with hyper on localhost:8000
// axum::Server::bind(&"0.0.0.0:8000".parse().unwrap())
// Or with an ngrok tunnel
axum::Server::builder(start_tunnel().await?)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await
.unwrap();

Ok(())
}

async fn start_tunnel() -> anyhow::Result<LabeledTunnel> {
let sess = ngrok::Session::builder()
.authtoken_from_env()
.connect()
.await?;

let tun = sess
let mut listener = sess
.labeled_tunnel()
//.app_protocol("http2")
.label("edge", "edghts_<edge_id>")
.metadata("example tunnel metadata from rust")
.listen()
.await?;

println!("Labeled tunnel started!");
println!("Labeled listener started!");

let mut make_service = app.into_make_service_with_connect_info::<SocketAddr>();

let server = async move {
while let Some(conn) = listener.try_next().await? {
let remote_addr = conn.remote_addr();
let tower_service = unwrap_infallible(make_service.call(remote_addr).await);

tokio::spawn(async move {
let hyper_service =
hyper::service::service_fn(move |request: Request<Incoming>| {
tower_service.clone().oneshot(request)
});

if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new())
.serve_connection_with_upgrades(conn, hyper_service)
.await
{
eprintln!("failed to serve connection: {err:#}");
}
});
}
Ok::<(), BoxError>(())
};

server.await?;
Ok(())
}

Ok(tun)
fn unwrap_infallible<T>(result: Result<T, Infallible>) -> T {
match result {
Ok(value) => value,
Err(err) => match err {},
}
}
Loading
Loading