fix: incomplete multi-part checkpoint handling when no hint is provided #641

Open · wants to merge 11 commits into base: main
139 changes: 116 additions & 23 deletions kernel/src/log_segment.rs
@@ -2,15 +2,15 @@
//! files.

use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::path::ParsedLogPath;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
};
use itertools::Itertools;
use std::cmp::Ordering;
use itertools::{process_results, Itertools};
use std::collections::HashMap;
use std::convert::identity;
use std::sync::{Arc, LazyLock};
use tracing::warn;
@@ -311,31 +311,71 @@ fn list_log_files_with_version(
// on config at some point
let mut commit_files = Vec::with_capacity(10);
let mut checkpoint_parts = vec![];
let mut max_checkpoint_version = start_version;

for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? {
let parsed_path = parsed_path?;
if parsed_path.is_commit() {
commit_files.push(parsed_path);
} else if parsed_path.is_checkpoint() {
let path_version = parsed_path.version;
match max_checkpoint_version {
None => {
checkpoint_parts.push(parsed_path);
max_checkpoint_version = Some(path_version);
let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;

process_results(log_files, |iter| {
let log_files = iter.chunk_by(move |x| x.version);
for (version, files) in &log_files {
let mut new_checkpoint_parts = vec![];
for file in files {
if file.is_commit() {
commit_files.push(file);
} else if file.is_checkpoint() {
new_checkpoint_parts.push(file);
}
Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) {
Ordering::Greater => {
max_checkpoint_version = Some(path_version);
checkpoint_parts.clear();
checkpoint_parts.push(parsed_path);
}

// Group checkpoint parts by the number of parts they have
let mut checkpoints = HashMap::new();
for part_file in new_checkpoint_parts {
use LogPathFileType::*;
match &part_file.file_type {
SinglePartCheckpoint
| UuidCheckpoint(_)
| MultiPartCheckpoint {
part_num: 1,
num_parts: 1,
} => {
// All single-file checkpoints are equivalent, just keep one
checkpoints.insert(1, vec![part_file]);
}
Ordering::Equal => checkpoint_parts.push(parsed_path),
Ordering::Less => {}
},
MultiPartCheckpoint {
part_num: 1,
num_parts,
} => {
// Start a new multi-part checkpoint with at least 2 parts
checkpoints.insert(*num_parts, vec![part_file]);
}
MultiPartCheckpoint {
part_num,
num_parts,
} => {
// Continue an existing multi-part checkpoint with at least 2 parts
if let Some(part_files) = checkpoints.get_mut(num_parts) {
if *part_num == 1 + part_files.len() as u32 {
// Safe to append because all previous parts exist
part_files.push(part_file);
}
}
}
Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
}
}

// Find a complete checkpoint (all parts exist)
if let Some((_, complete_checkpoint)) = checkpoints
.into_iter()
.find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
{
// Validate the checkpoint before updating state
if validate_checkpoint_parts(version, &complete_checkpoint) {
Collaborator:

The code above already did this validation "in line" while populating the hashmap (a small chunk_by sketch follows this list):

  • The iterator chunking ensures that all parts come from the same commit version
  • The hash table (keyed by num_parts) ensures that all parts in a given Vec come from the same multi-part checkpoint
  • The length checking in the "continue" case ensures the sequence of parts is gap-free
    • This check is redundant, because the ParsedLogPath constructor already ensures that (1..=num_parts).contains(part_num). There's no way for wrongly numbered parts to yield a seemingly-complete but actually incomplete checkpoint.
    • The check also eliminates any duplicates, but duplicates shouldn't be possible if this is actually a listing result.
    • The check also verifies order, but that doesn't actually matter (all checkpoint parts are independent of each other)
  • The find ignores any incomplete checkpoint (any missing part would make the vec smaller than num_parts)
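
(For reference, a minimal sketch of the chunking behavior the first bullet relies on: Itertools::chunk_by groups only consecutive items with equal keys, so the sorted listing is what guarantees all parts of a version land in one chunk. The toy values are invented.)

use itertools::Itertools;

fn main() {
    // Versions as they would appear in a sorted log listing.
    let versions = vec![1u64, 1, 2, 3, 3, 3];
    for (version, files) in &versions.into_iter().chunk_by(|v| *v) {
        println!("version {version}: {} file(s)", files.count());
    }
    // Prints:
    // version 1: 2 file(s)
    // version 2: 1 file(s)
    // version 3: 3 file(s)
}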

Collaborator Author:

Thanks for the breakdown; most of the validation in validate_checkpoint_parts is indeed redundant given the in-line validation you mentioned. For the sake of caution, I would still like to warn the user when encountering the scenarios currently covered by validate_checkpoint_parts.
To do this, I've sprinkled in logging for the scenarios below when grouping the checkpoints:

  • unsupported UUID checkpoints
  • invalid file types (commit, compacted, unknown)
  • multiple single-part checkpoints

In group_checkpoint_parts, when we encounter a multi-part checkpoint whose parts arrive out of order, we do not try to build a complete checkpoint with it (we do not add it to our hashmap), since we assume the parts should arrive in order. That can leave us with an incomplete checkpoint even though the remaining parts could still show up later in the iteration (see the example below). I've added logging for encountering an unexpected part number when grouping. We could additionally/alternatively do a second pass after grouping to find and warn about incomplete checkpoints, as I remember this was particularly important for this issue in general @OussamaSaoudi
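
(Illustrating with invented paths: for a 3-part checkpoint at version 5 whose part 2 is missing, part 3 arrives while the vec for num_parts=3 holds only one entry, so the length check rejects it and the checkpoint stays incomplete.)

_delta_log/00000000000000000005.checkpoint.0000000001.0000000003.parquet  <- part 1 starts the num_parts=3 entry
_delta_log/00000000000000000005.checkpoint.0000000003.0000000003.parquet  <- skipped: 3 != 1 + len() (part 2 never arrived)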

Collaborator:

I've sprinkled in logging for scenarios below when grouping the checkpoints:

  • unsupported UUID checkpoints
  • invalid file types (commit, compacted, unknown)
  • multiple single-part checkpoints

Re invalid file types and UUID checkpoints -- the is_checkpoint filter before this code should have already removed those. So any logging about UUID checkpoints would need to happen before that filtering, not here, which makes me question the utility of logging here for users' sake. The checks here are only really needed because rustc demands complete matches and it doesn't grok the concept that some cases could be statically impossible.

Meanwhile, why would multiple single-part checkpoints be a problem? The spec allows one classic checkpoint, one multi-part checkpoint with a single part, and any number of uuid checkpoints (though the latter should be filtered out long before now). Any of them should be equivalent.
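
(For concreteness, the three single-file checkpoint spellings that are all equivalent at a given version under the spec's naming scheme; the uuid is arbitrary:)

00000000000000000010.checkpoint.parquet                                        <- classic single-part
00000000000000000010.checkpoint.0000000001.0000000001.parquet                  <- multi-part with num_parts = 1
00000000000000000010.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet   <- uuid-named (v2)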

Collaborator Author:

Yea, the logging would only reveal errors if is_checkpoint did not behave correctly. Will summarize bullet points 1 & 2 with a single warn when the file type is neither commit nor checkpoint, in the filtering you mentioned.

I see, the spec indeed allows for single-part checkpoints and we are correctly handling multiple single-part checkpoints as well. Thanks for the heads-up.

Collaborator Author:

@zachschuermann curious as to your thoughts about doing an additional pass after grouping the multi-part checkpoints to find incomplete multi-part checkpoints, for the purpose of warning the user.

checkpoint_parts = complete_checkpoint;
commit_files.clear(); // Clear commit files once checkpoint is found
}
Collaborator:

This is a lot of code to get a complete checkpoint. I wonder if we should extract it into a helper:

/// Extracts a complete checkpoint from a list of checkpoint files
///
/// Explain the case where there could be incomplete checkpoints since that is not an obvious case.
fn get_complete_checkpoint(parts: &[ParsedLogPath]) -> Option<Vec<ParsedLogPath>> {
}

This makes the closure shorter/cleaner. It also lets us document the incomplete checkpoint case that @scovich brought up. It's not an obvious scenario, and I think it warrants some documentation.

Interested to hear your thoughts @zachschuermann
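
(A minimal sketch of what such a helper could look like. The name is OussamaSaoudi's suggestion; the body and the by-value HashMap parameter are assumptions for illustration, reusing the grouping already built above and the imports this diff adds.)

/// Extracts a complete checkpoint from the checkpoint parts grouped at one version.
///
/// A version can contain several candidate checkpoints (e.g. two multi-part
/// checkpoints with different part counts, one of them missing a part), so this
/// picks the first complete one found, if any.
fn get_complete_checkpoint(
    checkpoints: HashMap<u32, Vec<ParsedLogPath>>,
) -> Option<Vec<ParsedLogPath>> {
    checkpoints
        .into_iter()
        .find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
        .map(|(_, part_files)| part_files)
}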

Collaborator:

+1 for helper function, good idea. Based on the Rule of 30, you might even consider pulling out a sub-helper to do the grouping on behalf of the helper.

Collaborator:

Update: Based on all the other suggestions, I would recommend just one helper, to build the hashmap:

/// Groups all checkpoint parts according to the size of the checkpoint they belong to. 
///
/// NOTE: There could be a single-part and/or any number of uuid-based checkpoints. They
/// are all equivalent, and this routine keeps only one of them (arbitrarily chosen).
fn group_checkpoint_parts(parts: &[ParsedLogPath]) -> HashMap<u32, Vec<ParsedLogPath>>

}
}
}
})?;

Ok((commit_files, checkpoint_parts))
}
@@ -377,3 +417,56 @@ fn list_log_files_with_checkpoint(
}
Ok((commit_files, checkpoint_parts))
}

/// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts
/// are present. Returns `true` if we have a complete checkpoint, `false` otherwise.
fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -> bool {
match checkpoint_parts.last().map(|file| &file.file_type) {
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
if *num_parts as usize != checkpoint_parts.len() {
Collaborator:

I think it may be worth checking that:

  1. all the checkpoint parts are indeed of type MultiPartCheckpoint
  2. that the set of multi-part checkpoints has parts 0..n.

@zachschuermann what do you think?

Collaborator (@scovich, Jan 14, 2025):

nit: it's actually 1..=n. And yes, we should check. Also have to be careful because technically there could be two incomplete checkpoints with different num_parts for the same version. Also, we MUST accept at most one checkpoint -- even if multiple complete checkpoints are available -- so this function needs to filter, not just check.

Unfortunately, the poorly-chosen naming convention for multi-part checkpoint files means they interleave:

00000000000000000010.checkpoint.0000000001.0000000003.parquet
00000000000000000010.checkpoint.0000000001.0000000004.parquet
00000000000000000010.checkpoint.0000000002.0000000003.parquet
00000000000000000010.checkpoint.0000000002.0000000004.parquet
00000000000000000010.checkpoint.0000000003.0000000003.parquet
00000000000000000010.checkpoint.0000000003.0000000004.parquet
00000000000000000010.checkpoint.0000000004.0000000004.parquet

... which makes it a lot harder to identify the files of a given checkpoint and also means we can't just return a subslice in case there were multiple checkpoints to choose from.

We'd probably need to build a hash map keyed by number of parts:

let mut checkpoints = HashMap::new();
for part_file in checkpoint_parts {
    use LogPathFileType::*;
    match &part_file.file_type {
        SinglePartCheckpoint 
            | UuidCheckpoint(_) 
            | MultiPartCheckpoint { part_num: 1, num_parts: 1 } => 
        {
           // All single-file checkpoints are equivalent, just keep one
           checkpoints.insert(1, vec![part_file]);
        }
        MultiPartCheckpoint { part_num: 1, num_parts } => {
            // Start a new multi-part checkpoint with at least 2 parts
            checkpoints.insert(*num_parts, vec![part_file]);
        }
        MultiPartCheckpoint { part_num, num_parts } => {
            // Continue an existing multi-part checkpoint with at least 2 parts
            if let Some(part_files) = checkpoints.get_mut(num_parts) {
                if *part_num == 1 + part_files.len() as u32 {
                    // Safe to append because all previous parts exist
                    part_files.push(part_file);
                }
            }
        }
        Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
    } 
}
checkpoints
    .into_iter()
    .find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts)
    .map_or(vec![], |(_, part_files)| part_files)

Collaborator:

this reminds me to use match statements to their full power in the future. Thx for the example Ryan!

Collaborator Author:

ah yes, did not consider the multiple incomplete checkpoints. I'll introduce tests to cover some of these scenarios. And thanks a lot for the example!

warn!(
"Found a multi-part checkpoint at version {}. Found {} parts, expected {}",
Collaborator:

Note: This would imply either duplicates or part numbers outside the checkpoint's part range. The former should be impossible for a correct listing, and the latter would produce an error in ParsedLogPath::try_from.

Collaborator:

Hm, considering we have this case and some below, I wonder if we should change the semantics so that the function returns a Result<bool>, which would let us return Error::internal for some of the unexpected cases?

Collaborator:

(maybe, but the whole function shouldn't even exist -- see other comment)

Collaborator Author:

Right, @scovich pointed out that much of the logging here is redundant. I've consolidated most of it into a single log emitted when an unknown file type is encountered while filtering the batched log_files. See the discussion at #641 (comment)

version,
checkpoint_parts.len(),
num_parts
);
return false;
}
}
Some(LogPathFileType::SinglePartCheckpoint) => {
if checkpoint_parts.len() != 1 {
Collaborator:

I'm wondering in what case this would happen?

Collaborator:

In uncareful grouping code, it could happen if there were also 1+ UuidCheckpoint instances and we blindly appended them all to the Vec for num_parts: 1 in the hash table. Because single-part checkpoints always compare lexically greater than uuid checkpoints, the single-part would come last and be found here.

But our code creates a new Vec every time it encounters a first part num, so that case shouldn't arise.
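
(Why the single-part would come last in that hypothetical: lexically, the uuid segment compares against the bare "parquet" extension, and a hex digit sorts before 'p'.)

00000000000000000010.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet   <- uuid sorts first ('3' < 'p')
00000000000000000010.checkpoint.parquet                                        <- single-part sorts last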

warn!(
"Found a single-part checkpoint at version {}. Found {} parts",
version,
checkpoint_parts.len()
);
return false;
}
}
Some(LogPathFileType::UuidCheckpoint(_)) => {
warn!(
"Found a UUID checkpoint at version {} when it is not supported",
version
);
return false;
}
Some(LogPathFileType::Commit) | Some(LogPathFileType::CompactedCommit { .. }) => {
warn!(
"Found a commit file at version {} when expecting a checkpoint",
version
);
return false;
}
Some(LogPathFileType::Unknown) => {
warn!(
"Found an unknown file type at version {} when expecting a checkpoint",
version
);
return false;
}
Collaborator:

This should never happen, and would imply that is_checkpoint is incorrect. Is this panic-worthy?

Collaborator:

I thought about that as well, but panics are exceptionally unfriendly to whatever engine embeds kernel.
At some point we have to rely on correctness of is_checkpoint?
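
(One possible middle ground, sketched here as an assumption rather than anything this PR adopts: fail loudly in debug builds via debug_assert!, and keep the warn-and-return-false fallback in release builds. A hypothetical rewrite of the Unknown arm:)

Some(LogPathFileType::Unknown) => {
    // Hypothetical: loud in debug builds, graceful in release builds.
    debug_assert!(false, "is_checkpoint() let a non-checkpoint file type through");
    warn!(
        "Found an unknown file type at version {} when expecting a checkpoint",
        version
    );
    return false;
}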

// No checkpoint parts
None => return false,
}

true
}
115 changes: 112 additions & 3 deletions kernel/src/log_segment/tests.rs
@@ -107,6 +107,73 @@ fn build_log_with_paths_and_checkpoint(
(Box::new(client), log_root)
}

#[test]
fn build_snapshot_with_unsupported_uuid_checkpoint() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
);

let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
let commit_files = log_segment.ascending_commit_files;
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_multipart_checkpoint(1, 1, 3),
// Part 2 is missing!
delta_path_for_multipart_checkpoint(1, 3, 3),
delta_path_for_multipart_checkpoint(2, 1, 2),
// Part 2 is missing!
delta_path_for_version(2, "json"),
delta_path_for_multipart_checkpoint(3, 1, 3),
// Part 2 is missing!
delta_path_for_multipart_checkpoint(3, 3, 3),
delta_path_for_multipart_checkpoint(3, 1, 4),
delta_path_for_multipart_checkpoint(3, 2, 4),
delta_path_for_multipart_checkpoint(3, 3, 4),
delta_path_for_multipart_checkpoint(3, 4, 4),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
);

let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
let commit_files = log_segment.ascending_commit_files;
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 4);
assert_eq!(checkpoint_parts[0].version, 3);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_with_out_of_date_last_checkpoint() {
let checkpoint_metadata = CheckpointMetadata {
@@ -257,11 +324,8 @@ fn build_snapshot_with_bad_checkpoint_hint_fails() {
assert!(log_segment.is_err())
}

#[ignore]
#[test]
fn build_snapshot_with_missing_checkpoint_part_no_hint() {
// TODO: Handle checkpoints correctly so that this test passes: https://github.com/delta-io/delta-kernel-rs/issues/497

// Part 2 of 3 is missing from checkpoint 5. The Snapshot should be made of checkpoint
// number 3 and commit files 4 to 7.
let (client, log_root) = build_log_with_paths_and_checkpoint(
@@ -296,6 +360,51 @@ fn build_snapshot_with_missing_checkpoint_part_no_hint() {
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpoint() {
// When the _last_checkpoint is out of date and the most recent checkpoint is incomplete, the
// Snapshot should be made of the most recent complete checkpoint and the commit files that
// follow it.
let checkpoint_metadata = CheckpointMetadata {
version: 3,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
};

let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
// Part 2 is missing!
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
);

let log_segment =
LogSegment::for_snapshot(client.as_ref(), log_root, checkpoint_metadata, None).unwrap();
let commit_files = log_segment.ascending_commit_files;
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_without_checkpoints() {
let (client, log_root) = build_log_with_paths_and_checkpoint(