From f17f3dd6e14884498b368bbca0746d2aa7e425d4 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 10 May 2024 15:23:44 -0400 Subject: [PATCH 1/5] Crudely handle invalidation --- crates/re_query/src/flat_vec_deque.rs | 16 +++++++++++++++- crates/re_query/src/range/query.rs | 7 ++++++- crates/re_query/src/range/results.rs | 14 ++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/re_query/src/flat_vec_deque.rs b/crates/re_query/src/flat_vec_deque.rs index 1a89f41628e9..6a608e133a85 100644 --- a/crates/re_query/src/flat_vec_deque.rs +++ b/crates/re_query/src/flat_vec_deque.rs @@ -16,6 +16,12 @@ pub trait ErasedFlatVecDeque: std::any::Any { fn into_any(self: Box) -> Box; + /// Dynamically dispatches to [`FlatVecDeque::clone`]. + /// + /// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to + /// avoid even with explicit syntax and that silently lead to infinite recursions. + fn dyn_clone(&self) -> Box; + /// Dynamically dispatches to [`FlatVecDeque::num_entries`]. /// /// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to @@ -53,7 +59,10 @@ pub trait ErasedFlatVecDeque: std::any::Any { fn dyn_total_size_bytes(&self) -> u64; } -impl ErasedFlatVecDeque for FlatVecDeque { +impl ErasedFlatVecDeque for FlatVecDeque +where + T: Send + Sync + Clone, +{ #[inline] fn as_any(&self) -> &dyn std::any::Any { self @@ -64,6 +73,11 @@ impl ErasedFlatVecDeque for FlatVecDeque { self } + #[inline] + fn dyn_clone(&self) -> Box { + Box::new((*self).clone()) + } + #[inline] fn into_any(self: Box) -> Box { self diff --git a/crates/re_query/src/range/query.rs b/crates/re_query/src/range/query.rs index 3fe0d3fe7982..4a3b05ff992d 100644 --- a/crates/re_query/src/range/query.rs +++ b/crates/re_query/src/range/query.rs @@ -290,7 +290,12 @@ impl RangeCache { return; }; - per_data_time.write().truncate_at_time(pending_invalidation); + // Invalidating data is tricky. 
Our results object may have been cloned and shared already. + // We can't just invalidate the data in-place without guaranteeing the post-invalidation query + // will return the same results as the pending pre-invalidation queries. + let mut new_inner = (*per_data_time.read()).clone(); + new_inner.truncate_at_time(pending_invalidation); + per_data_time.inner = Arc::new(RwLock::new(new_inner)); } } diff --git a/crates/re_query/src/range/results.rs b/crates/re_query/src/range/results.rs index 6c7602cc293c..4407f0f1a34a 100644 --- a/crates/re_query/src/range/results.rs +++ b/crates/re_query/src/range/results.rs @@ -642,6 +642,20 @@ pub struct RangeComponentResultsInner { pub(crate) cached_dense: Option>, } +impl Clone for RangeComponentResultsInner { + #[inline] + fn clone(&self) -> Self { + Self { + indices: self.indices.clone(), + promises_front: self.promises_front.clone(), + promises_back: self.promises_back.clone(), + front_status: self.front_status.clone(), + back_status: self.back_status.clone(), + cached_dense: self.cached_dense.as_ref().map(|dense| dense.dyn_clone()), + } + } +} + impl SizeBytes for RangeComponentResultsInner { #[inline] fn heap_size_bytes(&self) -> u64 { From 55beed945f9d39e981fd0deaba99f9a394ed1d89 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 10 May 2024 19:38:22 -0400 Subject: [PATCH 2/5] Fix incorrect sanity check --- crates/re_query/src/range/results.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/re_query/src/range/results.rs b/crates/re_query/src/range/results.rs index 4407f0f1a34a..069dacd0b005 100644 --- a/crates/re_query/src/range/results.rs +++ b/crates/re_query/src/range/results.rs @@ -761,9 +761,10 @@ impl RangeComponentResultsInner { }), "back promises must always be sorted in ascending index order" ); - if let (Some(p_index), Some(i_index)) = - (promises_back.last().map(|(index, _)| index), indices.back()) - { + if let (Some(p_index), Some(i_index)) = ( + 
promises_back.first().map(|(index, _)| index), + indices.back(), + ) { assert!( i_index < p_index, "the leftmost back promise must have an index larger than the rightmost data index ({i_index:?} < {p_index:?})", From 3d78cfa407e4719ad7a3aeb069bff3800163e576 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Mon, 13 May 2024 09:49:38 -0400 Subject: [PATCH 3/5] Fix the time_range to include pending data --- crates/re_query/src/cache_stats.rs | 2 +- crates/re_query/src/range/query.rs | 6 +++--- crates/re_query/src/range/results.rs | 20 ++++++++++++++------ 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/crates/re_query/src/cache_stats.rs b/crates/re_query/src/cache_stats.rs index 19494f5a8de5..2277213cecb1 100644 --- a/crates/re_query/src/cache_stats.rs +++ b/crates/re_query/src/cache_stats.rs @@ -83,7 +83,7 @@ impl Caches { ( key.clone(), ( - cache.time_range(), + cache.pending_time_range(), CachedComponentStats { total_indices: cache.indices.len() as _, total_instances: cache.num_instances(), diff --git a/crates/re_query/src/range/query.rs b/crates/re_query/src/range/query.rs index 4a3b05ff992d..2eb0aec40d48 100644 --- a/crates/re_query/src/range/query.rs +++ b/crates/re_query/src/range/query.rs @@ -63,7 +63,7 @@ impl Caches { // and coarsly invalidates the whole cache in that case, to avoid the kind of bugs // showcased in . 
{ - let time_range = cache.per_data_time.read_recursive().time_range(); + let time_range = cache.per_data_time.read_recursive().pending_time_range(); if let Some(time_range) = time_range { { let hole_start = time_range.max(); @@ -373,7 +373,7 @@ impl RangeComponentResultsInner { }); let pending_min = i64::min(pending_front_min, pending_back_min); - if let Some(time_range) = self.time_range() { + if let Some(time_range) = self.pending_time_range() { let time_range_min = i64::min(time_range.min().as_i64().saturating_sub(1), pending_min); reduced_query .range @@ -454,7 +454,7 @@ impl RangeComponentResultsInner { }); let pending_max = i64::max(pending_back_max, pending_front_max); - if let Some(time_range) = self.time_range() { + if let Some(time_range) = self.pending_time_range() { let time_range_max = i64::max(time_range.max().as_i64().saturating_add(1), pending_max); reduced_query .range diff --git a/crates/re_query/src/range/results.rs b/crates/re_query/src/range/results.rs index 069dacd0b005..403012f05c7a 100644 --- a/crates/re_query/src/range/results.rs +++ b/crates/re_query/src/range/results.rs @@ -776,14 +776,22 @@ impl RangeComponentResultsInner { } } - /// Returns the time range covered by the cached data. + /// Returns the pending time range that will be covered by the cached data. /// /// Reminder: [`TimeInt::STATIC`] is never included in [`ResolvedTimeRange`]s. 
#[inline] - pub fn time_range(&self) -> Option { - let first_time = self.indices.front().map(|(t, _)| *t)?; - let last_time = self.indices.back().map(|(t, _)| *t)?; - Some(ResolvedTimeRange::new(first_time, last_time)) + pub fn pending_time_range(&self) -> Option { + let pending_front_min = self.promises_front.first().map(|((t, _), _)| *t); + let pending_front_max = self.promises_front.last().map(|((t, _), _)| *t); + let pending_back_max = self.promises_back.last().map(|((t, _), _)| *t); + + let first_time = self.indices.front().map(|(t, _)| *t); + let last_time = self.indices.back().map(|(t, _)| *t); + + Some(ResolvedTimeRange::new( + pending_front_min.or(first_time)?, + pending_back_max.or(last_time).or(pending_front_max)?, + )) } #[inline] @@ -801,7 +809,7 @@ impl RangeComponentResultsInner { pub fn truncate_at_time(&mut self, threshold: TimeInt) { re_tracing::profile_function!(); - let time_range = self.time_range(); + let time_range = self.pending_time_range(); let Self { indices, From a8d374f2c9759ea213e2685cef815dc7601bbe08 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Mon, 13 May 2024 10:14:09 -0400 Subject: [PATCH 4/5] Simplify the front/back queries to use the new pending_time_range --- crates/re_query/src/range/query.rs | 75 ++++-------------------------- 1 file changed, 8 insertions(+), 67 deletions(-) diff --git a/crates/re_query/src/range/query.rs b/crates/re_query/src/range/query.rs index 2eb0aec40d48..e1756ba30d48 100644 --- a/crates/re_query/src/range/query.rs +++ b/crates/re_query/src/range/query.rs @@ -331,14 +331,6 @@ impl RangeComponentResultsInner { pub fn compute_front_query(&self, query: &RangeQuery) -> Option { let mut reduced_query = query.clone(); - // If nothing has been cached already, then we just want to query everything. 
- if self.indices.is_empty() - && self.promises_front.is_empty() - && self.promises_back.is_empty() - { - return Some(reduced_query); - } - // If the cache contains static data, then there's no point in querying anything else since // static data overrides everything anyway. if self @@ -359,29 +351,14 @@ impl RangeComponentResultsInner { // We check the back promises too just because I'm feeling overly cautious. // See `Concurrency edge-case` section below. - let pending_front_min = self - .promises_front - .first() - .map_or(TimeInt::MAX.as_i64(), |((t, _), _)| { - t.as_i64().saturating_sub(1) - }); - let pending_back_min = self - .promises_back - .first() - .map_or(TimeInt::MAX.as_i64(), |((t, _), _)| { - t.as_i64().saturating_sub(1) - }); - let pending_min = i64::min(pending_front_min, pending_back_min); - if let Some(time_range) = self.pending_time_range() { - let time_range_min = i64::min(time_range.min().as_i64().saturating_sub(1), pending_min); + let time_range_min = time_range.min().as_i64().saturating_sub(1); reduced_query .range .set_max(i64::min(reduced_query.range.max().as_i64(), time_range_min)); } else { - reduced_query - .range - .set_max(i64::min(reduced_query.range.max().as_i64(), pending_min)); + // If nothing has been cached already, then we just want to query everything. + return Some(reduced_query); } if reduced_query.range.max() < reduced_query.range.min() { @@ -401,15 +378,6 @@ impl RangeComponentResultsInner { ) -> Option { let mut reduced_query = query.clone(); - // If nothing has been cached already, then the front query is already going to take care - // of everything. - if self.indices.is_empty() - && self.promises_front.is_empty() - && self.promises_back.is_empty() - { - return None; - } - // If the cache contains static data, then there's no point in querying anything else since // static data overrides everything anyway. 
if self @@ -420,49 +388,22 @@ impl RangeComponentResultsInner { return None; } - // Otherwise, query for what's missing on the back-side of the cache, while making sure to + // Otherwise, query for what's missing on the back-side of the cache, while making sure to // take pending promises into account! // // Keep in mind: it is not possible for the cache to contain only part of a given // timestamp. All entries for a given timestamp are loaded and invalidated atomically, // whether it's promises or already resolved entries. - // # Concurrency edge-case - // - // We need to make sure to check for both front _and_ back promises here. - // If two or more threads are querying for the same entity path concurrently, it is possible - // that the first thread queues a bunch of promises to the front queue, but then relinquishes - // control to the second thread before ever resolving those promises. - // - // If that happens, the second thread would end up queuing the same exact promises in the - // back queue, yielding duplicated data. - // In most cases, duplicated data isn't noticeable (except for a performance drop), but if - // the duplication is only partial this can sometimes lead to visual glitches, depending on - // the visualizer used. 
- - let pending_back_max = self - .promises_back - .last() - .map_or(TimeInt::MIN.as_i64(), |((t, _), _)| { - t.as_i64().saturating_add(1) - }); - let pending_front_max = self - .promises_front - .last() - .map_or(TimeInt::MIN.as_i64(), |((t, _), _)| { - t.as_i64().saturating_add(1) - }); - let pending_max = i64::max(pending_back_max, pending_front_max); - if let Some(time_range) = self.pending_time_range() { - let time_range_max = i64::max(time_range.max().as_i64().saturating_add(1), pending_max); + let time_range_max = time_range.max().as_i64().saturating_add(1); reduced_query .range .set_min(i64::max(reduced_query.range.min().as_i64(), time_range_max)); } else { - reduced_query - .range - .set_min(i64::max(reduced_query.range.min().as_i64(), pending_max)); + // If nothing has been cached already, then the front query is already going to take care + // of everything. + return None; } // Back query should never overlap with the front query. From 341cb64b5ba3c24f82fe80e0901ddda01a411923 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Mon, 13 May 2024 16:17:20 +0200 Subject: [PATCH 5/5] add unit test demonstrating the issue (fails on main!) 
--- crates/re_query/tests/range.rs | 161 +++++++++++++++++++++++++++------ 1 file changed, 134 insertions(+), 27 deletions(-) diff --git a/crates/re_query/tests/range.rs b/crates/re_query/tests/range.rs index bd2e73706ed1..84c9f3a3b317 100644 --- a/crates/re_query/tests/range.rs +++ b/crates/re_query/tests/range.rs @@ -4,7 +4,7 @@ use re_data_store::{DataStore, RangeQuery, ResolvedTimeRange, StoreSubscriber as use re_log_types::{ build_frame_nr, example_components::{MyColor, MyPoint, MyPoints}, - DataRow, EntityPath, RowId, TimePoint, Timeline, + DataReadError, DataRow, EntityPath, RowId, TimePoint, Timeline, }; use re_query::{Caches, PromiseResolver, PromiseResult}; use re_types::Archetype; @@ -872,34 +872,26 @@ fn concurrent_multitenant_edge_case() -> anyhow::Result<()> { let entity_path: EntityPath = "point".into(); - let timepoint1 = [build_frame_nr(123)]; - let points1 = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)]; - let row1 = DataRow::from_cells1_sized( - RowId::new(), - entity_path.clone(), - timepoint1, - points1.clone(), - )?; - insert_and_react(&mut store, &mut caches, &row1); + let add_points = |time: i64, point_value: f32| { + let timepoint = [build_frame_nr(time)]; + let points = vec![ + MyPoint::new(point_value, point_value + 1.0), + MyPoint::new(point_value + 2.0, point_value + 3.0), + ]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path.clone(), + timepoint, + points.clone(), + )?; + Ok::<_, DataReadError>((timepoint, points, row)) + }; - let timepoint2 = [build_frame_nr(223)]; - let points2 = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)]; - let row2 = DataRow::from_cells1_sized( - RowId::new(), - entity_path.clone(), - timepoint2, - points2.clone(), - )?; + let (timepoint1, points1, row1) = add_points(123, 1.0)?; + insert_and_react(&mut store, &mut caches, &row1); + let (_timepoint2, points2, row2) = add_points(223, 2.0)?; insert_and_react(&mut store, &mut caches, &row2); - - let timepoint3 = 
[build_frame_nr(323)]; - let points3 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)]; - let row3 = DataRow::from_cells1_sized( - RowId::new(), - entity_path.clone(), - timepoint3, - points3.clone(), - )?; + let (_timepoint3, points3, row3) = add_points(323, 3.0)?; insert_and_react(&mut store, &mut caches, &row3); // --- Tenant #1 queries the data, but doesn't cache the result in the deserialization cache --- @@ -942,6 +934,121 @@ fn concurrent_multitenant_edge_case() -> anyhow::Result<()> { Ok(()) } +// See . +#[test] +fn concurrent_multitenant_edge_case2() -> anyhow::Result<()> { + let mut store = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + Default::default(), + ); + let mut caches = Caches::new(&store); + + let entity_path: EntityPath = "point".into(); + + let add_points = |time: i64, point_value: f32| { + let timepoint = [build_frame_nr(time)]; + let points = vec![ + MyPoint::new(point_value, point_value + 1.0), + MyPoint::new(point_value + 2.0, point_value + 3.0), + ]; + let row = DataRow::from_cells1_sized( + RowId::new(), + entity_path.clone(), + timepoint, + points.clone(), + )?; + Ok::<_, DataReadError>((timepoint, points, row)) + }; + + let (timepoint1, points1, row1) = add_points(123, 1.0)?; + insert_and_react(&mut store, &mut caches, &row1); + let (_timepoint2, points2, row2) = add_points(223, 2.0)?; + insert_and_react(&mut store, &mut caches, &row2); + let (_timepoint3, points3, row3) = add_points(323, 3.0)?; + insert_and_react(&mut store, &mut caches, &row3); + let (_timepoint4, points4, row4) = add_points(423, 4.0)?; + insert_and_react(&mut store, &mut caches, &row4); + let (_timepoint5, points5, row5) = add_points(523, 5.0)?; + insert_and_react(&mut store, &mut caches, &row5); + + // --- Tenant #1 queries the data at (123, 223), but doesn't cache the result in the deserialization cache --- + + let query1 = re_data_store::RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::new(123, 223)); + { + 
let cached = caches.range( + &store, + &query1, + &entity_path, + MyPoints::all_components().iter().copied(), + ); + + let _cached_all_points = cached.get_required(MyPoint::name()).unwrap(); + } + + // --- Tenant #2 queries the data at (423, 523), but doesn't cache the result in the deserialization cache --- + + let query2 = re_data_store::RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::new(423, 523)); + { + let cached = caches.range( + &store, + &query2, + &entity_path, + MyPoints::all_components().iter().copied(), + ); + + let _cached_all_points = cached.get_required(MyPoint::name()).unwrap(); + } + + // --- Tenant #3 queries the data at (223, 423) and deserializes it --- + + let query3 = re_data_store::RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::new(223, 423)); + let expected_points = &[ + ( + (TimeInt::new_temporal(223), row2.row_id()), + points2.as_slice(), + ), // + ( + (TimeInt::new_temporal(323), row3.row_id()), + points3.as_slice(), + ), // + ( + (TimeInt::new_temporal(423), row4.row_id()), + points4.as_slice(), + ), // + ]; + query_and_compare(&caches, &store, &query3, &entity_path, expected_points, &[]); + + // --- Tenant #1 finally deserializes its data --- + + let expected_points = &[ + ( + (TimeInt::new_temporal(123), row1.row_id()), + points1.as_slice(), + ), // + ( + (TimeInt::new_temporal(223), row2.row_id()), + points2.as_slice(), + ), // + ]; + query_and_compare(&caches, &store, &query1, &entity_path, expected_points, &[]); + + // --- Tenant #2 finally deserializes its data --- + + let expected_points = &[ + ( + (TimeInt::new_temporal(423), row4.row_id()), + points4.as_slice(), + ), // + ( + (TimeInt::new_temporal(523), row5.row_id()), + points5.as_slice(), + ), // + ]; + query_and_compare(&caches, &store, &query2, &entity_path, expected_points, &[]); + + Ok(()) +} + // --- fn insert_and_react(store: &mut DataStore, caches: &mut Caches, row: &DataRow) {