fix: incomplete multi-part checkpoint handling when no hint is provided #641
Changes from 3 commits
9a228ed
ad85ff7
059c2c3
39f34d3
9dd3367
206250d
ec3ca83
537190a
9b170fb
4dd4e5b
961398a
|
@@ -2,15 +2,14 @@ | |
//! files. | ||
|
||
use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; | ||
use crate::path::ParsedLogPath; | ||
use crate::path::{LogPathFileType, ParsedLogPath}; | ||
use crate::schema::SchemaRef; | ||
use crate::snapshot::CheckpointMetadata; | ||
use crate::utils::require; | ||
use crate::{ | ||
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version, | ||
}; | ||
use itertools::Itertools; | ||
use std::cmp::Ordering; | ||
use std::convert::identity; | ||
use std::sync::{Arc, LazyLock}; | ||
use tracing::warn; | ||
|
@@ -313,28 +312,33 @@ fn list_log_files_with_version( | |
let mut checkpoint_parts = vec![]; | ||
let mut max_checkpoint_version = start_version; | ||
|
||
for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? { | ||
let parsed_path = parsed_path?; | ||
if parsed_path.is_commit() { | ||
commit_files.push(parsed_path); | ||
} else if parsed_path.is_checkpoint() { | ||
let path_version = parsed_path.version; | ||
match max_checkpoint_version { | ||
None => { | ||
checkpoint_parts.push(parsed_path); | ||
max_checkpoint_version = Some(path_version); | ||
} | ||
Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) { | ||
Ordering::Greater => { | ||
max_checkpoint_version = Some(path_version); | ||
checkpoint_parts.clear(); | ||
checkpoint_parts.push(parsed_path); | ||
} | ||
Ordering::Equal => checkpoint_parts.push(parsed_path), | ||
Ordering::Less => {} | ||
}, | ||
let log_files = list_log_files(fs_client, log_root, start_version, end_version)?; | ||
|
||
for (version, files) in &log_files | ||
.filter_map(|res| match res { | ||
Ok(path) => Some(path), | ||
Err(e) => { | ||
warn!("Error processing path: {:?}", e); | ||
None | ||
} | ||
}) | ||
.chunk_by(|path| path.version) | ||
{ | ||
let mut new_checkpoint_parts = vec![]; | ||
|
||
for file in files { | ||
if file.is_commit() { | ||
commit_files.push(file); | ||
} else if file.is_checkpoint() { | ||
new_checkpoint_parts.push(file); | ||
} | ||
} | ||
|
||
if validate_checkpoint_parts(version, &new_checkpoint_parts) | ||
&& (max_checkpoint_version.is_none() || Some(version) >= max_checkpoint_version) | ||
|
||
{ | ||
max_checkpoint_version = Some(version); | ||
checkpoint_parts = new_checkpoint_parts; | ||
} | ||
} | ||
|
||
Ok((commit_files, checkpoint_parts)) | ||
|
@@ -377,3 +381,39 @@ fn list_log_files_with_checkpoint( | |
} | ||
Ok((commit_files, checkpoint_parts)) | ||
} | ||
|
||
/// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts | ||
/// are present. Returns `true` if we have a complete checkpoint, `false` otherwise. | ||
fn validate_checkpoint_parts(version: u64, checkpoint_parts: &Vec<ParsedLogPath>) -> bool { | ||
if checkpoint_parts.is_empty() { | ||
return false; | ||
} | ||
|
||
match checkpoint_parts.last().map(|file| &file.file_type) { | ||
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => { | ||
if *num_parts as usize != checkpoint_parts.len() { | ||

I think it may be worth checking that:
@zachschuermann what do you think?

nit: it's actually
Unfortunately, the poorly-chosen naming convention for multi-part checkpoint files means they interleave:
... which makes it a lot harder to identify the files of a given checkpoint, and also means we can't just return a subslice in case there were multiple checkpoints to choose from. We'd probably need to build a hash map keyed by number of parts:

let mut checkpoints = HashMap::new();
for part_file in checkpoint_parts {
    use LogPathFileType::*;
    match &part_file.file_type {
        SinglePartCheckpoint
        | UuidCheckpoint(_)
        | MultiPartCheckpoint { part_num: 1, num_parts: 1 } =>
        {
            // All single-file checkpoints are equivalent, just keep one
            checkpoints.insert(1, vec![part_file]);
        }
        MultiPartCheckpoint { part_num: 1, num_parts } => {
            // Start a new multi-part checkpoint with at least 2 parts
            checkpoints.insert(*num_parts, vec![part_file]);
        }
        MultiPartCheckpoint { part_num, num_parts } => {
            // Continue an existing multi-part checkpoint with at least 2 parts
            if let Some(part_files) = checkpoints.get_mut(num_parts) {
                if *part_num as usize == 1 + part_files.len() {
                    // Safe to append because all previous parts exist
                    part_files.push(part_file);
                }
            }
        }
        Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
    }
}
checkpoints
    .into_iter()
    .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize)
    .map_or(vec![], |(_, part_files)| part_files)

this reminds me to use match statements to their full power in the future. Thx for the example Ryan!

ah yes, I did not consider the case of multiple incomplete checkpoints. I'll introduce tests to cover some of these scenarios. And thanks a lot for the example!

||
warn!( | ||
"Found a multi-part checkpoint at version {}. Found {} parts, expected {}", | ||

Note: This would imply either duplicates or part numbers outside the checkpoint's part range. The former should be impossible for a correct listing, and the latter would produce an error in `ParsedLogPath::try_from`.

Hm, considering we have this place and some below, I wonder if we should modify semantics such that the function returns a

(maybe, but the whole function shouldn't even exist -- see other comment)

Right, @scovich pointed out that much of the logging here is redundant. I've summarized most to a single log when encountering an unknown file type when filtering the batched

||
version, | ||
checkpoint_parts.len(), | ||
num_parts | ||
); | ||
return false; | ||
} | ||
} | ||
Some(LogPathFileType::SinglePartCheckpoint) => { | ||
if checkpoint_parts.len() != 1 { | ||

I'm wondering in what case this would happen?

In uncareful grouping code, it could happen if there were also 1+
But our code creates a new Vec every time it encounters a first part num, so that case shouldn't arise.

||
warn!( | ||
"Found a single-part checkpoint at version {}. Found {} parts", | ||
version, | ||
checkpoint_parts.len() | ||
); | ||
return false; | ||
} | ||
} | ||
// TODO: Include UuidCheckpoint once we actually support v2 checkpoints | ||
_ => {} | ||

I think `UuidCheckpoint` should return false, since we can't read that checkpoint. In general, beware catchall cases in match statements. I also wonder if we should panic/error if we ever get a commit file here, since that should not be happening.

Right, good catch. I think returning an Error in the catchall case would be a good idea as we really should not get anything other than

I'd say handle the Uuid case separately from the catchall. Leave a comment that says this case will be supported in CheckpointV2.

+10 avoid catchall in match statements. Better to enumerate the known-invalid cases, so that when a new case shows up the compiler forces us to categorize it.

||
} | ||
|
||
true | ||
} |
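
The grouping idea from the review comments above (key the candidate checkpoints by their part count, and enumerate every file type instead of using a catch-all arm) can be sketched in a self-contained form. This is only an illustration under stated assumptions: `FileType`, `LogFile`, and `complete_checkpoint` are hypothetical, simplified stand-ins and not the kernel's `LogPathFileType`, `ParsedLogPath`, or any function from this PR. The sketch also assumes the input holds the files of a single version in listing order, so the parts of each checkpoint arrive with increasing part numbers.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for the kernel's `LogPathFileType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FileType {
    Commit,
    SinglePartCheckpoint,
    UuidCheckpoint,
    MultiPartCheckpoint { part_num: u32, num_parts: u32 },
    Unknown,
}

/// Hypothetical stand-in for `ParsedLogPath`; only the fields the sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct LogFile {
    version: u64,
    file_type: FileType,
}

/// Returns the parts of one complete checkpoint among `files`, or an empty
/// Vec if no checkpoint is complete. Candidates are keyed by part count,
/// because files of checkpoints with different part counts interleave.
fn complete_checkpoint(files: Vec<LogFile>) -> Vec<LogFile> {
    let mut checkpoints: HashMap<u32, Vec<LogFile>> = HashMap::new();
    for file in files {
        match file.file_type {
            FileType::SinglePartCheckpoint
            | FileType::MultiPartCheckpoint { part_num: 1, num_parts: 1 } => {
                // Single-file checkpoints are equivalent; keep one.
                checkpoints.insert(1, vec![file]);
            }
            FileType::MultiPartCheckpoint { part_num: 1, num_parts } => {
                // First part: start a fresh candidate for this part count.
                checkpoints.insert(num_parts, vec![file]);
            }
            FileType::MultiPartCheckpoint { part_num, num_parts } => {
                // Later part: append only if every earlier part was seen.
                if let Some(parts) = checkpoints.get_mut(&num_parts) {
                    if part_num as usize == parts.len() + 1 {
                        parts.push(file);
                    }
                }
            }
            // Listed explicitly (no `_` arm) so a new variant forces a decision.
            // Assumption: UUID checkpoints are skipped because they are unreadable here.
            FileType::UuidCheckpoint => {}
            FileType::Commit | FileType::Unknown => {}
        }
    }
    // If several candidates are complete, which one is returned is unspecified.
    checkpoints
        .into_iter()
        .find(|(num_parts, parts)| parts.len() == *num_parts as usize)
        .map_or(vec![], |(_, parts)| parts)
}
```

With an interleaved listing such as parts (1 of 2), (1 of 3), (2 of 3), (3 of 3), the two-part candidate stays incomplete and the three-part candidate is returned whole.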

skipping `ParsedLogPath::try_from()` errors here as we were already filtering them out

Hmm, I think it's dangerous to skip the errors here. It's best to try to return the error somehow. Does it work if you do something like `chunk_by(|path| path.map(|x| x.version))`? The proposed approach tries chunking by `DeltaResult<Version>` instead of chunking by `Version`. The hope is to return the `Err` if we encounter it.

The `chunk_by` function requires that the keys it uses for grouping implement the `PartialEq` trait so it can compare them, but the `Error` in the `DeltaResult` does not implement the trait, so it doesn't work...

We could use `try_collect` to handle errors before grouping.

This was rly tricky and I got nerdsniped thinking about the iterator stuff lol. But look into `process_results`.
I think I might've adapted your code right, but double check:

Note: With ordered listing, checkpoint files always come before commit files. So in theory, we know whether we have a complete checkpoint by the time we encounter the commit file of a given version. Not sure if that allows simpler code or not tho.

I would use `try_collect` and propagate the errors.

Definitely agreed that we want to propagate errors. I wanted to avoid `try_collect` on all the log files because there could be a lot of them. `process_results` + `chunk_by` should only have 1 commit at a time in memory.

nice find with `process_results`, it works nicely!

clearing the commit files when encountering complete checkpoints makes sense, and thank you for the helpful context @scovich
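
The trade-off discussed in this thread (propagate listing errors, but keep only one version's files in memory at a time) can be illustrated with the standard library alone. To be clear about the swap: the sketch below is a hand-rolled loop, not itertools' `process_results` and `chunk_by`, and `ListedFile` and `for_each_version_group` are hypothetical names invented for the example. It assumes the listing is ordered by version.

```rust
/// Hypothetical stand-in for a parsed log path; only the fields the sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct ListedFile {
    version: u64,
    is_checkpoint: bool,
}

/// Walks a fallible, version-ordered listing and calls `on_group` once per
/// version with that version's files. The first `Err` aborts the walk and is
/// returned to the caller; only the current version's files are buffered.
fn for_each_version_group<I, E, F>(listing: I, mut on_group: F) -> Result<(), E>
where
    I: IntoIterator<Item = Result<ListedFile, E>>,
    F: FnMut(u64, Vec<ListedFile>),
{
    let mut current: Vec<ListedFile> = Vec::new();
    for item in listing {
        // Propagate the error instead of logging and skipping it.
        let file = item?;
        if let Some(first) = current.first() {
            if first.version != file.version {
                // Version changed: hand off the finished group.
                let version = first.version;
                on_group(version, std::mem::take(&mut current));
            }
        }
        current.push(file);
    }
    // Flush the last group, if any.
    if let Some(first) = current.first() {
        let version = first.version;
        on_group(version, std::mem::take(&mut current));
    }
    Ok(())
}
```

One consequence of this design is that a group still being buffered when an error arrives is never handed to the callback, so the caller sees either complete groups or the error.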