fix: incomplete multi-part checkpoint handling when no hint is provided #641
Conversation
Codecov Report

Additional details and impacted files:

@@ Coverage Diff @@
##             main     #641      +/-   ##
==========================================
+ Coverage   83.66%   84.01%   +0.35%
==========================================
  Files          75       75
  Lines       16949    17073     +124
==========================================
+ Hits        14180    14344     +164
+ Misses       2085     2046      -39
+ Partials      684      683       -1
kernel/src/log_segment.rs
Outdated
.filter_map(|res| match res {
    Ok(path) => Some(path),
    Err(e) => {
        warn!("Error processing path: {:?}", e);
skipping ParsedLogPath::try_from() errors here as we were already filtering them out
Hmm, I think it's dangerous to skip the errors here. It's best to try to return the error somehow. Does it work if you do something like `chunk_by(|path| path.map(|x| x.version))`? The proposed approach chunks by `DeltaResult<Version>` instead of by `Version`, in the hope of returning the `Err` if we encounter it.
The `chunk_by` function requires that the keys it uses for grouping implement the `PartialEq` trait so it can compare them, but the `Error` in the `DeltaResult` does not implement that trait, so it doesn't work...
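For intuition, here is a minimal stdlib-only sketch (illustrative, not kernel or itertools code; `chunk_counts` and the `String` error type are made up for the example) of why a `chunk_by`-style grouping needs `PartialEq` keys: consecutive items are compared by key, so a `Result` key only compiles when its error type is also comparable.

```rust
// Illustrative helper, not itertools' chunk_by: counts runs of consecutive
// items sharing a key, so the key type K must implement PartialEq.
fn chunk_counts<T, K: PartialEq>(items: Vec<T>, key: impl Fn(&T) -> K) -> Vec<(K, usize)> {
    let mut out: Vec<(K, usize)> = Vec::new();
    for item in items {
        let k = key(&item);
        match out.last_mut() {
            // Compare the new key against the previous chunk's key.
            Some((last, count)) if *last == k => *count += 1,
            _ => out.push((k, 1)),
        }
    }
    out
}

fn main() {
    // A Result key compiles here only because String implements PartialEq;
    // delta_kernel's Error does not, so DeltaResult<Version> can't be a key.
    let paths: Vec<Result<u64, String>> = vec![Ok(1), Ok(1), Err("bad".into()), Ok(2)];
    let chunks = chunk_counts(paths, |r| r.clone());
    assert_eq!(chunks.len(), 3); // runs: [Ok(1), Ok(1)], [Err], [Ok(2)]
}
```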
We could use `try_collect` to handle errors before grouping.
let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;
let log_files: Vec<ParsedLogPath> = log_files.try_collect()?;
for (version, files) in &log_files.into_iter().chunk_by(|path| path.version)
This was really tricky and I got nerdsniped thinking about the iterator stuff, lol. But look into `process_results`.
I think I might've adapted your code right, but double check:
let mut checkpoint_parts = vec![];
let mut max_checkpoint_version = start_version;
let mut commit_files = Vec::with_capacity(10);
process_results(log_files, |iter| {
let log_files = iter.chunk_by(move |x| x.version);
for (version, files) in &log_files {
let mut new_checkpoint_parts = vec![];
for file in files {
if file.is_commit() {
commit_files.push(file);
} else if file.is_checkpoint() {
new_checkpoint_parts.push(file);
}
}
if validate_checkpoint_parts(version, &new_checkpoint_parts) {
max_checkpoint_version = Some(version);
checkpoint_parts = new_checkpoint_parts;
}
}
})?;
Ok((commit_files, checkpoint_parts))
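For intuition about `process_results` (from itertools): it hands a closure a plain iterator over the `Ok` values and bails with the first `Err` encountered. A rough stdlib-only stand-in (illustrative semantics only, not the itertools implementation) might look like:

```rust
// Rough stand-in for itertools::process_results: drive a closure with the
// Ok values; if an Err shows up, stop yielding and report that error.
fn process_results<T, E>(
    items: impl IntoIterator<Item = Result<T, E>>,
    f: impl FnOnce(&mut dyn Iterator<Item = T>),
) -> Result<(), E> {
    let mut first_err = None;
    let mut ok_values = items.into_iter().scan(&mut first_err, |err, item| match item {
        Ok(v) => Some(v),
        Err(e) => {
            // Record the error and end the iterator early.
            **err = Some(e);
            None
        }
    });
    f(&mut ok_values);
    drop(ok_values); // release the borrow of first_err
    match first_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

fn main() {
    let good: Vec<Result<i32, String>> = vec![Ok(1), Ok(2)];
    let mut sum = 0;
    assert!(process_results(good, |it| sum = it.sum()).is_ok());
    assert_eq!(sum, 3);

    let bad: Vec<Result<i32, String>> = vec![Ok(1), Err("boom".into()), Ok(2)];
    let mut seen = Vec::new();
    assert!(process_results(bad, |it| seen.extend(it)).is_err());
    assert_eq!(seen, vec![1]); // consumption stops at the first error
}
```

This mirrors the memory argument below: the closure sees items streaming one at a time, so nothing needs to be collected up front.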
Note: With ordered listing, checkpoint files always come before commit files. So in theory, we know whether we have a complete checkpoint by the time we encounter the commit file of a given version. Not sure if that allows simpler code or not tho.
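The ordering claim follows from plain lexical filename comparison: within one version, `.checkpoint...` sorts before `.json` because 'c' < 'j'. A quick sanity check with illustrative names:

```rust
fn main() {
    // For a single version, checkpoint part files sort before the commit file.
    let mut names = vec![
        "00000000000000000010.json",
        "00000000000000000010.checkpoint.0000000001.0000000002.parquet",
        "00000000000000000010.checkpoint.0000000002.0000000002.parquet",
    ];
    names.sort();
    // The commit (.json) file lists last within its version.
    assert_eq!(names.last(), Some(&"00000000000000000010.json"));
}
```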
I would use `try_collect` and propagate the errors.
Definitely agreed that we want to propagate errors. I wanted to avoid `try_collect` on all the log files because there could be a lot of them. `process_results` + `chunk_by` should only have one commit at a time in memory.
nice find with `process_results`, it works nicely!
clearing the commit files when encountering complete checkpoints makes sense, and thank you for the helpful context @scovich
kernel/src/log_segment.rs
Outdated
match checkpoint_parts.last().map(|file| &file.file_type) {
    Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
        if *num_parts as usize != checkpoint_parts.len() {
I think it may be worth checking that:
- all the checkpoint parts are indeed of type `MultiPartCheckpoint`
- the set of multi-part checkpoints has parts `0..n`

@zachschuermann what do you think?
nit: it's actually `1..=n`. And yes, we should check. Also have to be careful because technically there could be two incomplete checkpoints with different num_parts for the same version. Also, we MUST accept at most one checkpoint -- even if multiple complete checkpoints are available -- so this function needs to filter, not just check.
Unfortunately, the poorly-chosen naming convention for multi-part checkpoint files means they interleave:
00000000000000000010.checkpoint.0000000001.0000000003.parquet
00000000000000000010.checkpoint.0000000001.0000000004.parquet
00000000000000000010.checkpoint.0000000002.0000000003.parquet
00000000000000000010.checkpoint.0000000002.0000000004.parquet
00000000000000000010.checkpoint.0000000003.0000000003.parquet
00000000000000000010.checkpoint.0000000003.0000000004.parquet
00000000000000000010.checkpoint.0000000004.0000000004.parquet
... which makes it a lot harder to identify the files of a given checkpoint and also means we can't just return a subslice in case there were multiple checkpoints to choose from.
We'd probably need to build a hash map keyed by number of parts:

let mut checkpoints = HashMap::new();
for part_file in checkpoint_parts {
    use LogPathFileType::*;
    match &part_file.file_type {
        SinglePartCheckpoint
        | UuidCheckpoint(_)
        | MultiPartCheckpoint { part_num: 1, num_parts: 1 } =>
        {
            // All single-file checkpoints are equivalent, just keep one
            checkpoints.insert(1, vec![part_file]);
        }
        MultiPartCheckpoint { part_num: 1, num_parts } => {
            // Start a new multi-part checkpoint with at least 2 parts
            checkpoints.insert(*num_parts, vec![part_file]);
        }
        MultiPartCheckpoint { part_num, num_parts } => {
            // Continue a multi-part checkpoint with at least 2 parts
            if let Some(part_files) = checkpoints.get_mut(num_parts) {
                if *part_num == 1 + part_files.len() as u32 {
                    // Safe to append because all previous parts exist
                    part_files.push(part_file);
                }
            }
        }
        Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
    }
}
checkpoints
    .into_iter()
    .find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
    .map_or(vec![], |(_, part_files)| part_files)
this reminds me to use match statements to their full power in the future. Thx for the example Ryan!
ah yes, did not consider the multiple incomplete checkpoints. I'll introduce tests to cover some of these scenarios. And thanks a lot for the example!
kernel/src/log_segment.rs
Outdated
    }
}
// TODO: Include UuidCheckpoint once we actually support v2 checkpoints
_ => {}
I think `UuidCheckpoint` should return false, since we can't read that checkpoint. In general, beware catchall cases in match statements.
I also wonder if we should panic/error if we ever get a commit file here, since that should not be happening.
Right, good catch. I think returning an Error in the catchall case would be a good idea, as we really should not get anything other than `LogPathFileType::SinglePartCheckpoint` or `LogPathFileType::MultiPartCheckpoint` here.
I'd say handle the Uuid case separately from the catchall. Leave a comment that says this case will be supported in CheckpointV2.
+10 avoid catchall in match statements. Better to enumerate the known-invalid cases, so that when a new case shows up the compiler forces us to categorize it.
Flushing comments (possibly stale -- a newer version was pushed during review)
kernel/src/log_segment.rs
Outdated
.filter_map(|res| match res {
    Ok(path) => Some(path),
    Err(e) => {
        warn!("Error processing path: {:?}", e);
Note: We only need to keep commit files after (**) the latest complete checkpoint, so we should probably `commit_files.clear()` whenever we find a complete checkpoint.

(**) Delta spark also keeps the commit that corresponds to the checkpoint, in order to assign the file's timestamp as the snapshot timestamp. However:
- All actual timestamp-based time travel is resolved to an actual version, by a different code path called the Delta history manager, before requesting the snapshot by that version.
- The file timestamp is not always the snapshot timestamp, because clock skew could mean an earlier commit has a later timestamp. The Delta history manager includes logic to "adjust" the timestamp forward as needed in such cases, to ensure monotonicity.
- When in-commit timestamps are enabled, the file timestamp isn't even meaningful. The Delta history manager also has logic to handle that case.

Based on all of the above, the snapshot may not even need a timestamp value. Even if it does, the value should be the adjusted (or ICT) value from a history manager component (not yet existing in kernel) -- not the "raw" timestamp from the file.
kernel/src/log_segment.rs
Outdated
let mut checkpoints = HashMap::new();
for part_file in new_checkpoint_parts {
    use LogPathFileType::*;
    match &part_file.file_type {
        SinglePartCheckpoint
        | UuidCheckpoint(_)
        | MultiPartCheckpoint {
            part_num: 1,
            num_parts: 1,
        } => {
            // All single-file checkpoints are equivalent, just keep one
            checkpoints.insert(1, vec![part_file]);
        }
        MultiPartCheckpoint {
            part_num: 1,
            num_parts,
        } => {
            // Start a new multi-part checkpoint with at least 2 parts
            checkpoints.insert(*num_parts, vec![part_file]);
        }
        MultiPartCheckpoint {
            part_num,
            num_parts,
        } => {
            // Continue a multi-part checkpoint with at least 2 parts
            if let Some(part_files) = checkpoints.get_mut(num_parts) {
                if *part_num == 1 + part_files.len() as u32 {
                    // Safe to append because all previous parts exist
                    part_files.push(part_file);
                }
            }
        }
        Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
    }
}

// Find a complete checkpoint (all parts exist)
if let Some((_, complete_checkpoint)) = checkpoints
    .into_iter()
    .find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
{
    // Validate the checkpoint before updating state
    if validate_checkpoint_parts(version, &complete_checkpoint) {
        checkpoint_parts = complete_checkpoint;
        commit_files.clear(); // Clear commit files once checkpoint is found
    }
This is a lot of code to get a complete checkpoint. I wonder if we should extract it into a helper:

/// Extracts a complete checkpoint from a list of checkpoint files
///
/// Explain the case where there could be incomplete checkpoints, since that is not an obvious case.
fn get_complete_checkpoint(parts: &[ParsedLogPath]) -> Option<Vec<ParsedLogPath>> {
}

This makes the closure shorter/cleaner. It also lets us document the incomplete checkpoint case that @scovich brought up. It's not an obvious scenario, and I think it warrants some documentation.
Interested to hear your thoughts @zachschuermann
+1 for helper function, good idea. Based on Rule of 30, you might even consider pulling out a sub-helper to do the grouping on behalf of the helper
Update: Based on all the other suggestions, I would recommend just one helper, to build the hashmap:
/// Groups all checkpoint parts according to the size of the checkpoint they belong to.
///
/// NOTE: There could be a single-part and/or any number of uuid-based checkpoints. They
/// are all equivalent, and this routine keeps only one of them (arbitrarily chosen).
fn group_checkpoint_parts(parts: &[ParsedLogPath]) -> HashMap<u32, Vec<ParsedLogPath>>
kernel/src/log_segment.rs
Outdated
Some(LogPathFileType::Commit) | Some(LogPathFileType::CompactedCommit { .. }) => {
    warn!(
        "Found a commit file at version {} when expecting a checkpoint",
        version
    );
    return false;
}
Some(LogPathFileType::Unknown) => {
    warn!(
        "Found an unknown file type at version {} when expecting a checkpoint",
        version
    );
    return false;
}
This should never happen, and would imply that `is_checkpoint` is incorrect. Is this panic-worthy?
I thought about that as well, but panics are exceptionally unfriendly to whatever engine embeds kernel. At some point we have to rely on the correctness of `is_checkpoint`?
kernel/src/log_segment.rs
Outdated
    }
}
Some(LogPathFileType::SinglePartCheckpoint) => {
    if checkpoint_parts.len() != 1 {
I'm wondering in what case this would happen?
In uncareful grouping code, it could happen if there were also 1+ `UuidCheckpoint` instances and we blindly appended them all to the Vec for `num_parts: 1` in the hash table. Because single-part checkpoints always compare lexically greater than uuid checkpoints, the single-part would come last and be found here.
But our code creates a new Vec every time it encounters a first part num, so that case shouldn't arise.
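The lexical claim can be spot-checked directly (the uuid below is illustrative): a uuid checkpoint's suffix starts with a hex character, all of which sort before the 'p' of `parquet`, so the classic single-part name compares greater.

```rust
fn main() {
    // Classic single-part checkpoint vs. an (illustrative) uuid checkpoint
    // for the same version: 'p' > any hex character, so the classic name
    // is lexically greater and would list last.
    let classic = "00000000000000000010.checkpoint.parquet";
    let uuid = "00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.parquet";
    assert!(classic > uuid);
}
```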
kernel/src/log_segment.rs
Outdated
    .find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
{
    // Validate the checkpoint before updating state
    if validate_checkpoint_parts(version, &complete_checkpoint) {
The code above already did this validation "in line" while populating the hashmap:
- The iterator chunking ensures that all parts come from the same commit version.
- The hash table (keyed by num_parts) ensures that all parts in a given Vec come from the same multi-part checkpoint.
- The length checking in the "continue" case ensures the sequence of parts is gap-free.
  - This check is redundant, because the `ParsedLogPath` constructor already ensures that `(1..=num_parts).contains(part_num)`. There's no way to get a seemingly-complete incomplete checkpoint by wrongly numbered parts.
  - The check also eliminates any duplicates, but duplicates shouldn't be possible if this is actually a listing result.
  - The check also verifies order, but that doesn't actually matter (all checkpoint parts are independent of each other).
- The `find` ignores any incomplete checkpoint (any missing part would make the vec smaller than `num_parts`).
Thanks for the breakdown; most of the validation in `validate_checkpoint_parts` is redundant due to the in-line validation you mentioned. For the sake of caution, I would still like to warn the user when encountering the scenarios currently covered in the `validate_checkpoint_parts` function.
To do this, I've sprinkled in logging for the scenarios below when grouping the checkpoints:
- unsupported UUID checkpoints
- invalid file types (commit, compacted, unknown)
- multiple single-part checkpoints

In `group_checkpoint_parts`, when we encounter a multi-part checkpoint which has arrived out of order, we do not try to build a complete checkpoint with it (we do not add it to our hashmap), as we assume that the parts should arrive in order. Thus we may be left with an incomplete checkpoint part, as we could still find the remaining sequence of checkpoint parts in our iteration. I've added logging here for encountering an unexpected part number when grouping. We could additionally/alternatively do a second pass after grouping to find & warn about incomplete checkpoint parts, as I remember this was particularly important for this issue in general @OussamaSaoudi
> I've sprinkled in logging for scenarios below when grouping the checkpoints:
> - unsupported UUID checkpoints
> - invalid file types (commit, compacted, unknown)
> - multiple single-part checkpoints

Re invalid file types and UUID checkpoints -- the `is_checkpoint` filter before this code should have already removed those. So any logging about UUID checkpoints would need to happen before that filtering, not here, which makes me question the utility of logging here for users' sake. The checks here are only really needed because rustc demands complete matches and it doesn't grok the concept that some cases could be statically impossible.
Meanwhile, why would multiple single-part checkpoints be a problem? The spec allows one classic checkpoint, one multi-part checkpoint with a single part, and any number of uuid checkpoints (tho the latter should be filtered out long before now). Any of them should be equivalent.
Yea, the logging would only reveal errors if `.is_checkpoint` did not behave correctly. Will summarize bullet points 1 & 2 with a single warn if the file type is neither commit nor checkpoint in the filtering you mentioned.
I see, the spec indeed allows for single-part checkpoints, and we are correctly handling multiple single-part checkpoints as well. Thanks for the heads-up.
@zachschuermann curious as to your thoughts about doing an additional pass after grouping the multi-part checkpoints to find an incomplete multi-part checkpoint for the purpose of warning the user.
kernel/src/log_segment.rs
Outdated
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
    if *num_parts as usize != checkpoint_parts.len() {
        warn!(
            "Found a multi-part checkpoint at version {}. Found {} parts, expected {}",
Note: This would imply either duplicates or part numbers outside the checkpoint's part range. The former should be impossible for a correct listing, and the latter would produce an error in ParsedLogPath::try_from.
Hm, considering we have this place and some below, I wonder if we should modify semantics such that the function returns a `Result<bool>`, and we could leverage the result to return `Error::internal` for some unexpected cases?
(maybe, but the whole function shouldn't even exist -- see other comment)
Right, @scovich pointed out that much of the logging here is redundant. I've summarized most of it into a single log when encountering an unknown file type while filtering the batched `log_files`. Ref discussion in #641 (comment).
Approach looks good. Didn't dig deeply into unit tests.
Got a little confused with the test, so I added more detail to the comments. Looks good besides that!
good job!
What changes are proposed in this pull request?

Problem behavior:
When constructing a `LogSegment`, we scan the `_delta_log` directory for checkpoints and commits. The most recent checkpoint is collected from the log, but the kernel does not check that it has collected a full multi-part checkpoint. Thus, the checkpoint returned may be incomplete. More context can be found in #497.

Expected behavior:
Keep track of the most recent complete checkpoint when iterating through the `_delta_log` directory, and warn the user when incomplete checkpoints are encountered.
- `build_snapshot_with_out_of_date_last_checkpoint` showcases the expected behavior, and now passes ✅.
- `build_snapshot_with_unsupported_uuid_checkpoint` verifies that currently unsupported `UuidCheckpoint`s are not considered valid checkpoints.

How was this change tested?
- `build_snapshot_with_out_of_date_last_checkpoint` exercises the intended behavior for the scenario described in #497 (Log file listing should handle incomplete multi-part checkpoints).
- `build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpoint`: prior to this PR, when an out-of-date checkpoint hint was provided AND there was a more recent incomplete checkpoint, the more recent incomplete checkpoint would be taken and assumed to be complete, resulting in errors building the snapshot. Now, the more recent incomplete checkpoint is ignored.
- `build_snapshot_with_unsupported_uuid_checkpoint` verifies that currently unsupported `UuidCheckpoint`s are not considered valid checkpoints.
- `build_snapshot_with_multiple_incomplete_multipart_checkpoints` verifies that multiple incomplete multi-part checkpoints that interleave are validated appropriately; more context in #641 (comment).

resolves #497