From 7e6b79dea89133d530e451eab57c4dacf0371d17 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Thu, 16 Jan 2025 13:57:04 +0100
Subject: [PATCH] Less use of `TransportChunk` (#8707)

* Part of #3741
---
 crates/store/re_chunk/src/transport.rs        | 10 ----------
 crates/store/re_dataframe/src/query.rs        |  2 +-
 crates/store/re_grpc_client/src/lib.rs        |  9 +++++----
 .../re_log_encoding/src/codec/wire/decoder.rs | 18 +++++++++--------
 .../re_log_encoding/src/codec/wire/mod.rs     |  2 +-
 crates/store/re_log_types/Cargo.toml          |  6 +++++-
 rerun_py/src/remote.rs                        | 20 +++++++++----------
 7 files changed, 31 insertions(+), 36 deletions(-)

diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs
index 90ea338e21d9..6527da66593d 100644
--- a/crates/store/re_chunk/src/transport.rs
+++ b/crates/store/re_chunk/src/transport.rs
@@ -356,16 +356,6 @@ impl TransportChunk {
         })
     }
 
-    #[inline]
-    pub fn fields_and_columns(&self) -> impl Iterator + '_ {
-        self.fields().iter().enumerate().filter_map(|(i, field)| {
-            self.batch
-                .columns()
-                .get(i)
-                .map(|column| (field.as_ref(), column))
-        })
-    }
-
     /// Iterates all control columns present in this chunk.
     #[inline]
     pub fn controls(&self) -> impl Iterator {
diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs
index e3b3313fa6d2..cd302a51709f 100644
--- a/crates/store/re_dataframe/src/query.rs
+++ b/crates/store/re_dataframe/src/query.rs
@@ -1262,7 +1262,7 @@ impl QueryHandle {
 
     /// Calls [`Self::next_row`] and wraps the result in a [`ArrowRecordBatch`].
     ///
-    /// Only use this if you absolutely need a [`RecordBatch`] as this adds a
+    /// Only use this if you absolutely need a [`ArrowRecordBatch`] as this adds a
     /// some overhead for schema validation.
     ///
     /// See [`Self::next_row`] for more information.
diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs
index 63471503527c..487d27098508 100644
--- a/crates/store/re_grpc_client/src/lib.rs
+++ b/crates/store/re_grpc_client/src/lib.rs
@@ -218,7 +218,8 @@ async fn stream_recording_async(
         }));
     }
 
-    let store_info = store_info_from_catalog_chunk(&resp[0], &recording_id)?;
+    let store_info =
+        store_info_from_catalog_chunk(&TransportChunk::from(resp[0].clone()), &recording_id)?;
     let store_id = store_info.store_id.clone();
 
     re_log::debug!("Fetching {recording_id}…");
@@ -255,8 +256,8 @@ async fn stream_recording_async(
 
     re_log::info!("Starting to read...");
     while let Some(result) = resp.next().await {
-        let tc = result.map_err(TonicStatusError)?;
-        let chunk = Chunk::from_transport(&tc)?;
+        let batch = result.map_err(TonicStatusError)?;
+        let chunk = Chunk::from_record_batch(batch)?;
 
         if tx
             .send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))
@@ -391,7 +392,7 @@ async fn stream_catalog_async(
 
     re_log::info!("Starting to read...");
     while let Some(result) = resp.next().await {
-        let input = result.map_err(TonicStatusError)?;
+        let input = TransportChunk::from(result.map_err(TonicStatusError)?);
 
         // Catalog received from the ReDap server isn't suitable for direct conversion to a Rerun Chunk:
         // - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray
diff --git a/crates/store/re_log_encoding/src/codec/wire/decoder.rs b/crates/store/re_log_encoding/src/codec/wire/decoder.rs
index e8f3c83040be..39b7a66542e5 100644
--- a/crates/store/re_log_encoding/src/codec/wire/decoder.rs
+++ b/crates/store/re_log_encoding/src/codec/wire/decoder.rs
@@ -1,36 +1,38 @@
-use crate::codec::arrow::read_arrow_from_bytes;
-use crate::codec::CodecError;
-use re_chunk::TransportChunk;
+use arrow::array::RecordBatch as ArrowRecordBatch;
+
 use re_protos::common::v0::RerunChunk;
 use re_protos::remote_store::v0::DataframePart;
 
+use crate::codec::arrow::read_arrow_from_bytes;
+use crate::codec::CodecError;
+
 /// Decode transport data from a byte stream.
 fn decode(
     version: re_protos::common::v0::EncoderVersion,
     data: &[u8],
-) -> Result<TransportChunk, CodecError> {
+) -> Result<ArrowRecordBatch, CodecError> {
     match version {
         re_protos::common::v0::EncoderVersion::V0 => {
             let mut reader = std::io::Cursor::new(data);
             let batch = read_arrow_from_bytes(&mut reader)?;
-            Ok(TransportChunk::from(batch))
+            Ok(batch)
         }
     }
 }
 
 /// Decode an object from a its wire (protobuf) representation.
 pub trait Decode {
-    fn decode(&self) -> Result<TransportChunk, CodecError>;
+    fn decode(&self) -> Result<ArrowRecordBatch, CodecError>;
 }
 
 impl Decode for DataframePart {
-    fn decode(&self) -> Result<TransportChunk, CodecError> {
+    fn decode(&self) -> Result<ArrowRecordBatch, CodecError> {
         decode(self.encoder_version(), &self.payload)
     }
 }
 
 impl Decode for RerunChunk {
-    fn decode(&self) -> Result<TransportChunk, CodecError> {
+    fn decode(&self) -> Result<ArrowRecordBatch, CodecError> {
         decode(self.encoder_version(), &self.payload)
     }
 }
diff --git a/crates/store/re_log_encoding/src/codec/wire/mod.rs b/crates/store/re_log_encoding/src/codec/wire/mod.rs
index a08cd28a810a..7f2ea2624100 100644
--- a/crates/store/re_log_encoding/src/codec/wire/mod.rs
+++ b/crates/store/re_log_encoding/src/codec/wire/mod.rs
@@ -62,7 +62,7 @@ mod tests {
             .unwrap();
 
         let decoded = encoded.decode().unwrap();
-        let decoded_chunk = Chunk::from_transport(&decoded).unwrap();
+        let decoded_chunk = Chunk::from_record_batch(decoded).unwrap();
 
         assert_eq!(expected_chunk, decoded_chunk);
     }
diff --git a/crates/store/re_log_types/Cargo.toml b/crates/store/re_log_types/Cargo.toml
index 2437bbf3e5bf..788f02b97c4a 100644
--- a/crates/store/re_log_types/Cargo.toml
+++ b/crates/store/re_log_types/Cargo.toml
@@ -58,7 +58,11 @@ re_types_core.workspace = true
 ahash.workspace = true
 anyhow.workspace = true
 arrow = { workspace = true, features = ["ipc"] }
-arrow2 = { workspace = true, features = ["io_print", "compute_concatenate"] }
+arrow2 = { workspace = true, features = [
+  "arrow",
+  "io_print",
+  "compute_concatenate",
+] }
 backtrace.workspace = true
 bytemuck.workspace = true
 clean-path.workspace = true
diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs
index 5959a385b6a9..dd0763ac8e0c 100644
--- a/rerun_py/src/remote.rs
+++ b/rerun_py/src/remote.rs
@@ -130,7 +130,10 @@ impl PyStorageNodeClient {
                 ));
             }
 
-            re_grpc_client::store_info_from_catalog_chunk(&resp[0], id)
+            re_grpc_client::store_info_from_catalog_chunk(
+                &re_chunk::TransportChunk::from(resp[0].clone()),
+                id,
+            )
         })
         .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
 
@@ -173,7 +176,7 @@ impl PyStorageNodeClient {
                 .unwrap_or_else(|| ArrowSchema::empty().into());
 
             Ok(RecordBatchIterator::new(
-                batches.into_iter().map(|tc| Ok(tc.into())),
+                batches.into_iter().map(Ok),
                 schema,
             ))
         });
@@ -234,10 +237,7 @@ impl PyStorageNodeClient {
                 .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
 
             let record_batches: Vec> =
-                transport_chunks
-                    .into_iter()
-                    .map(|tc| Ok(tc.into()))
-                    .collect();
+                transport_chunks.into_iter().map(Ok).collect();
 
             // TODO(jleibs): surfacing this schema is awkward. This should be more explicit in
             // the gRPC APIs somehow.
@@ -346,9 +346,7 @@ impl PyStorageNodeClient {
             .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
 
         let recording_id = metadata
-            .fields_and_columns()
-            .find(|(field, _data)| field.name() == "rerun_recording_id")
-            .map(|(_field, data)| data)
+            .column_by_name("rerun_recording_id")
             .ok_or(PyRuntimeError::new_err("No rerun_recording_id"))?
             .downcast_array_ref::()
             .ok_or(PyRuntimeError::new_err("Recording Id is not a string"))?
@@ -480,13 +478,13 @@ impl PyStorageNodeClient {
 
                 while let Some(result) = resp.next().await {
                     let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
-                    let tc = match response.decode() {
+                    let batch = match response.decode() {
                         Ok(tc) => tc,
                         Err(err) => {
                             return Err(PyRuntimeError::new_err(err.to_string()));
                         }
                     };
-                    let chunk = Chunk::from_transport(&tc)
+                    let chunk = Chunk::from_record_batch(batch)
                         .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
 
                     store
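
Not part of the patch above: a minimal sketch of the call pattern this change enables, going from a decoded wire payload straight to an Arrow record batch and then to a `Chunk`, with no intermediate `TransportChunk`. The helper name `chunk_from_dataframe_part`, the boxed error type, and the exact import path of the `Decode` trait are assumptions for illustration, not something the diff defines.

// Illustrative sketch only -- not part of the patch. Assumes the `re_chunk`,
// `re_log_encoding`, `re_protos`, and `arrow` crates as dependencies, and that
// the `Decode` trait is reachable via the `codec::wire::decoder` module shown
// in the diff; the helper name and boxed error type are hypothetical.
use arrow::array::RecordBatch as ArrowRecordBatch;
use re_chunk::Chunk;
use re_log_encoding::codec::wire::decoder::Decode as _;
use re_protos::remote_store::v0::DataframePart;

fn chunk_from_dataframe_part(
    part: &DataframePart,
) -> Result<Chunk, Box<dyn std::error::Error>> {
    // `decode()` now yields a plain Arrow record batch instead of a `TransportChunk`…
    let batch: ArrowRecordBatch = part.decode()?;
    // …which `Chunk::from_record_batch` consumes directly, replacing the old
    // `Chunk::from_transport(&tc)` round-trip.
    let chunk = Chunk::from_record_batch(batch)?;
    Ok(chunk)
}

The `TransportChunk::from(...)` conversions that remain in the diff bridge the call sites, such as `store_info_from_catalog_chunk`, that still expect a `TransportChunk`.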