Skip to content

Commit

Permalink
Switch to fork of serde_rosmsg, benchmark that it is indeed much fast…
Browse files Browse the repository at this point in the history
…er, fix bug with missing write_all
  • Loading branch information
Carter committed Oct 17, 2024
1 parent b9c6d91 commit 4076ffa
Show file tree
Hide file tree
Showing 16 changed files with 385 additions and 49 deletions.
250 changes: 234 additions & 16 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions docker/noetic_compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ services:
image: carter12s/roslibrust-ci-noetic:rust-1-72
# network_mode host required for ros1 testing
network_mode: host
ports:
- "9090:9090"
# ports:
# - "9090:9090"
# Pass through the ros master port for native ros1 testing
- "11311:11311"
# - "11311:11311"
command: bash -c "source /opt/ros/noetic/setup.bash; roslaunch rosbridge_server rosbridge_websocket.launch & disown; rosrun rosapi rosapi_node"
4 changes: 2 additions & 2 deletions roslibrust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ roslibrust_codegen_macro = { path = "../roslibrust_codegen_macro", version = "0.
roslibrust_codegen = { path = "../roslibrust_codegen", version = "0.10.0" }
reqwest = { version = "0.11", optional = true } # Only used with native ros1
serde_xmlrpc = { version = "0.2", optional = true } # Only used with native ros1
serde_rosmsg = { version = "0.2", optional = true } # Only used with native ros1
roslibrust_serde_rosmsg = { version = "0.3", optional = true } # Only used with native ros1
hyper = { version = "0.14", features = [
"server",
], optional = true } # Only used with native ros1
Expand Down Expand Up @@ -74,7 +74,7 @@ ros1 = [
"dep:hyper",
"dep:gethostname",
"dep:regex",
"dep:serde_rosmsg",
"dep:roslibrust_serde_rosmsg",
]


Expand Down
4 changes: 2 additions & 2 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ impl NodeServerHandle {
// This gives a generic closure that operates on byte arrays that we can then store and use freely
let server_typeless =
move |message: Vec<u8>| -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
let request = serde_rosmsg::from_slice::<T::Request>(&message)
let request = roslibrust_serde_rosmsg::from_slice::<T::Request>(&message)
.map_err(|err| RosLibRustError::SerializationError(err.to_string()))?;
let response = server(request)?;
Ok(serde_rosmsg::to_vec(&response)
Ok(roslibrust_serde_rosmsg::to_vec(&response)
.map_err(|err| RosLibRustError::SerializationError(err.to_string()))?)
};
let server_typeless = Box::new(server_typeless);
Expand Down
13 changes: 7 additions & 6 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<T: RosMessageType> Publisher<T> {
/// Queues a message to be send on the related topic.
/// Returns when the data has been queued not when data is actually sent.
pub async fn publish(&self, data: &T) -> Result<(), PublisherError> {
let data = serde_rosmsg::to_vec(&data)?;
let data = roslibrust_serde_rosmsg::to_vec(&data)?;
// TODO this is a pretty dumb...
// because of the internal channel used for re-direction this future doesn't
// actually complete when the data is sent, but merely when it is queued to be sent
Expand Down Expand Up @@ -169,10 +169,11 @@ impl Publication {
loop {
match rx.recv().await {
Some(msg_to_publish) => {
trace!("Publish task got message to publish for topic: {topic}");
let mut streams = subscriber_streams.write().await;
let mut streams_to_remove = vec![];
for (stream_idx, stream) in streams.iter_mut().enumerate() {
if let Err(err) = stream.write(&msg_to_publish[..]).await {
if let Err(err) = stream.write_all(&msg_to_publish[..]).await {
// TODO: A single failure between nodes that cross host boundaries is probably normal, should make this more robust perhaps
debug!("Failed to send data to subscriber: {err}, removing");
streams_to_remove.push(stream_idx);
Expand Down Expand Up @@ -272,7 +273,7 @@ impl Publication {
.to_bytes(false)
.expect("Couldn't serialize connection header");
stream
.write(&response_header_bytes[..])
.write_all(&response_header_bytes[..])
.await
.expect("Unable to respond on tcpstream");

Expand All @@ -282,7 +283,7 @@ impl Publication {
debug!(
"Publication configured to be latching and has last_message, sending"
);
let res = stream.write(last_message).await;
let res = stream.write_all(last_message).await;
match res {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -320,8 +321,8 @@ pub enum PublisherError {
StreamClosed,
}

impl From<serde_rosmsg::Error> for PublisherError {
fn from(value: serde_rosmsg::Error) -> Self {
impl From<roslibrust_serde_rosmsg::Error> for PublisherError {
fn from(value: roslibrust_serde_rosmsg::Error) -> Self {
Self::SerializingError(value.to_string())
}
}
19 changes: 10 additions & 9 deletions roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<T: RosServiceType> ServiceClient<T> {
}

pub async fn call(&self, request: &T::Request) -> RosLibRustResult<T::Response> {
let request_payload = serde_rosmsg::to_vec(request)
let request_payload = roslibrust_serde_rosmsg::to_vec(request)
.map_err(|err| RosLibRustError::SerializationError(err.to_string()))?;
let (response_tx, response_rx) = oneshot::channel();

Expand All @@ -69,7 +69,7 @@ impl<T: RosServiceType> ServiceClient<T> {
self.service_name,
result_payload
);
let response: T::Response = serde_rosmsg::from_slice(&result_payload)
let response: T::Response = roslibrust_serde_rosmsg::from_slice(&result_payload)
.map_err(|err| RosLibRustError::SerializationError(err.to_string()))?;
return Ok(response);
}
Expand Down Expand Up @@ -202,13 +202,14 @@ impl ServiceClientLink {
} else {
// Parse an error message as the body
let error_body = tcpros::receive_body(stream).await?;
let err_msg: String = serde_rosmsg::from_slice(&error_body).map_err(|err| {
log::error!("Failed to parse service call error message: {err}");
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to parse service call error message",
)
})?;
let err_msg: String =
roslibrust_serde_rosmsg::from_slice(&error_body).map_err(|err| {
log::error!("Failed to parse service call error message: {err}");
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to parse service call error message",
)
})?;
// TODO probably specific error type for this
Err(std::io::Error::new(
std::io::ErrorKind::Other,
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/service_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl ServiceServerLink {
warn!("Error from user service method for {service_name}: {e:?}");

let error_string = format!("{:?}", e);
let error_bytes = serde_rosmsg::to_vec(&error_string).unwrap();
let error_bytes = roslibrust_serde_rosmsg::to_vec(&error_string).unwrap();
let full_response = [vec![0u8], error_bytes].concat();

stream.write_all(&full_response).await.unwrap();
Expand Down
15 changes: 10 additions & 5 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::ros1::{names::Name, tcpros::ConnectionHeader};
use abort_on_drop::ChildTask;
use log::*;
use roslibrust_codegen::RosMessageType;
use std::{marker::PhantomData, sync::Arc};
use tokio::{
Expand Down Expand Up @@ -28,11 +29,14 @@ impl<T: RosMessageType> Subscriber<T> {

pub async fn next(&mut self) -> Option<Result<T, SubscriberError>> {
let data = match self.receiver.recv().await {
Ok(v) => v,
Ok(v) => {
trace!("Yielding message from subscriber");
v
}
Err(RecvError::Closed) => return None,
Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))),
};
match serde_rosmsg::from_slice::<T>(&data[..]) {
match roslibrust_serde_rosmsg::from_slice::<T>(&data[..]) {
Ok(p) => Some(Ok(p)),
Err(e) => Some(Err(e.into())),
}
Expand Down Expand Up @@ -105,7 +109,7 @@ impl Subscription {
let sender = self.msg_sender.clone();
let publisher_list = self.known_publishers.clone();
let publisher_uri = publisher_uri.to_owned();

trace!("Creating new subscription connection for {publisher_uri} on {topic_name}");
let handle = tokio::spawn(async move {
if let Ok(mut stream) = establish_publisher_connection(
&node_name,
Expand All @@ -120,6 +124,7 @@ impl Subscription {
loop {
match tcpros::receive_body(&mut stream).await {
Ok(body) => {
trace!("Got new message from publisher {publisher_uri} on {topic_name}");
let send_result = sender.send(body);
if let Err(err) = send_result {
log::error!("Unable to send message data due to dropped channel, closing connection: {err}");
Expand Down Expand Up @@ -241,8 +246,8 @@ pub enum SubscriberError {
Lagged(u64),
}

impl From<serde_rosmsg::Error> for SubscriberError {
fn from(value: serde_rosmsg::Error) -> Self {
impl From<roslibrust_serde_rosmsg::Error> for SubscriberError {
fn from(value: roslibrust_serde_rosmsg::Error) -> Self {
Self::DeserializeError(value.to_string())
}
}
2 changes: 2 additions & 0 deletions roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,15 @@ pub async fn receive_body(stream: &mut TcpStream) -> Result<Vec<u8>, std::io::Er
let mut body_len_bytes = [0u8; 4];
stream.read_exact(&mut body_len_bytes).await?;
let body_len = u32::from_le_bytes(body_len_bytes);
trace!("Read length from stream: {}", body_len);

// Allocate buffer space for length and body
let mut body = vec![0u8; body_len as usize + 4];
// Copy the length into the first four bytes
body[..4].copy_from_slice(&body_len.to_le_bytes());
// Read the body into the buffer
stream.read_exact(&mut body[4..]).await?;
trace!("Read body of size: {}", body.len());

// Return body
Ok(body)
Expand Down
1 change: 1 addition & 0 deletions roslibrust_codegen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ xml-rs = "0.8"
# So that the generate code can find them, and users don't have to added dependencies themselves
smart-default = "0.7"
serde-big-array = "0.5"
serde_bytes = "0.11"

[dev-dependencies]
env_logger = "0.10"
Expand Down
14 changes: 13 additions & 1 deletion roslibrust_codegen/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ fn generate_field_definition(
.ok_or(Error::new(format!("No Rust type for {}", field.field_type)))?
.to_owned(),
};
// Wrap type in appropriate Vec or array wrapper based on array information
let rust_field_type = match field.field_type.array_info {
Some(None) => format!("::std::vec::Vec<{rust_field_type}>"),
Some(None) => {
format!("::std::vec::Vec<{rust_field_type}>")
}
Some(Some(fixed_length)) => format!("[{rust_field_type}; {fixed_length}]"),
None => rust_field_type,
};
Expand Down Expand Up @@ -184,6 +187,15 @@ fn generate_field_definition(
// Larger than 32.
const MAX_FIXED_ARRAY_LEN: usize = 32;
let serde_line = match field.field_type.array_info {
Some(None) => {
// Special case for Vec<u8>, which massively benefit from optimizations in serde_bytes
// This makes deserializing an Image ~97% faster
if field.field_type.field_type == "uint8" {
quote! { #[serde(with = "::roslibrust_codegen::serde_bytes")] }
} else {
quote! {}
}
}
Some(Some(fixed_array_len)) if fixed_array_len > MAX_FIXED_ARRAY_LEN => {
quote! { #[serde(with = "::roslibrust_codegen::BigArray")] }
}
Expand Down
5 changes: 3 additions & 2 deletions roslibrust_codegen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ pub use integral_types::*;
// Modeled from: https://users.rust-lang.org/t/proc-macros-using-third-party-crate/42465/4
pub use ::serde;
pub use serde::{de::DeserializeOwned, Deserialize, Serialize};
pub use serde_big_array::BigArray;
pub use smart_default::SmartDefault;
pub use serde_big_array::BigArray; // Used in generated code for large fixed sized arrays
pub use serde_bytes;
pub use smart_default::SmartDefault; // Used in generated code for default values // Used in generated code for faster Vec<u8> serialization

/// Fundamental traits for message types this crate works with
/// This trait will be satisfied for any types generated with this crate's message_gen functionality
Expand Down
10 changes: 8 additions & 2 deletions roslibrust_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ edition = "2021"

[dependencies]
env_logger = "0.10"
roslibrust = { path = "../roslibrust" }
roslibrust = { path = "../roslibrust", features = ["ros1"] }
roslibrust_codegen = { path = "../roslibrust_codegen" }
lazy_static = "1.4"

[dev-dependencies]
diffy = "0.3.0"
diffy = "0.3.0"
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
tokio = { version = "1.24", features = ["full"] }

[[bench]]
name = "image_bench"
harness = false
78 changes: 78 additions & 0 deletions roslibrust_test/benches/image_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use criterion::{criterion_group, criterion_main, Criterion};
use std::{
hint::black_box,
sync::{Arc, Mutex},
};

struct BenchContext {
publisher: roslibrust::ros1::Publisher<roslibrust_test::ros1::sensor_msgs::Image>,
subscriber: roslibrust::ros1::Subscriber<roslibrust_test::ros1::sensor_msgs::Image>,
image: roslibrust_test::ros1::sensor_msgs::Image,
// Need to keep alive
_client: roslibrust::ros1::NodeHandle,
}

async fn setup_bench_context() -> BenchContext {
let client = roslibrust::ros1::NodeHandle::new("http://localhost:11311", "image_bench_rs")
.await
.unwrap();
let publisher = client
.advertise::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench", 1, false)
.await
.unwrap();
let subscriber = client
.subscribe::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench", 1)
.await
.unwrap();

// Wait for pub / sub to establish connection
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

let image = roslibrust_test::ros1::sensor_msgs::Image {
header: Default::default(),
height: 100,
width: 100,
encoding: "rgb8".to_owned(),
is_bigendian: 0,
step: 300,
data: vec![0; 1920 * 1080 * 3],
};

BenchContext {
publisher,
subscriber,
image,
_client: client,
}
}

async fn bench_iteration(context: &mut BenchContext) {
context.publisher.publish(&context.image).await.unwrap();
let received_image = context.subscriber.next().await.unwrap().unwrap();
black_box(received_image);
}

fn criterion_benchmark(c: &mut Criterion) {
env_logger::init();

// Create a tokio runtime so we can be async
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let context = runtime.block_on(async {
let context = setup_bench_context().await;
Arc::new(Mutex::new(context))
});

c.bench_function("image_bench", |b| {
b.to_async(&runtime).iter(|| async {
let mut context = context.lock().unwrap();
bench_iteration(&mut context).await
})
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
Loading

0 comments on commit 4076ffa

Please sign in to comment.