Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with cache invalidation related to parallel queries. #6292

Merged
merged 5 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_query/src/cache_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Caches {
(
key.clone(),
(
cache.time_range(),
cache.pending_time_range(),
CachedComponentStats {
total_indices: cache.indices.len() as _,
total_instances: cache.num_instances(),
Expand Down
16 changes: 15 additions & 1 deletion crates/re_query/src/flat_vec_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ pub trait ErasedFlatVecDeque: std::any::Any {

fn into_any(self: Box<Self>) -> Box<dyn std::any::Any>;

/// Dynamically dispatches to [`FlatVecDeque::clone`].
///
/// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to
/// avoid even with explicit syntax and that silently lead to infinite recursions.
fn dyn_clone(&self) -> Box<dyn ErasedFlatVecDeque + Send + Sync>;

/// Dynamically dispatches to [`FlatVecDeque::num_entries`].
///
/// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to
Expand Down Expand Up @@ -53,7 +59,10 @@ pub trait ErasedFlatVecDeque: std::any::Any {
fn dyn_total_size_bytes(&self) -> u64;
}

impl<T: SizeBytes + 'static> ErasedFlatVecDeque for FlatVecDeque<T> {
impl<T: SizeBytes + 'static> ErasedFlatVecDeque for FlatVecDeque<T>
where
T: Send + Sync + Clone,
{
#[inline]
fn as_any(&self) -> &dyn std::any::Any {
self
Expand All @@ -64,6 +73,11 @@ impl<T: SizeBytes + 'static> ErasedFlatVecDeque for FlatVecDeque<T> {
self
}

#[inline]
fn dyn_clone(&self) -> Box<dyn ErasedFlatVecDeque + Send + Sync> {
Box::new((*self).clone())
}

#[inline]
fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
self
Expand Down
88 changes: 17 additions & 71 deletions crates/re_query/src/range/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Caches {
// and coarsely invalidates the whole cache in that case, to avoid the kind of bugs
// showcased in <https://github.com/rerun-io/rerun/issues/5686>.
{
let time_range = cache.per_data_time.read_recursive().time_range();
let time_range = cache.per_data_time.read_recursive().pending_time_range();
if let Some(time_range) = time_range {
{
let hole_start = time_range.max();
Expand Down Expand Up @@ -290,7 +290,12 @@ impl RangeCache {
return;
};

per_data_time.write().truncate_at_time(pending_invalidation);
// Invalidating data is tricky. Our results object may have been cloned and shared already.
// We can't just invalidate the data in-place without guaranteeing the post-invalidation query
// will return the same results as the pending pre-invalidation queries.
let mut new_inner = (*per_data_time.read()).clone();
new_inner.truncate_at_time(pending_invalidation);
per_data_time.inner = Arc::new(RwLock::new(new_inner));
}
}

Expand Down Expand Up @@ -326,14 +331,6 @@ impl RangeComponentResultsInner {
pub fn compute_front_query(&self, query: &RangeQuery) -> Option<RangeQuery> {
let mut reduced_query = query.clone();

// If nothing has been cached already, then we just want to query everything.
if self.indices.is_empty()
&& self.promises_front.is_empty()
&& self.promises_back.is_empty()
{
return Some(reduced_query);
}

// If the cache contains static data, then there's no point in querying anything else since
// static data overrides everything anyway.
if self
Expand All @@ -354,29 +351,14 @@ impl RangeComponentResultsInner {
// We check the back promises too just because I'm feeling overly cautious.
// See `Concurrency edge-case` section below.

let pending_front_min = self
.promises_front
.first()
.map_or(TimeInt::MAX.as_i64(), |((t, _), _)| {
t.as_i64().saturating_sub(1)
});
let pending_back_min = self
.promises_back
.first()
.map_or(TimeInt::MAX.as_i64(), |((t, _), _)| {
t.as_i64().saturating_sub(1)
});
let pending_min = i64::min(pending_front_min, pending_back_min);

if let Some(time_range) = self.time_range() {
let time_range_min = i64::min(time_range.min().as_i64().saturating_sub(1), pending_min);
if let Some(time_range) = self.pending_time_range() {
let time_range_min = time_range.min().as_i64().saturating_sub(1);
reduced_query
.range
.set_max(i64::min(reduced_query.range.max().as_i64(), time_range_min));
} else {
reduced_query
.range
.set_max(i64::min(reduced_query.range.max().as_i64(), pending_min));
// If nothing has been cached already, then we just want to query everything.
return Some(reduced_query);
}

if reduced_query.range.max() < reduced_query.range.min() {
Expand All @@ -396,15 +378,6 @@ impl RangeComponentResultsInner {
) -> Option<RangeQuery> {
let mut reduced_query = query.clone();

// If nothing has been cached already, then the front query is already going to take care
// of everything.
if self.indices.is_empty()
&& self.promises_front.is_empty()
&& self.promises_back.is_empty()
{
return None;
}

// If the cache contains static data, then there's no point in querying anything else since
// static data overrides everything anyway.
if self
Expand All @@ -415,49 +388,22 @@ impl RangeComponentResultsInner {
return None;
}

// Otherwise, query for what's missing on the back-side of the cache, while making sure to
// Otherwise, query for what's missing on the back-side of the cache, while making sure to
// take pending promises into account!
//
// Keep in mind: it is not possible for the cache to contain only part of a given
// timestamp. All entries for a given timestamp are loaded and invalidated atomically,
// whether it's promises or already resolved entries.

// # Concurrency edge-case
//
// We need to make sure to check for both front _and_ back promises here.
// If two or more threads are querying for the same entity path concurrently, it is possible
// that the first thread queues a bunch of promises to the front queue, but then relinquishes
// control to the second thread before ever resolving those promises.
//
// If that happens, the second thread would end up queuing the same exact promises in the
// back queue, yielding duplicated data.
// In most cases, duplicated data isn't noticeable (except for a performance drop), but if
// the duplication is only partial this can sometimes lead to visual glitches, depending on
// the visualizer used.

let pending_back_max = self
.promises_back
.last()
.map_or(TimeInt::MIN.as_i64(), |((t, _), _)| {
t.as_i64().saturating_add(1)
});
let pending_front_max = self
.promises_front
.last()
.map_or(TimeInt::MIN.as_i64(), |((t, _), _)| {
t.as_i64().saturating_add(1)
});
let pending_max = i64::max(pending_back_max, pending_front_max);

if let Some(time_range) = self.time_range() {
let time_range_max = i64::max(time_range.max().as_i64().saturating_add(1), pending_max);
if let Some(time_range) = self.pending_time_range() {
let time_range_max = time_range.max().as_i64().saturating_add(1);
reduced_query
.range
.set_min(i64::max(reduced_query.range.min().as_i64(), time_range_max));
} else {
reduced_query
.range
.set_min(i64::max(reduced_query.range.min().as_i64(), pending_max));
// If nothing has been cached already, then the front query is already going to take care
// of everything.
return None;
}

// Back query should never overlap with the front query.
Expand Down
41 changes: 32 additions & 9 deletions crates/re_query/src/range/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,20 @@ pub struct RangeComponentResultsInner {
pub(crate) cached_dense: Option<Box<dyn ErasedFlatVecDeque + Send + Sync>>,
}

impl Clone for RangeComponentResultsInner {
    /// Deep-clones the cached results.
    ///
    /// Hand-written because `cached_dense` holds a type-erased deque
    /// (`Box<dyn ErasedFlatVecDeque + Send + Sync>`), which cannot `#[derive(Clone)]`;
    /// it is cloned through the dynamically-dispatched [`ErasedFlatVecDeque::dyn_clone`].
    #[inline]
    fn clone(&self) -> Self {
        // Clone the type-erased payload first, via dynamic dispatch.
        let cached_dense = self
            .cached_dense
            .as_ref()
            .map(|erased| erased.dyn_clone());

        Self {
            cached_dense,
            indices: self.indices.clone(),
            promises_front: self.promises_front.clone(),
            promises_back: self.promises_back.clone(),
            front_status: self.front_status.clone(),
            back_status: self.back_status.clone(),
        }
    }
}

impl SizeBytes for RangeComponentResultsInner {
#[inline]
fn heap_size_bytes(&self) -> u64 {
Expand Down Expand Up @@ -747,9 +761,10 @@ impl RangeComponentResultsInner {
}),
"back promises must always be sorted in ascending index order"
);
if let (Some(p_index), Some(i_index)) =
(promises_back.last().map(|(index, _)| index), indices.back())
{
if let (Some(p_index), Some(i_index)) = (
promises_back.first().map(|(index, _)| index),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this didn't catch any new sanity issues, but I think this was just a typo unless I'm misunderstanding the intent.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, nice catch

indices.back(),
) {
assert!(
i_index < p_index,
"the leftmost back promise must have an index larger than the rightmost data index ({i_index:?} < {p_index:?})",
Expand All @@ -761,14 +776,22 @@ impl RangeComponentResultsInner {
}
}

/// Returns the time range covered by the cached data.
/// Returns the pending time range that will be covered by the cached data.
///
/// Reminder: [`TimeInt::STATIC`] is never included in [`ResolvedTimeRange`]s.
#[inline]
pub fn time_range(&self) -> Option<ResolvedTimeRange> {
let first_time = self.indices.front().map(|(t, _)| *t)?;
let last_time = self.indices.back().map(|(t, _)| *t)?;
Some(ResolvedTimeRange::new(first_time, last_time))
pub fn pending_time_range(&self) -> Option<ResolvedTimeRange> {
let pending_front_min = self.promises_front.first().map(|((t, _), _)| *t);
let pending_front_max = self.promises_front.last().map(|((t, _), _)| *t);
let pending_back_max = self.promises_back.last().map(|((t, _), _)| *t);

let first_time = self.indices.front().map(|(t, _)| *t);
let last_time = self.indices.back().map(|(t, _)| *t);

Some(ResolvedTimeRange::new(
pending_front_min.or(first_time)?,
pending_back_max.or(last_time).or(pending_front_max)?,
))
}

#[inline]
Expand All @@ -786,7 +809,7 @@ impl RangeComponentResultsInner {
pub fn truncate_at_time(&mut self, threshold: TimeInt) {
re_tracing::profile_function!();

let time_range = self.time_range();
let time_range = self.pending_time_range();

let Self {
indices,
Expand Down
Loading
Loading