Create re_arrow_util #8689

Merged Jan 15, 2025 · 10 commits
Changes from all commits
1 change: 1 addition & 0 deletions ARCHITECTURE.md
@@ -210,6 +210,7 @@ Update instructions:
| Crate | Description |
|--------------------|--------------------------------------------------------------------------------------|
| re_analytics | Rerun's analytics SDK |
+| re_arrow_util | Helpers for working with arrow |
| re_byte_size | Calculate the heap-allocated size of values at runtime |
| re_capabilities | Capability tokens |
| re_case | Case conversions, the way Rerun likes them |
25 changes: 24 additions & 1 deletion Cargo.lock
@@ -5584,7 +5584,7 @@ dependencies = [
[[package]]
name = "re_arrow2"
version = "0.18.1"
source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#e8576708a1b41b493980ecb995e808aefcfa1fbc"
source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#61ac48418df229584155c5f669477bedf76d4505"
dependencies = [
"ahash",
"arrow-array",
@@ -5610,6 +5610,17 @@ dependencies = [
"simdutf8",
]

+[[package]]
+name = "re_arrow_util"
+version = "0.22.0-alpha.1+dev"
+dependencies = [
+ "arrow",
+ "itertools 0.13.0",
+ "re_arrow2",
+ "re_log",
+ "re_tracing",
+]

[[package]]
name = "re_blueprint_tree"
version = "0.22.0-alpha.1+dev"
@@ -5692,6 +5703,7 @@ dependencies = [
"nohash-hasher",
"rand",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_error",
"re_format",
@@ -5725,6 +5737,7 @@ dependencies = [
"parking_lot",
"rand",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_chunk",
"re_format",
@@ -5904,6 +5917,7 @@ dependencies = [
"nohash-hasher",
"rayon",
"re_arrow2",
"re_arrow_util",
"re_chunk",
"re_chunk_store",
"re_log",
@@ -6001,6 +6015,7 @@ dependencies = [
"arrow",
"comfy-table",
"itertools 0.13.0",
"re_arrow_util",
"re_tuid",
"re_types_core",
]
@@ -6010,6 +6025,7 @@ name = "re_grpc_client"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"re_arrow_util",
"re_chunk",
"re_error",
"re_log",
@@ -6123,6 +6139,7 @@ dependencies = [
"num-derive",
"num-traits",
"re_arrow2",
"re_arrow_util",
"re_build_info",
"re_byte_size",
"re_format",
@@ -6226,6 +6243,7 @@ dependencies = [
"paste",
"rand",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_chunk",
"re_chunk_store",
@@ -6585,6 +6603,7 @@ dependencies = [
"nohash-hasher",
"once_cell",
"re_arrow2",
"re_arrow_util",
"re_byte_size",
"re_case",
"re_error",
@@ -6613,6 +6632,7 @@ dependencies = [
"once_cell",
"parking_lot",
"rand",
"re_arrow_util",
"re_entity_db",
"re_format",
"re_log",
@@ -6707,6 +6727,7 @@ dependencies = [
"egui",
"egui_table",
"itertools 0.13.0",
"re_arrow_util",
"re_chunk_store",
"re_dataframe",
"re_error",
@@ -7371,6 +7392,7 @@ dependencies = [
"once_cell",
"parking_lot",
"re_arrow2",
"re_arrow_util",
"re_log",
"re_sdk",
"re_video",
@@ -7394,6 +7416,7 @@ dependencies = [
"pyo3-build-config",
"rand",
"re_arrow2",
"re_arrow_util",
"re_build_info",
"re_build_tools",
"re_chunk",
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -72,6 +72,8 @@ rerun-cli = { path = "crates/top/rerun-cli", version = "=0.22.0-alpha.1", defaul

# crates/utils:
re_analytics = { path = "crates/utils/re_analytics", version = "=0.22.0-alpha.1", default-features = false }
+re_arrow_util = { path = "crates/utils/re_arrow_util", version = "=0.22.0-alpha.1", default-features = false }
+re_byte_size = { path = "crates/utils/re_byte_size", version = "=0.22.0-alpha.1", default-features = false }
re_capabilities = { path = "crates/utils/re_capabilities", version = "=0.22.0-alpha.1", default-features = false }
re_case = { path = "crates/utils/re_case", version = "=0.22.0-alpha.1", default-features = false }
re_crash_handler = { path = "crates/utils/re_crash_handler", version = "=0.22.0-alpha.1", default-features = false }
@@ -80,7 +82,6 @@ re_format = { path = "crates/utils/re_format", version = "=0.22.0-alpha.1", defa
re_int_histogram = { path = "crates/utils/re_int_histogram", version = "=0.22.0-alpha.1", default-features = false }
re_log = { path = "crates/utils/re_log", version = "=0.22.0-alpha.1", default-features = false }
re_memory = { path = "crates/utils/re_memory", version = "=0.22.0-alpha.1", default-features = false }
-re_byte_size = { path = "crates/utils/re_byte_size", version = "=0.22.0-alpha.1", default-features = false }
re_smart_channel = { path = "crates/utils/re_smart_channel", version = "=0.22.0-alpha.1", default-features = false }
re_string_interner = { path = "crates/utils/re_string_interner", version = "=0.22.0-alpha.1", default-features = false }
re_tracing = { path = "crates/utils/re_tracing", version = "=0.22.0-alpha.1", default-features = false }
18 changes: 9 additions & 9 deletions clippy.toml
@@ -50,19 +50,19 @@ disallowed-methods = [
{ path = "std::panic::catch_unwind", reason = "We compile with `panic = 'abort'`" },
{ path = "std::thread::spawn", reason = "Use `std::thread::Builder` and name the thread" },

{ path = "arrow::compute::concat", reason = "Use `re_chunk::arrow_util::concat_arrays` instead, which has better memory management" },
{ path = "arrow::compute::filter", reason = "Use `re_chunk::arrow_util::filter_array` instead" },
{ path = "arrow::compute::take", reason = "Use `re_chunk::arrow_util::take_array` instead" },
{ path = "arrow::compute::concat", reason = "Use `re_arrow_util::arrow_util::concat_arrays` instead, which has better memory management" },
{ path = "arrow::compute::filter", reason = "Use `re_arrow_util::arrow_util::filter_array` instead" },
{ path = "arrow::compute::take", reason = "Use `re_arrow_util::arrow_util::take_array` instead" },

{ path = "arrow::datatypes::Schema::new", reason = "Use `arrow::datatypes::Schema::new_with_metadata` instead. There is usually some metadata you want to preserve." },

# Specify both `arrow2` and `re_arrow2` -- clippy gets lost in all the package renaming happening.
{ path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "re_arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" },
{ path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_arrow_util::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "arrow2::compute::filter::filter", reason = "Use `re_arrow_util::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "arrow2::compute::take::take", reason = "Use `re_arrow_util::arrow2_util::take_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_arrow_util::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "re_arrow2::compute::filter::filter", reason = "Use `re_arrow_util::arrow2_util::filter_array` instead, which has proper early outs" },
{ path = "re_arrow2::compute::take::take", reason = "Use `re_arrow_util::arrow2_util::take_array` instead, which has proper early outs" },

# There are many things that aren't allowed on wasm,
# but we cannot disable them all here (because of e.g. https://github.com/rust-lang/rust-clippy/issues/10406)
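The lint text above is the whole contract: the wrappers exist to add early outs and better memory management on top of the stock kernels. A minimal sketch of what such an early out can look like (the `concat_arrays` name comes from the lint messages; the signature and internals are assumptions, not copied from `re_arrow_util`):

```rust
use arrow::array::{Array, ArrayRef};
use arrow::error::ArrowError;

/// Sketch of an early-out concat wrapper; the real `re_arrow_util`
/// implementation may differ.
fn concat_arrays(arrays: &[ArrayRef]) -> Result<ArrayRef, ArrowError> {
    if arrays.len() == 1 {
        // Early out: with a single input there is nothing to concatenate,
        // so return it as-is instead of copying all of its buffers.
        return Ok(arrays[0].clone());
    }

    let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect();

    // The disallowed kernel is still the workhorse underneath; the wrapper
    // only adds the early out (and, per the lint text, better memory
    // management in the `arrow` variant).
    #[allow(clippy::disallowed_methods)]
    arrow::compute::concat(&refs)
}
```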
1 change: 1 addition & 0 deletions crates/store/re_chunk/Cargo.toml
@@ -37,6 +37,7 @@ arrow = ["arrow2/arrow"]
[dependencies]

# Rerun
+re_arrow_util.workspace = true
re_byte_size.workspace = true
re_error.workspace = true
re_format.workspace = true
3 changes: 2 additions & 1 deletion crates/store/re_chunk/src/batcher.rs
@@ -9,11 +9,12 @@ use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

+use re_arrow_util::arrow_util;
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline};
use re_types_core::ComponentDescriptor;

-use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
+use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

// ---

3 changes: 2 additions & 1 deletion crates/store/re_chunk/src/builder.rs
@@ -2,10 +2,11 @@ use arrow::{array::ArrayRef, datatypes::DataType as ArrowDatatype};
use itertools::Itertools;
use nohash_hasher::IntMap;

+use re_arrow_util::arrow_util;
use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline};
use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor, SerializedComponentBatch};

-use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
+use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

// ---

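`batcher.rs` and `builder.rs` get the same mechanical treatment: `arrow_util` is now imported from the new crate instead of from `crate::`, and call sites compile unchanged. A hypothetical call site, as a sketch (the `concat_arrays` name comes from the clippy.toml guidance above; its exact signature is an assumption):

```rust
use arrow::array::ArrayRef;

use re_arrow_util::arrow_util; // was: use crate::arrow_util;

// Hypothetical call site: only the import changes, the call stays the same.
fn merge_columns(columns: &[ArrayRef]) -> anyhow::Result<ArrayRef> {
    Ok(arrow_util::concat_arrays(columns)?)
}
```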
34 changes: 13 additions & 21 deletions crates/store/re_chunk/src/chunk.rs
@@ -15,6 +15,7 @@ use arrow2::{
use itertools::{izip, Itertools};
use nohash_hasher::IntMap;

+use re_arrow_util::ArrowArrayDowncastRef as _;
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, Time, TimeInt, TimePoint, Timeline};
use re_types_core::{
@@ -445,8 +446,7 @@ impl Chunk {
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
-    .as_any()
-    .downcast_ref::<ArrowStructArray>()
+    .downcast_array_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
@@ -501,8 +501,7 @@ impl Chunk {
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
-    .as_any()
-    .downcast_ref::<ArrowStructArray>()
+    .downcast_array_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
@@ -876,8 +875,7 @@ impl Chunk {
.map_err(|err| ChunkError::Malformed {
reason: format!("RowIds failed to serialize: {err}"),
})?
-    .as_any()
-    .downcast_ref::<ArrowStructArray>()
+    .downcast_array_ref::<ArrowStructArray>()
// NOTE: impossible, but better safe than sorry.
.ok_or_else(|| ChunkError::Malformed {
reason: "RowIds failed to downcast".to_owned(),
@@ -1131,21 +1129,18 @@ impl TimeColumn {
}

// Sequence timelines are i64, but time columns are nanoseconds (also as i64).
-if let Some(times) = array.as_any().downcast_ref::<arrow::array::Int64Array>() {
+if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
Ok(times.values().clone())
-} else if let Some(times) = array
-    .as_any()
-    .downcast_ref::<arrow::array::TimestampNanosecondArray>()
+} else if let Some(times) =
+    array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
{
Ok(times.values().clone())
-} else if let Some(times) = array
-    .as_any()
-    .downcast_ref::<arrow::array::Time64NanosecondArray>()
+} else if let Some(times) =
+    array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
{
Ok(times.values().clone())
-} else if let Some(times) = array
-    .as_any()
-    .downcast_ref::<arrow::array::DurationNanosecondArray>()
+} else if let Some(times) =
+    array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
{
Ok(times.values().clone())
} else {
@@ -1224,13 +1219,10 @@ impl Chunk {
};

#[allow(clippy::unwrap_used)]
-let times = times.as_any().downcast_ref::<ArrowUInt64Array>().unwrap(); // sanity checked
+let times = times.downcast_array_ref::<ArrowUInt64Array>().unwrap(); // sanity checked

#[allow(clippy::unwrap_used)]
-let counters = counters
-    .as_any()
-    .downcast_ref::<ArrowUInt64Array>()
-    .unwrap(); // sanity checked
+let counters = counters.downcast_array_ref::<ArrowUInt64Array>().unwrap(); // sanity checked

(times, counters)
}
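The repeated edit throughout this file is what `ArrowArrayDowncastRef` exists for: it collapses the two-step `as_any().downcast_ref::<T>()` dance into a single `downcast_array_ref::<T>()` call. A sketch of the assumed shape of that extension trait (not the crate's actual source):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array};

/// Assumed shape of the `ArrowArrayDowncastRef` extension trait.
trait ArrowArrayDowncastRef {
    fn downcast_array_ref<T: Array + 'static>(&self) -> Option<&T>;
}

impl ArrowArrayDowncastRef for dyn Array {
    fn downcast_array_ref<T: Array + 'static>(&self) -> Option<&T> {
        // The same `as_any` dance as before, now written exactly once.
        self.as_any().downcast_ref::<T>()
    }
}

fn main() {
    let array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    // Reads like the call sites in the diff above:
    let ints = array.downcast_array_ref::<Int64Array>().unwrap();
    assert_eq!(ints.value(2), 3);
}
```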
40 changes: 40 additions & 0 deletions crates/store/re_chunk/src/concat_record_batches.rs
@@ -0,0 +1,40 @@
use crate::TransportChunk;

use arrow::datatypes::Schema as ArrowSchema;
use arrow2::chunk::Chunk as Arrow2Chunk;

/// Concatenate multiple [`TransportChunk`]s into one.
///
/// This is a temporary function that we use while waiting to migrate towards `arrow-rs`.
/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
/// * `arrow-rs` does have one, and it natively supports concatenation.
pub fn concatenate_record_batches(
// Review comment from the PR author on this function: "Old code; moved"
schema: impl Into<ArrowSchema>,
batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
let schema: ArrowSchema = schema.into();
anyhow::ensure!(
batches
.iter()
.all(|batch| batch.schema_ref().as_ref() == &schema),
"concatenate_record_batches: all batches must have the same schema"
);

let mut output_columns = Vec::new();

if !batches.is_empty() {
for (i, _field) in schema.fields.iter().enumerate() {
let arrays: Option<Vec<_>> = batches.iter().map(|batch| batch.column(i)).collect();
let arrays = arrays.ok_or_else(|| {
anyhow::anyhow!("concatenate_record_batches: all batches must have the same schema")
})?;
let array = re_arrow_util::arrow2_util::concat_arrays(&arrays)?;
output_columns.push(array);
}
}

Ok(TransportChunk::new(
schema,
Arrow2Chunk::new(output_columns),
))
}
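For reference, the native arrow-rs counterpart the doc comment alludes to already exists: once record batches are `arrow::record_batch::RecordBatch`, the whole function reduces to `arrow::compute::concat_batches`. A sketch of the post-migration shape (not part of this PR):

```rust
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Post-migration sketch: arrow-rs checks that all batches share the schema
// and concatenates column-by-column for us.
fn concatenate_record_batches(
    schema: &SchemaRef,
    batches: &[RecordBatch],
) -> Result<RecordBatch, ArrowError> {
    concat_batches(schema, batches)
}
```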