From 4076ffa0e47701df106e6e71fdb04baad7758d9a Mon Sep 17 00:00:00 2001 From: Carter Date: Thu, 17 Oct 2024 12:46:37 -0600 Subject: [PATCH] Switch to fork of serde_rosmsg, benchmark that it is indeed much faster, fix bug with missing write_all --- Cargo.lock | 250 +++++++++++++++++++++++-- docker/noetic_compose.yaml | 6 +- roslibrust/Cargo.toml | 4 +- roslibrust/src/ros1/node/actor.rs | 4 +- roslibrust/src/ros1/publisher.rs | 13 +- roslibrust/src/ros1/service_client.rs | 19 +- roslibrust/src/ros1/service_server.rs | 2 +- roslibrust/src/ros1/subscriber.rs | 15 +- roslibrust/src/ros1/tcpros.rs | 2 + roslibrust_codegen/Cargo.toml | 1 + roslibrust_codegen/src/gen.rs | 14 +- roslibrust_codegen/src/lib.rs | 5 +- roslibrust_test/Cargo.toml | 10 +- roslibrust_test/benches/image_bench.rs | 78 ++++++++ roslibrust_test/src/ros1.rs | 6 + roslibrust_test/src/ros2.rs | 5 + 16 files changed, 385 insertions(+), 49 deletions(-) create mode 100644 roslibrust_test/benches/image_bench.rs diff --git a/Cargo.lock b/Cargo.lock index 7cd6986e..26327595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,6 +35,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.15" @@ -174,6 +180,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1582e1c9e755dd6ad6b224dcffb135d199399a4568d454bd89fe515ca8425695" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.1.7" @@ -186,6 +198,33 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clap" version = "4.5.13" @@ -287,6 +326,63 @@ dependencies = [ "libc", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -302,6 +398,12 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -432,11 +534,12 @@ dependencies = [ [[package]] name = "error-chain" -version = "0.10.0" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" dependencies = [ "backtrace", + "version_check", ] [[package]] @@ -636,6 +739,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -783,6 +896,15 @@ dependencies = [ "nom", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -947,6 +1069,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "num_threads" version = "0.1.7" @@ -971,6 +1102,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "openssl" version = "0.10.66" @@ -1021,6 +1158,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + [[package]] name = "parking_lot_core" version = "0.9.10" @@ -1058,6 +1205,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -1130,6 +1305,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.3" @@ -1245,10 +1440,10 @@ dependencies = [ "reqwest", "roslibrust_codegen", "roslibrust_codegen_macro", + "roslibrust_serde_rosmsg", "serde", "serde-big-array", "serde_json", - "serde_rosmsg", "serde_xmlrpc", "simple_logger", "smart-default 0.6.0", @@ -1271,6 +1466,7 @@ dependencies = [ "quote", "serde", "serde-big-array", + "serde_bytes", "serde_json", "simple-error", "smart-default 0.7.1", @@ -1298,7 +1494,7 @@ dependencies = [ "clap", "const_format", "env_logger 0.11.5", - "itertools", + "itertools 0.12.1", "lazy_static", "log", "minijinja", @@ -1307,15 +1503,29 @@ dependencies = [ "serde_json", ] +[[package]] +name = "roslibrust_serde_rosmsg" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee7ea0fc21625dd94a69e126b9f2c1eab4b60dd80a6abf3e0a3e284f9d648586" +dependencies = [ + "byteorder", + "error-chain", + "serde", + "serde_derive", +] + [[package]] name = "roslibrust_test" version = "0.1.0" dependencies = [ + "criterion", "diffy", "env_logger 0.10.2", "lazy_static", "roslibrust", "roslibrust_codegen", + "tokio", ] [[package]] @@ -1426,6 +1636,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_bytes" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.204" @@ -1449,18 +1668,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_rosmsg" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e55d20c8bff70e82b5948a187c77d93c34fa538ae3d9999597fbb6245993680" -dependencies = [ - "byteorder", - "error-chain", - "serde", - "serde_derive", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1744,6 +1951,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -1769,6 +1986,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/docker/noetic_compose.yaml b/docker/noetic_compose.yaml index 73197cbd..cde87686 100644 --- a/docker/noetic_compose.yaml +++ b/docker/noetic_compose.yaml @@ -3,8 +3,8 @@ services: image: carter12s/roslibrust-ci-noetic:rust-1-72 # network_mode host required for ros1 testing network_mode: host - ports: - - "9090:9090" + # ports: + # - "9090:9090" # Pass through the ros master port for native ros1 testing - - "11311:11311" + # - "11311:11311" command: bash -c "source /opt/ros/noetic/setup.bash; roslaunch rosbridge_server rosbridge_websocket.launch & disown; rosrun rosapi rosapi_node" diff --git a/roslibrust/Cargo.toml b/roslibrust/Cargo.toml index 645bf452..a871c8dc 100644 --- a/roslibrust/Cargo.toml +++ b/roslibrust/Cargo.toml @@ -38,7 +38,7 @@ roslibrust_codegen_macro = { path = "../roslibrust_codegen_macro", version = "0. roslibrust_codegen = { path = "../roslibrust_codegen", version = "0.10.0" } reqwest = { version = "0.11", optional = true } # Only used with native ros1 serde_xmlrpc = { version = "0.2", optional = true } # Only used with native ros1 -serde_rosmsg = { version = "0.2", optional = true } # Only used with native ros1 +roslibrust_serde_rosmsg = { version = "0.3", optional = true } # Only used with native ros1 hyper = { version = "0.14", features = [ "server", ], optional = true } # Only used with native ros1 @@ -74,7 +74,7 @@ ros1 = [ "dep:hyper", "dep:gethostname", "dep:regex", - "dep:serde_rosmsg", + "dep:roslibrust_serde_rosmsg", ] diff --git a/roslibrust/src/ros1/node/actor.rs b/roslibrust/src/ros1/node/actor.rs index 5d83fc9d..dd48ba50 100644 --- a/roslibrust/src/ros1/node/actor.rs +++ b/roslibrust/src/ros1/node/actor.rs @@ -237,10 +237,10 @@ impl NodeServerHandle { // This gives a generic closure that operates on byte arrays that we can then store and use freely let server_typeless = move |message: Vec| -> Result, Box> { - let request = serde_rosmsg::from_slice::(&message) + let request = roslibrust_serde_rosmsg::from_slice::(&message) .map_err(|err| RosLibRustError::SerializationError(err.to_string()))?; let response = server(request)?; - Ok(serde_rosmsg::to_vec(&response) + Ok(roslibrust_serde_rosmsg::to_vec(&response) .map_err(|err| RosLibRustError::SerializationError(err.to_string()))?) }; let server_typeless = Box::new(server_typeless); diff --git a/roslibrust/src/ros1/publisher.rs b/roslibrust/src/ros1/publisher.rs index aef331de..f4fe8297 100644 --- a/roslibrust/src/ros1/publisher.rs +++ b/roslibrust/src/ros1/publisher.rs @@ -34,7 +34,7 @@ impl Publisher { /// Queues a message to be send on the related topic. /// Returns when the data has been queued not when data is actually sent. pub async fn publish(&self, data: &T) -> Result<(), PublisherError> { - let data = serde_rosmsg::to_vec(&data)?; + let data = roslibrust_serde_rosmsg::to_vec(&data)?; // TODO this is a pretty dumb... // because of the internal channel used for re-direction this future doesn't // actually complete when the data is sent, but merely when it is queued to be sent @@ -169,10 +169,11 @@ impl Publication { loop { match rx.recv().await { Some(msg_to_publish) => { + trace!("Publish task got message to publish for topic: {topic}"); let mut streams = subscriber_streams.write().await; let mut streams_to_remove = vec![]; for (stream_idx, stream) in streams.iter_mut().enumerate() { - if let Err(err) = stream.write(&msg_to_publish[..]).await { + if let Err(err) = stream.write_all(&msg_to_publish[..]).await { // TODO: A single failure between nodes that cross host boundaries is probably normal, should make this more robust perhaps debug!("Failed to send data to subscriber: {err}, removing"); streams_to_remove.push(stream_idx); @@ -272,7 +273,7 @@ impl Publication { .to_bytes(false) .expect("Couldn't serialize connection header"); stream - .write(&response_header_bytes[..]) + .write_all(&response_header_bytes[..]) .await .expect("Unable to respond on tcpstream"); @@ -282,7 +283,7 @@ impl Publication { debug!( "Publication configured to be latching and has last_message, sending" ); - let res = stream.write(last_message).await; + let res = stream.write_all(last_message).await; match res { Ok(_) => {} Err(e) => { @@ -320,8 +321,8 @@ pub enum PublisherError { StreamClosed, } -impl From for PublisherError { - fn from(value: serde_rosmsg::Error) -> Self { +impl From for PublisherError { + fn from(value: roslibrust_serde_rosmsg::Error) -> Self { Self::SerializingError(value.to_string()) } } diff --git a/roslibrust/src/ros1/service_client.rs b/roslibrust/src/ros1/service_client.rs index e8501c63..a5220549 100644 --- a/roslibrust/src/ros1/service_client.rs +++ b/roslibrust/src/ros1/service_client.rs @@ -54,7 +54,7 @@ impl ServiceClient { } pub async fn call(&self, request: &T::Request) -> RosLibRustResult { - let request_payload = serde_rosmsg::to_vec(request) + let request_payload = roslibrust_serde_rosmsg::to_vec(request) .map_err(|err| RosLibRustError::SerializationError(err.to_string()))?; let (response_tx, response_rx) = oneshot::channel(); @@ -69,7 +69,7 @@ impl ServiceClient { self.service_name, result_payload ); - let response: T::Response = serde_rosmsg::from_slice(&result_payload) + let response: T::Response = roslibrust_serde_rosmsg::from_slice(&result_payload) .map_err(|err| RosLibRustError::SerializationError(err.to_string()))?; return Ok(response); } @@ -202,13 +202,14 @@ impl ServiceClientLink { } else { // Parse an error message as the body let error_body = tcpros::receive_body(stream).await?; - let err_msg: String = serde_rosmsg::from_slice(&error_body).map_err(|err| { - log::error!("Failed to parse service call error message: {err}"); - std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Failed to parse service call error message", - ) - })?; + let err_msg: String = + roslibrust_serde_rosmsg::from_slice(&error_body).map_err(|err| { + log::error!("Failed to parse service call error message: {err}"); + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Failed to parse service call error message", + ) + })?; // TODO probably specific error type for this Err(std::io::Error::new( std::io::ErrorKind::Other, diff --git a/roslibrust/src/ros1/service_server.rs b/roslibrust/src/ros1/service_server.rs index 7748a2d7..5abc3def 100644 --- a/roslibrust/src/ros1/service_server.rs +++ b/roslibrust/src/ros1/service_server.rs @@ -220,7 +220,7 @@ impl ServiceServerLink { warn!("Error from user service method for {service_name}: {e:?}"); let error_string = format!("{:?}", e); - let error_bytes = serde_rosmsg::to_vec(&error_string).unwrap(); + let error_bytes = roslibrust_serde_rosmsg::to_vec(&error_string).unwrap(); let full_response = [vec![0u8], error_bytes].concat(); stream.write_all(&full_response).await.unwrap(); diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index a19058df..78c236b0 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -1,5 +1,6 @@ use crate::ros1::{names::Name, tcpros::ConnectionHeader}; use abort_on_drop::ChildTask; +use log::*; use roslibrust_codegen::RosMessageType; use std::{marker::PhantomData, sync::Arc}; use tokio::{ @@ -28,11 +29,14 @@ impl Subscriber { pub async fn next(&mut self) -> Option> { let data = match self.receiver.recv().await { - Ok(v) => v, + Ok(v) => { + trace!("Yielding message from subscriber"); + v + } Err(RecvError::Closed) => return None, Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))), }; - match serde_rosmsg::from_slice::(&data[..]) { + match roslibrust_serde_rosmsg::from_slice::(&data[..]) { Ok(p) => Some(Ok(p)), Err(e) => Some(Err(e.into())), } @@ -105,7 +109,7 @@ impl Subscription { let sender = self.msg_sender.clone(); let publisher_list = self.known_publishers.clone(); let publisher_uri = publisher_uri.to_owned(); - + trace!("Creating new subscription connection for {publisher_uri} on {topic_name}"); let handle = tokio::spawn(async move { if let Ok(mut stream) = establish_publisher_connection( &node_name, @@ -120,6 +124,7 @@ impl Subscription { loop { match tcpros::receive_body(&mut stream).await { Ok(body) => { + trace!("Got new message from publisher {publisher_uri} on {topic_name}"); let send_result = sender.send(body); if let Err(err) = send_result { log::error!("Unable to send message data due to dropped channel, closing connection: {err}"); @@ -241,8 +246,8 @@ pub enum SubscriberError { Lagged(u64), } -impl From for SubscriberError { - fn from(value: serde_rosmsg::Error) -> Self { +impl From for SubscriberError { + fn from(value: roslibrust_serde_rosmsg::Error) -> Self { Self::DeserializeError(value.to_string()) } } diff --git a/roslibrust/src/ros1/tcpros.rs b/roslibrust/src/ros1/tcpros.rs index 5041749f..1a0e0fee 100644 --- a/roslibrust/src/ros1/tcpros.rs +++ b/roslibrust/src/ros1/tcpros.rs @@ -252,6 +252,7 @@ pub async fn receive_body(stream: &mut TcpStream) -> Result, std::io::Er let mut body_len_bytes = [0u8; 4]; stream.read_exact(&mut body_len_bytes).await?; let body_len = u32::from_le_bytes(body_len_bytes); + trace!("Read length from stream: {}", body_len); // Allocate buffer space for length and body let mut body = vec![0u8; body_len as usize + 4]; @@ -259,6 +260,7 @@ pub async fn receive_body(stream: &mut TcpStream) -> Result, std::io::Er body[..4].copy_from_slice(&body_len.to_le_bytes()); // Read the body into the buffer stream.read_exact(&mut body[4..]).await?; + trace!("Read body of size: {}", body.len()); // Return body Ok(body) diff --git a/roslibrust_codegen/Cargo.toml b/roslibrust_codegen/Cargo.toml index ffb2d465..5340384c 100644 --- a/roslibrust_codegen/Cargo.toml +++ b/roslibrust_codegen/Cargo.toml @@ -30,6 +30,7 @@ xml-rs = "0.8" # So that the generate code can find them, and users don't have to added dependencies themselves smart-default = "0.7" serde-big-array = "0.5" +serde_bytes = "0.11" [dev-dependencies] env_logger = "0.10" diff --git a/roslibrust_codegen/src/gen.rs b/roslibrust_codegen/src/gen.rs index ffc369db..74746f97 100644 --- a/roslibrust_codegen/src/gen.rs +++ b/roslibrust_codegen/src/gen.rs @@ -133,8 +133,11 @@ fn generate_field_definition( .ok_or(Error::new(format!("No Rust type for {}", field.field_type)))? .to_owned(), }; + // Wrap type in appropriate Vec or array wrapper based on array information let rust_field_type = match field.field_type.array_info { - Some(None) => format!("::std::vec::Vec<{rust_field_type}>"), + Some(None) => { + format!("::std::vec::Vec<{rust_field_type}>") + } Some(Some(fixed_length)) => format!("[{rust_field_type}; {fixed_length}]"), None => rust_field_type, }; @@ -184,6 +187,15 @@ fn generate_field_definition( // Larger than 32. const MAX_FIXED_ARRAY_LEN: usize = 32; let serde_line = match field.field_type.array_info { + Some(None) => { + // Special case for Vec, which massively benefit from optimizations in serde_bytes + // This makes deserializing an Image ~97% faster + if field.field_type.field_type == "uint8" { + quote! { #[serde(with = "::roslibrust_codegen::serde_bytes")] } + } else { + quote! {} + } + } Some(Some(fixed_array_len)) if fixed_array_len > MAX_FIXED_ARRAY_LEN => { quote! { #[serde(with = "::roslibrust_codegen::BigArray")] } } diff --git a/roslibrust_codegen/src/lib.rs b/roslibrust_codegen/src/lib.rs index c9b3b2e7..73867950 100644 --- a/roslibrust_codegen/src/lib.rs +++ b/roslibrust_codegen/src/lib.rs @@ -23,8 +23,9 @@ pub use integral_types::*; // Modeled from: https://users.rust-lang.org/t/proc-macros-using-third-party-crate/42465/4 pub use ::serde; pub use serde::{de::DeserializeOwned, Deserialize, Serialize}; -pub use serde_big_array::BigArray; -pub use smart_default::SmartDefault; +pub use serde_big_array::BigArray; // Used in generated code for large fixed sized arrays +pub use serde_bytes; +pub use smart_default::SmartDefault; // Used in generated code for default values // Used in generated code for faster Vec serialization /// Fundamental traits for message types this crate works with /// This trait will be satisfied for any types generated with this crate's message_gen functionality diff --git a/roslibrust_test/Cargo.toml b/roslibrust_test/Cargo.toml index f16c63aa..b0487765 100644 --- a/roslibrust_test/Cargo.toml +++ b/roslibrust_test/Cargo.toml @@ -5,9 +5,15 @@ edition = "2021" [dependencies] env_logger = "0.10" -roslibrust = { path = "../roslibrust" } +roslibrust = { path = "../roslibrust", features = ["ros1"] } roslibrust_codegen = { path = "../roslibrust_codegen" } lazy_static = "1.4" [dev-dependencies] -diffy = "0.3.0" \ No newline at end of file +diffy = "0.3.0" +criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } +tokio = { version = "1.24", features = ["full"] } + +[[bench]] +name = "image_bench" +harness = false diff --git a/roslibrust_test/benches/image_bench.rs b/roslibrust_test/benches/image_bench.rs new file mode 100644 index 00000000..c1d535c7 --- /dev/null +++ b/roslibrust_test/benches/image_bench.rs @@ -0,0 +1,78 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use std::{ + hint::black_box, + sync::{Arc, Mutex}, +}; + +struct BenchContext { + publisher: roslibrust::ros1::Publisher, + subscriber: roslibrust::ros1::Subscriber, + image: roslibrust_test::ros1::sensor_msgs::Image, + // Need to keep alive + _client: roslibrust::ros1::NodeHandle, +} + +async fn setup_bench_context() -> BenchContext { + let client = roslibrust::ros1::NodeHandle::new("http://localhost:11311", "image_bench_rs") + .await + .unwrap(); + let publisher = client + .advertise::("/image_bench", 1, false) + .await + .unwrap(); + let subscriber = client + .subscribe::("/image_bench", 1) + .await + .unwrap(); + + // Wait for pub / sub to establish connection + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let image = roslibrust_test::ros1::sensor_msgs::Image { + header: Default::default(), + height: 100, + width: 100, + encoding: "rgb8".to_owned(), + is_bigendian: 0, + step: 300, + data: vec![0; 1920 * 1080 * 3], + }; + + BenchContext { + publisher, + subscriber, + image, + _client: client, + } +} + +async fn bench_iteration(context: &mut BenchContext) { + context.publisher.publish(&context.image).await.unwrap(); + let received_image = context.subscriber.next().await.unwrap().unwrap(); + black_box(received_image); +} + +fn criterion_benchmark(c: &mut Criterion) { + env_logger::init(); + + // Create a tokio runtime so we can be async + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let context = runtime.block_on(async { + let context = setup_bench_context().await; + Arc::new(Mutex::new(context)) + }); + + c.bench_function("image_bench", |b| { + b.to_async(&runtime).iter(|| async { + let mut context = context.lock().unwrap(); + bench_iteration(&mut context).await + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/roslibrust_test/src/ros1.rs b/roslibrust_test/src/ros1.rs index 1f2e3b56..7c22231a 100644 --- a/roslibrust_test/src/ros1.rs +++ b/roslibrust_test/src/ros1.rs @@ -7543,6 +7543,7 @@ float32[] values"#; pub struct CompressedImage { pub r#header: std_msgs::Header, pub r#format: ::std::string::String, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for CompressedImage { @@ -7695,6 +7696,7 @@ string frame_id"#; pub r#encoding: ::std::string::String, pub r#is_bigendian: u8, pub r#step: u32, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for Image { @@ -8649,6 +8651,7 @@ string frame_id"#; pub r#is_bigendian: bool, pub r#point_step: u32, pub r#row_step: u32, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, pub r#is_dense: bool, } @@ -10506,6 +10509,7 @@ uint32 stride # stride of given dimension"#; #[serde(crate = "::roslibrust_codegen::serde")] pub struct UInt8MultiArray { pub r#layout: self::MultiArrayLayout, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for UInt8MultiArray { @@ -11100,6 +11104,7 @@ int64 sum"#; )] #[serde(crate = "::roslibrust_codegen::serde")] pub struct RoundTripArrayRequest { + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#bytes: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for RoundTripArrayRequest { @@ -11119,6 +11124,7 @@ uint8[] bytes"#; )] #[serde(crate = "::roslibrust_codegen::serde")] pub struct RoundTripArrayResponse { + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#bytes: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for RoundTripArrayResponse { diff --git a/roslibrust_test/src/ros2.rs b/roslibrust_test/src/ros2.rs index 66869903..6ec4deb8 100644 --- a/roslibrust_test/src/ros2.rs +++ b/roslibrust_test/src/ros2.rs @@ -4286,6 +4286,7 @@ float32[] values"#; pub struct CompressedImage { pub r#header: std_msgs::Header, pub r#format: ::std::string::String, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for CompressedImage { @@ -4426,6 +4427,7 @@ string frame_id"#; pub r#encoding: ::std::string::String, pub r#is_bigendian: u8, pub r#step: u32, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for Image { @@ -5342,6 +5344,7 @@ string frame_id"#; pub r#is_bigendian: bool, pub r#point_step: u32, pub r#row_step: u32, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, pub r#is_dense: bool, } @@ -7455,6 +7458,7 @@ uint8 data"#; #[serde(crate = "::roslibrust_codegen::serde")] pub struct UInt8MultiArray { pub r#layout: self::MultiArrayLayout, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for UInt8MultiArray { @@ -13456,6 +13460,7 @@ uint8 command_type"#; #[serde(crate = "::roslibrust_codegen::serde")] pub struct MeshFile { pub r#filename: ::std::string::String, + #[serde(with = "::roslibrust_codegen::serde_bytes")] pub r#data: ::std::vec::Vec, } impl ::roslibrust_codegen::RosMessageType for MeshFile {