fix: incomplete multi-part checkpoint handling when no hint is provided #641

Open · wants to merge 11 commits into main
101 changes: 75 additions & 26 deletions kernel/src/log_segment.rs
@@ -2,15 +2,15 @@
//! files.

use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
-use crate::path::ParsedLogPath;
+use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
    DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
};
-use itertools::Itertools;
-use std::cmp::Ordering;
+use itertools::{process_results, Itertools};
+use std::collections::HashMap;
use std::convert::identity;
use std::sync::{Arc, LazyLock};
use tracing::warn;
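
The switch to `itertools::process_results` above is what lets the new listing code below iterate over already-unwrapped `ParsedLogPath` values while still propagating the first error from the underlying file listing. A minimal, self-contained sketch of that pattern, using toy types rather than the kernel's own:

```rust
use itertools::process_results;

// Sum the values only if every element is Ok; otherwise return the first error.
fn sum_if_all_ok(values: Vec<Result<u64, String>>) -> Result<u64, String> {
    // The closure sees an iterator of plain u64s; process_results rewraps the
    // closure's output in Ok, or short-circuits with the first Err it meets.
    process_results(values, |iter| iter.sum::<u64>())
}

fn main() {
    assert_eq!(sum_if_all_ok(vec![Ok(1), Ok(2)]), Ok(3));
    assert_eq!(
        sum_if_all_ok(vec![Ok(1), Err("listing failed".to_string()), Ok(2)]),
        Err("listing failed".to_string())
    );
}
```
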
@@ -309,35 +309,84 @@ fn list_log_files_with_version(
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
    // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
    // on config at some point
-    let mut commit_files = Vec::with_capacity(10);
-    let mut checkpoint_parts = vec![];
-    let mut max_checkpoint_version = start_version;
-
-    for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? {
-        let parsed_path = parsed_path?;
-        if parsed_path.is_commit() {
-            commit_files.push(parsed_path);
-        } else if parsed_path.is_checkpoint() {
-            let path_version = parsed_path.version;
-            match max_checkpoint_version {
-                None => {
-                    checkpoint_parts.push(parsed_path);
-                    max_checkpoint_version = Some(path_version);
-                }
-                Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) {
-                    Ordering::Greater => {
-                        max_checkpoint_version = Some(path_version);
-                        checkpoint_parts.clear();
-                        checkpoint_parts.push(parsed_path);
-                    }
-                    Ordering::Equal => checkpoint_parts.push(parsed_path),
-                    Ordering::Less => {}
-                },
-            }
-        }
-    }
-
-    Ok((commit_files, checkpoint_parts))
+    let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;
+
+    process_results(log_files, |iter| {
+        let mut commit_files = Vec::with_capacity(10);
+        let mut checkpoint_parts = vec![];
+
+        let log_files = iter.chunk_by(|x| x.version);
+        for (version, files) in &log_files {
+            let mut new_checkpoint_parts = vec![];
+            for file in files {
+                if file.is_commit() {
+                    commit_files.push(file);
+                } else if file.is_checkpoint() {
+                    new_checkpoint_parts.push(file);
+                } else {
+                    warn!(
+                        "Found a file with unknown file type {:?} at version {}",
+                        file.file_type, version
+                    );
+                }
+            }
+
+            // Group and find the first complete checkpoint for this version.
+            // All checkpoints for the same version are equivalent, so we only take one.
+            if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts)
+                .into_iter()
+                .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize)
+            {
+                checkpoint_parts = complete_checkpoint;
+                commit_files.clear(); // Log replay only uses commits after a complete checkpoint
+            }
+        }
+        (commit_files, checkpoint_parts)
+    })
+}
+
+/// Groups all checkpoint parts according to the size of the checkpoint they belong to.
+///
+/// NOTE: There could be a single-part and/or any number of uuid-based checkpoints. They
+/// are all equivalent, and this routine keeps only one of them (arbitrarily chosen).
+fn group_checkpoint_parts(parts: Vec<ParsedLogPath>) -> HashMap<u32, Vec<ParsedLogPath>> {
+    let mut checkpoints: HashMap<u32, Vec<ParsedLogPath>> = HashMap::new();
+    for part_file in parts {
+        use LogPathFileType::*;
+        match &part_file.file_type {
+            SinglePartCheckpoint
+            | UuidCheckpoint(_)
+            | MultiPartCheckpoint {
+                part_num: 1,
+                num_parts: 1,
+            } => {
+                // All single-file checkpoints are equivalent, just keep one
+                checkpoints.insert(1, vec![part_file]);
+            }
+            MultiPartCheckpoint {
+                part_num: 1,
+                num_parts,
+            } => {
+                // Start a new multi-part checkpoint with at least 2 parts
+                checkpoints.insert(*num_parts, vec![part_file]);
+            }
+            MultiPartCheckpoint {
+                part_num,
+                num_parts,
+            } => {
+                // Continue an existing multi-part checkpoint with at least 2 parts
+                if let Some(part_files) = checkpoints.get_mut(num_parts) {
+                    if *part_num as usize == 1 + part_files.len() {
+                        // Safe to append because all previous parts exist
+                        part_files.push(part_file);
+                    }
+                }
+            }
+            Commit | CompactedCommit { .. } | Unknown => {}
+        }
+    }
+
+    checkpoints
}

/// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all
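
To see why this fixes the no-hint case, here is a small self-contained sketch of the same selection rule, using a toy `(part_num, num_parts)` tuple in place of `ParsedLogPath` (illustrative only, not kernel API): parts are grouped by the part count they declare, a group only grows when its parts arrive in order starting at 1, and a checkpoint is accepted only when some group is complete. An incomplete multi-part checkpoint, such as version 5 missing part 2 of 3 in the tests below, therefore never wins over an older complete checkpoint.

```rust
use std::collections::HashMap;

/// Toy stand-in for a multi-part checkpoint file at one version: (part_num, num_parts).
type Part = (u32, u32);

/// Group parts by the checkpoint size they claim to belong to, keeping only runs
/// that arrive in order starting at part 1 (mirroring the PR's in-order check).
fn group_parts(parts: &[Part]) -> HashMap<u32, Vec<Part>> {
    let mut groups: HashMap<u32, Vec<Part>> = HashMap::new();
    for &(part_num, num_parts) in parts {
        if part_num == 1 {
            // Part 1 starts (or restarts) the candidate group for this size.
            groups.insert(num_parts, vec![(part_num, num_parts)]);
        } else if let Some(run) = groups.get_mut(&num_parts) {
            if part_num as usize == run.len() + 1 {
                // Only append if every earlier part is already present.
                run.push((part_num, num_parts));
            }
        }
    }
    groups
}

/// A checkpoint at this version is usable only if some group has all of its parts.
fn has_complete_checkpoint(parts: &[Part]) -> bool {
    group_parts(parts)
        .into_iter()
        .any(|(num_parts, run)| run.len() == num_parts as usize)
}

fn main() {
    // Like version 5 in the tests: parts 1 and 3 of a 3-part checkpoint, part 2 missing.
    assert!(!has_complete_checkpoint(&[(1, 3), (3, 3)]));
    // A complete 2-part checkpoint is accepted.
    assert!(has_complete_checkpoint(&[(1, 2), (2, 2)]));
    // An incomplete 3-part and a complete 4-part checkpoint at the same version:
    // the complete one is found, like version 3 in the second new test below.
    assert!(has_complete_checkpoint(&[(1, 3), (3, 3), (1, 4), (2, 4), (3, 4), (4, 4)]));
}
```
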
119 changes: 114 additions & 5 deletions kernel/src/log_segment/tests.rs
@@ -107,6 +107,73 @@ fn build_log_with_paths_and_checkpoint(
(Box::new(client), log_root)
}

#[test]
fn build_snapshot_with_unsupported_uuid_checkpoint() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
);

let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
let commit_files = log_segment.ascending_commit_files;
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_multipart_checkpoint(1, 1, 3),
// Part 2 of 3 at version 1 is missing!
delta_path_for_multipart_checkpoint(1, 3, 3),
delta_path_for_multipart_checkpoint(2, 1, 2),
// Part 2 of 2 at version 2 is missing!
delta_path_for_version(2, "json"),
delta_path_for_multipart_checkpoint(3, 1, 3),
// Part 2 of 3 at version 3 is missing!
delta_path_for_multipart_checkpoint(3, 3, 3),
delta_path_for_multipart_checkpoint(3, 1, 4),
delta_path_for_multipart_checkpoint(3, 2, 4),
delta_path_for_multipart_checkpoint(3, 3, 4),
delta_path_for_multipart_checkpoint(3, 4, 4),
delta_path_for_version(4, "json"),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
None,
);

let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
let commit_files = log_segment.ascending_commit_files;
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 4);
assert_eq!(checkpoint_parts[0].version, 3);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}
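
As a reminder of the on-disk layout these test helpers model (the helper implementations are not shown here, so this is an illustrative sketch based on the Delta protocol's naming scheme, not the kernel's actual code): commits are `<version>.json`, classic checkpoints are `<version>.checkpoint.parquet`, multi-part checkpoints are `<version>.checkpoint.<part>.<num_parts>.parquet`, and UUID-named checkpoints are `<version>.checkpoint.<uuid>.parquet`, with the version zero-padded to 20 digits and the part numbers to 10.

```rust
/// Illustrative only: builds _delta_log file names in the shapes the tests rely on.
fn commit_path(version: u64) -> String {
    format!("_delta_log/{version:020}.json")
}

fn multipart_checkpoint_path(version: u64, part: u32, num_parts: u32) -> String {
    format!("_delta_log/{version:020}.checkpoint.{part:010}.{num_parts:010}.parquet")
}

fn main() {
    assert_eq!(commit_path(4), "_delta_log/00000000000000000004.json");
    assert_eq!(
        multipart_checkpoint_path(5, 1, 3),
        "_delta_log/00000000000000000005.checkpoint.0000000001.0000000003.parquet"
    );
}
```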

#[test]
fn build_snapshot_with_out_of_date_last_checkpoint() {
let checkpoint_metadata = CheckpointMetadata {
@@ -209,7 +276,7 @@ fn build_snapshot_with_missing_checkpoint_part_from_hint_fails() {
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
- // Part 2 is missing!
+ // Part 2 of 3 at version 5 is missing!
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
@@ -257,11 +324,8 @@ fn build_snapshot_with_bad_checkpoint_hint_fails() {
assert!(log_segment.is_err())
}

-#[ignore]
#[test]
fn build_snapshot_with_missing_checkpoint_part_no_hint() {
- // TODO: Handle checkpoints correctly so that this test passes: https://github.com/delta-io/delta-kernel-rs/issues/497
-
// Part 2 of 3 is missing from checkpoint 5. The Snapshot should be made of checkpoint
// number 3 and commit files 4 to 7.
let (client, log_root) = build_log_with_paths_and_checkpoint(
@@ -274,7 +338,7 @@
delta_path_for_version(3, "json"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
- // Part 2 is missing!
+ // Part 2 of 3 at version 5 is missing!
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
@@ -296,6 +360,51 @@
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpoint() {
// When the _last_checkpoint is out of date and the most recent checkpoint is incomplete, the
// Snapshot should be made of the most recent complete checkpoint and the commit files that
// follow it.
let checkpoint_metadata = CheckpointMetadata {
version: 3,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
};

let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
delta_path_for_version(1, "checkpoint.parquet"),
delta_path_for_version(2, "json"),
delta_path_for_version(3, "checkpoint.parquet"),
delta_path_for_version(4, "json"),
delta_path_for_multipart_checkpoint(5, 1, 3),
// Part 2 of 3 at version 5 is missing!
delta_path_for_multipart_checkpoint(5, 3, 3),
delta_path_for_version(5, "json"),
delta_path_for_version(6, "json"),
delta_path_for_version(7, "json"),
],
Some(&checkpoint_metadata),
);

let log_segment =
LogSegment::for_snapshot(client.as_ref(), log_root, checkpoint_metadata, None).unwrap();
let commit_files = log_segment.ascending_commit_files;
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_without_checkpoints() {
let (client, log_root) = build_log_with_paths_and_checkpoint(