Skip to content

Commit

Permalink
remove redundant validation & refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Jan 16, 2025
1 parent 206250d commit ec3ca83
Showing 1 changed file with 59 additions and 103 deletions.
162 changes: 59 additions & 103 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,75 +309,84 @@ fn list_log_files_with_version(
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
// We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
// on config at some point
let mut commit_files = Vec::with_capacity(10);
let mut checkpoint_parts = vec![];

let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;

process_results(log_files, |iter| {
let log_files = iter.chunk_by(move |x| x.version);
let mut commit_files = Vec::with_capacity(10);
let mut checkpoint_parts = vec![];

let log_files = iter.chunk_by(|x| x.version);
for (version, files) in &log_files {
let mut new_checkpoint_parts = vec![];
for file in files {
if file.is_commit() {
commit_files.push(file);
} else if file.is_checkpoint() {
new_checkpoint_parts.push(file);
} else {
warn!(
"Found a file with unknown file type {:?} at version {}",
file.file_type, version
);
}
}

// Group checkpoint parts by the number of parts they have
let mut checkpoints = HashMap::new();
for part_file in new_checkpoint_parts {
use LogPathFileType::*;
match &part_file.file_type {
SinglePartCheckpoint
| UuidCheckpoint(_)
| MultiPartCheckpoint {
part_num: 1,
num_parts: 1,
} => {
// All single-file checkpoints are equivalent, just keep one
checkpoints.insert(1, vec![part_file]);
}
MultiPartCheckpoint {
part_num: 1,
num_parts,
} => {
// Start a new multi-part checkpoint with at least 2 parts
checkpoints.insert(*num_parts, vec![part_file]);
}
MultiPartCheckpoint {
part_num,
num_parts,
} => {
// Continue a new multi-part checkpoint with at least 2 parts
if let Some(part_files) = checkpoints.get_mut(num_parts) {
if *part_num == 1 + part_files.len() as u32 {
// Safe to append because all previous parts exist
part_files.push(part_file);
}
}
}
Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
}
}

// Find a complete checkpoint (all parts exist)
if let Some((_, complete_checkpoint)) = checkpoints
// Group and find the first complete checkpoint found for this version,
// no matter how many exist as they are all equivalent.
if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts)
.into_iter()
.find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
.find(|(num_parts, part_files)| part_files.len() == *num_parts as usize)
{
// Validate the checkpoint before updating state
if validate_checkpoint_parts(version, &complete_checkpoint) {
checkpoint_parts = complete_checkpoint;
commit_files.clear(); // Clear commit files once checkpoint is found
}
checkpoint_parts = complete_checkpoint;
commit_files.clear(); // Clear commits when a complete checkpoint is found
}
}
})?;
(commit_files, checkpoint_parts)
})
}

Ok((commit_files, checkpoint_parts))
/// Groups all checkpoint parts according to the size of the checkpoint they belong to.
///
/// NOTE: There could be a single-part and/or any number of uuid-based checkpoints. They
/// are all equivalent, and this routine keeps only one of them (arbitrarily chosen).
fn group_checkpoint_parts(parts: Vec<ParsedLogPath>) -> HashMap<u32, Vec<ParsedLogPath>> {
let mut checkpoints: HashMap<u32, Vec<ParsedLogPath>> = HashMap::new();
for part_file in parts {
use LogPathFileType::*;
match &part_file.file_type {
SinglePartCheckpoint
| UuidCheckpoint(_)
| MultiPartCheckpoint {
part_num: 1,
num_parts: 1,
} => {
// All single-file checkpoints are equivalent, just keep one
checkpoints.insert(1, vec![part_file]);
}
MultiPartCheckpoint {
part_num: 1,
num_parts,
} => {
// Start a new multi-part checkpoint with at least 2 parts
checkpoints.insert(*num_parts, vec![part_file]);
}
MultiPartCheckpoint {
part_num,
num_parts,
} => {
// Continue a new multi-part checkpoint with at least 2 parts
if let Some(part_files) = checkpoints.get_mut(num_parts) {
if *part_num as usize == 1 + part_files.len() {
// Safe to append because all previous parts exist
part_files.push(part_file);
}
}
}
Commit | CompactedCommit { .. } | Unknown => {}
}
}
checkpoints
}

/// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all
Expand Down Expand Up @@ -417,56 +426,3 @@ fn list_log_files_with_checkpoint(
}
Ok((commit_files, checkpoint_parts))
}

/// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts
/// are present. Returns `true` if we have a complete checkpoint, `false` otherwise.
fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -> bool {
match checkpoint_parts.last().map(|file| &file.file_type) {
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
if *num_parts as usize != checkpoint_parts.len() {
warn!(
"Found a multi-part checkpoint at version {}. Found {} parts, expected {}",
version,
checkpoint_parts.len(),
num_parts
);
return false;
}
}
Some(LogPathFileType::SinglePartCheckpoint) => {
if checkpoint_parts.len() != 1 {
warn!(
"Found a single-part checkpoint at version {}. Found {} parts",
version,
checkpoint_parts.len()
);
return false;
}
}
Some(LogPathFileType::UuidCheckpoint(_)) => {
warn!(
"Found a UUID checkpoint at version {} when it is not supported",
version
);
return false;
}
Some(LogPathFileType::Commit) | Some(LogPathFileType::CompactedCommit { .. }) => {
warn!(
"Found a commit file at version {} when expecting a checkpoint",
version
);
return false;
}
Some(LogPathFileType::Unknown) => {
warn!(
"Found an unknown file type at version {} when expecting a checkpoint",
version
);
return false;
}
// No checkpoint parts
None => return false,
}

true
}

0 comments on commit ec3ca83

Please sign in to comment.