Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: incomplete multi-part checkpoint handling when no hint is provided #641

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
162 changes: 59 additions & 103 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,75 +309,84 @@ fn list_log_files_with_version(
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
// We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
// on config at some point
let mut commit_files = Vec::with_capacity(10);
let mut checkpoint_parts = vec![];

let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;

process_results(log_files, |iter| {
let log_files = iter.chunk_by(move |x| x.version);
let mut commit_files = Vec::with_capacity(10);
let mut checkpoint_parts = vec![];

let log_files = iter.chunk_by(|x| x.version);
for (version, files) in &log_files {
let mut new_checkpoint_parts = vec![];
for file in files {
if file.is_commit() {
commit_files.push(file);
} else if file.is_checkpoint() {
new_checkpoint_parts.push(file);
} else {
warn!(
"Found a file with unknown file type {:?} at version {}",
file.file_type, version
);
}
}

// Group checkpoint parts by the number of parts they have
let mut checkpoints = HashMap::new();
for part_file in new_checkpoint_parts {
use LogPathFileType::*;
match &part_file.file_type {
SinglePartCheckpoint
| UuidCheckpoint(_)
| MultiPartCheckpoint {
part_num: 1,
num_parts: 1,
} => {
// All single-file checkpoints are equivalent, just keep one
checkpoints.insert(1, vec![part_file]);
}
MultiPartCheckpoint {
part_num: 1,
num_parts,
} => {
// Start a new multi-part checkpoint with at least 2 parts
checkpoints.insert(*num_parts, vec![part_file]);
}
MultiPartCheckpoint {
part_num,
num_parts,
} => {
// Continue a new multi-part checkpoint with at least 2 parts
if let Some(part_files) = checkpoints.get_mut(num_parts) {
if *part_num == 1 + part_files.len() as u32 {
// Safe to append because all previous parts exist
part_files.push(part_file);
}
}
}
Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
}
}

// Find a complete checkpoint (all parts exist)
if let Some((_, complete_checkpoint)) = checkpoints
// Group and find the first complete checkpoint for this version.
// All checkpoints for the same version are equivalent, so we only take one.
if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts)
.into_iter()
.find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
.find(|(num_parts, part_files)| part_files.len() == *num_parts as usize)
{
// Validate the checkpoint before updating state
if validate_checkpoint_parts(version, &complete_checkpoint) {
checkpoint_parts = complete_checkpoint;
commit_files.clear(); // Clear commit files once checkpoint is found
}
checkpoint_parts = complete_checkpoint;
commit_files.clear(); // Clear commits when a complete checkpoint is found
sebastiantia marked this conversation as resolved.
Show resolved Hide resolved
}
}
sebastiantia marked this conversation as resolved.
Show resolved Hide resolved
})?;
(commit_files, checkpoint_parts)
})
}

Ok((commit_files, checkpoint_parts))
/// Groups all checkpoint parts according to the size of the checkpoint they belong to.
///
/// NOTE: There could be a single-part and/or any number of uuid-based checkpoints. They
/// are all equivalent, and this routine keeps only one of them (arbitrarily chosen).
fn group_checkpoint_parts(parts: Vec<ParsedLogPath>) -> HashMap<u32, Vec<ParsedLogPath>> {
let mut checkpoints: HashMap<u32, Vec<ParsedLogPath>> = HashMap::new();
for part_file in parts {
use LogPathFileType::*;
match &part_file.file_type {
SinglePartCheckpoint
| UuidCheckpoint(_)
| MultiPartCheckpoint {
part_num: 1,
num_parts: 1,
} => {
// All single-file checkpoints are equivalent, just keep one
checkpoints.insert(1, vec![part_file]);
}
MultiPartCheckpoint {
part_num: 1,
num_parts,
} => {
// Start a new multi-part checkpoint with at least 2 parts
checkpoints.insert(*num_parts, vec![part_file]);
}
MultiPartCheckpoint {
part_num,
num_parts,
} => {
// Continue a new multi-part checkpoint with at least 2 parts
if let Some(part_files) = checkpoints.get_mut(num_parts) {
if *part_num as usize == 1 + part_files.len() {
// Safe to append because all previous parts exist
part_files.push(part_file);
}
}
}
Commit | CompactedCommit { .. } | Unknown => {}
}
}
checkpoints
}

/// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all
Expand Down Expand Up @@ -417,56 +426,3 @@ fn list_log_files_with_checkpoint(
}
Ok((commit_files, checkpoint_parts))
}

/// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts
/// are present. Returns `true` if we have a complete checkpoint, `false` otherwise.
fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -> bool {
match checkpoint_parts.last().map(|file| &file.file_type) {
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
if *num_parts as usize != checkpoint_parts.len() {
warn!(
"Found a multi-part checkpoint at version {}. Found {} parts, expected {}",
version,
checkpoint_parts.len(),
num_parts
);
return false;
}
}
Some(LogPathFileType::SinglePartCheckpoint) => {
if checkpoint_parts.len() != 1 {
warn!(
"Found a single-part checkpoint at version {}. Found {} parts",
version,
checkpoint_parts.len()
);
return false;
}
}
Some(LogPathFileType::UuidCheckpoint(_)) => {
warn!(
"Found a UUID checkpoint at version {} when it is not supported",
version
);
return false;
}
Some(LogPathFileType::Commit) | Some(LogPathFileType::CompactedCommit { .. }) => {
warn!(
"Found a commit file at version {} when expecting a checkpoint",
version
);
return false;
}
Some(LogPathFileType::Unknown) => {
warn!(
"Found an unknown file type at version {} when expecting a checkpoint",
version
);
return false;
}
// No checkpoint parts
None => return false,
}

true
}
Loading