Skip to content

Commit

Permalink
Use non-contiguous buffer for codec and transport
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoyawei committed Nov 16, 2023
1 parent 07e4ee1 commit 35cfa8e
Show file tree
Hide file tree
Showing 36 changed files with 2,128 additions and 316 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"examples",
"codegen",
"interop", # Tests
"benches/arrow-flight",
"tests/disable_comments",
"tests/included_service",
"tests/same_name",
Expand Down
25 changes: 25 additions & 0 deletions benches/arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
authors = ["Xiaoya Wei <[email protected]>"]
edition = "2021"
license = "MIT"
name = "arrow-flight"
publish = false
version = "0.1.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
prost = "0.12"
tokio = { version = "1.0", features = ["rt-multi-thread"] }
tonic = { path = "../../tonic" }

[build-dependencies]
tonic-build = { path = "../../tonic-build", features = ["prost"] }

[dev-dependencies]
bencher = "0.1.5"
futures-util = { version = "0.3", default-features = false }

[[bench]]
harness = false
name = "end_to_end"
222 changes: 222 additions & 0 deletions benches/arrow-flight/benches/end_to_end.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
use arrow_flight::{arrow, client::FlightClient, server::FlightService};
use bencher::{benchmark_group, benchmark_main};
use prost::bytes::Bytes;
use std::{sync::Arc, time::Duration};
use tokio::{sync::mpsc, time};
use tonic::codegen::tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::transport::Server;

#[derive(Default, Debug)]
struct Opts {
manual_client: bool,
manual_server: bool,
request_chunks: usize,
request_payload: arrow::FlightData,
response_payload: arrow::FlightData,
}

fn req_64kb_resp_64kb_10_chunks_native_client_native_server(b: &mut bencher::Bencher) {
opts()
.request_chunks(10)
.request_payload(make_payload(64 * 1024))
.response_payload(make_payload(64 * 1024))
.bench(b);
}

fn req_64kb_resp_64kb_10_chunks_native_client_manual_server(b: &mut bencher::Bencher) {
opts()
.manual_server()
.request_chunks(10)
.request_payload(make_payload(64 * 1024))
.response_payload(make_payload(64 * 1024))
.bench(b);
}

fn req_64kb_resp_64kb_10_chunks_manual_client_native_server(b: &mut bencher::Bencher) {
opts()
.manual_client()
.request_chunks(10)
.request_payload(make_payload(64 * 1024))
.response_payload(make_payload(64 * 1024))
.bench(b);
}

fn req_64kb_resp_64kb_10_chunks_manual_client_manual_server(b: &mut bencher::Bencher) {
opts()
.manual_server()
.manual_client()
.request_chunks(10)
.request_payload(make_payload(64 * 1024))
.response_payload(make_payload(64 * 1024))
.bench(b);
}

fn req_1mb_resp_1mb_10_chunks_native_client_native_server(b: &mut bencher::Bencher) {
opts()
.request_chunks(10)
.request_payload(make_payload(1 * 1024 * 1024))
.response_payload(make_payload(1 * 1024 * 1024))
.bench(b);
}

fn req_1mb_resp_1mb_10_chunks_native_client_manual_server(b: &mut bencher::Bencher) {
opts()
.manual_server()
.request_chunks(10)
.request_payload(make_payload(1 * 1024 * 1024))
.response_payload(make_payload(1 * 1024 * 1024))
.bench(b);
}

fn req_1mb_resp_1mb_10_chunks_manual_client_native_server(b: &mut bencher::Bencher) {
opts()
.manual_client()
.request_chunks(10)
.request_payload(make_payload(1 * 1024 * 1024))
.response_payload(make_payload(1 * 1024 * 1024))
.bench(b);
}

fn req_1mb_resp_1mb_10_chunks_manual_client_manual_server(b: &mut bencher::Bencher) {
opts()
.manual_server()
.manual_client()
.request_chunks(10)
.request_payload(make_payload(1 * 1024 * 1024))
.response_payload(make_payload(1 * 1024 * 1024))
.bench(b);
}

fn make_payload(size: usize) -> arrow::FlightData {
arrow::FlightData {
flight_descriptor: Some(arrow::FlightDescriptor {
cmd: Bytes::from("cmd"),
path: vec!["/path/to/data".to_string()],
..Default::default()
}),
data_header: Bytes::from("data_header"),
app_metadata: Bytes::from("app_metadata"),
data_body: Bytes::from(vec![b'a'; size]),
}
}

fn opts() -> Opts {
Opts {
request_chunks: 1,
..Default::default()
}
}

impl Opts {
fn manual_client(mut self) -> Self {
self.manual_client = true;
self
}

fn manual_server(mut self) -> Self {
self.manual_server = true;
self
}

fn request_chunks(mut self, chunks: usize) -> Self {
self.request_chunks = chunks;
self
}

fn request_payload(mut self, payload: arrow::FlightData) -> Self {
self.request_payload = payload;
self
}

fn response_payload(mut self, payload: arrow::FlightData) -> Self {
self.response_payload = payload;
self
}

fn bench(self, b: &mut bencher::Bencher) {
let rt = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("runtime"),
);

b.bytes = ((self.request_payload.data_body.len() + self.response_payload.data_body.len())
* self.request_chunks) as u64;

spawn_server(&rt, &self);

let channel = rt.block_on(async {
time::sleep(Duration::from_millis(100)).await;
tonic::transport::Endpoint::from_static("http://127.0.0.1:1500")
.connect()
.await
.unwrap()
});

let do_exchange = || async {
let mut client = if self.manual_client {
FlightClient::manual(channel.clone())
} else {
FlightClient::native(channel.clone())
};
let (tx, rx) = mpsc::channel(8192);
let mut server_stream = client
.do_exchange(ReceiverStream::new(rx))
.await
.unwrap()
.into_inner();
for _ in 0..self.request_chunks {
tx.send(self.response_payload.clone()).await.unwrap();
server_stream.next().await.unwrap().unwrap();
}
};

b.iter(move || {
rt.block_on(do_exchange());
});
}
}

fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) {
let addr = "127.0.0.1:1500";

let response_payload = opts.response_payload.clone();
let manual_server = opts.manual_server;
let srv = rt.block_on(async move {
let flight_service = FlightService::new(response_payload);
if manual_server {
Server::builder()
.add_service(
arrow::manual::flight_service_server::FlightServiceServer::new(flight_service),
)
.serve(addr.parse().unwrap())
} else {
Server::builder()
.add_service(arrow::flight_service_server::FlightServiceServer::new(
flight_service,
))
.serve(addr.parse().unwrap())
}
});

rt.spawn(async move { srv.await.unwrap() });
}

benchmark_group!(
req_64kb_resp_64kb_10_chunks,
req_64kb_resp_64kb_10_chunks_native_client_native_server,
req_64kb_resp_64kb_10_chunks_manual_client_native_server,
req_64kb_resp_64kb_10_chunks_native_client_manual_server,
req_64kb_resp_64kb_10_chunks_manual_client_manual_server
);

benchmark_group!(
req_1mb_resp_1mb_10_chunks,
req_1mb_resp_1mb_10_chunks_native_client_native_server,
req_1mb_resp_1mb_10_chunks_manual_client_native_server,
req_1mb_resp_1mb_10_chunks_native_client_manual_server,
req_1mb_resp_1mb_10_chunks_manual_client_manual_server
);

benchmark_main!(req_64kb_resp_64kb_10_chunks, req_1mb_resp_1mb_10_chunks);
23 changes: 23 additions & 0 deletions benches/arrow-flight/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
fn main() {
tonic_build::configure()
.bytes(&["."])
.generate_default_stubs(true)
.compile(&["proto/flight.proto"], &["proto"])
.unwrap();

tonic_build::manual::Builder::new().compile(&[tonic_build::manual::Service::builder()
.name("FlightService")
.package("arrow.flight.protocol")
.method(
tonic_build::manual::Method::builder()
.name("do_exchange")
.route_name("DoExchange")
.input_type("crate::arrow::FlightData")
.output_type("crate::arrow::FlightData")
.codec_path("crate::codec::FlightDataCodec")
.client_streaming()
.server_streaming()
.build(),
)
.build()]);
}
Loading

0 comments on commit 35cfa8e

Please sign in to comment.