diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs
index 24d78a986..f7c7de3dd 100644
--- a/kernel/src/log_segment.rs
+++ b/kernel/src/log_segment.rs
@@ -2,15 +2,15 @@
 //! files.
 
 use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
-use crate::path::ParsedLogPath;
+use crate::path::{LogPathFileType, ParsedLogPath};
 use crate::schema::SchemaRef;
 use crate::snapshot::CheckpointMetadata;
 use crate::utils::require;
 use crate::{
     DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
 };
-use itertools::Itertools;
-use std::cmp::Ordering;
+use itertools::{process_results, Itertools};
+use std::collections::HashMap;
 use std::convert::identity;
 use std::sync::{Arc, LazyLock};
 use tracing::warn;
@@ -309,35 +309,84 @@ fn list_log_files_with_version(
 ) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
     // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
     // on config at some point
-    let mut commit_files = Vec::with_capacity(10);
-    let mut checkpoint_parts = vec![];
-    let mut max_checkpoint_version = start_version;
-    for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? {
-        let parsed_path = parsed_path?;
-        if parsed_path.is_commit() {
-            commit_files.push(parsed_path);
-        } else if parsed_path.is_checkpoint() {
-            let path_version = parsed_path.version;
-            match max_checkpoint_version {
-                None => {
-                    checkpoint_parts.push(parsed_path);
-                    max_checkpoint_version = Some(path_version);
+    let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;
+
+    process_results(log_files, |iter| {
+        let mut commit_files = Vec::with_capacity(10);
+        let mut checkpoint_parts = vec![];
+
+        let log_files = iter.chunk_by(|x| x.version);
+        for (version, files) in &log_files {
+            let mut new_checkpoint_parts = vec![];
+            for file in files {
+                if file.is_commit() {
+                    commit_files.push(file);
+                } else if file.is_checkpoint() {
+                    new_checkpoint_parts.push(file);
+                } else {
+                    warn!(
+                        "Found a file with unknown file type {:?} at version {}",
+                        file.file_type, version
+                    );
                 }
-                Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) {
-                    Ordering::Greater => {
-                        max_checkpoint_version = Some(path_version);
-                        checkpoint_parts.clear();
-                        checkpoint_parts.push(parsed_path);
+            }
+
+            // Group and find the first complete checkpoint for this version.
+            // All checkpoints for the same version are equivalent, so we only take one.
+            if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts)
+                .into_iter()
+                .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize)
+            {
+                checkpoint_parts = complete_checkpoint;
+                commit_files.clear(); // Log replay only uses commits after a complete checkpoint
+            }
+        }
+        (commit_files, checkpoint_parts)
+    })
+}
+
+/// Groups all checkpoint parts according to the size of the checkpoint they belong to.
+///
+/// NOTE: There could be a single-part and/or any number of uuid-based checkpoints. They
+/// are all equivalent, and this routine keeps only one of them (arbitrarily chosen).
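+///
+/// For illustration only (a hypothetical listing, not part of this change): given the parts
+/// `00000000000000000005.checkpoint.0000000001.0000000003.parquet` and
+/// `00000000000000000005.checkpoint.0000000003.0000000003.parquet` (part 2 of 3 was never
+/// written), part 1 starts a group under key 3, but part 3 is dropped because part 2 was
+/// never appended. The caller's completeness check (`part_files.len() == num_parts`) then
+/// rejects the group, so the incomplete checkpoint is ignored during log replay.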
+fn group_checkpoint_parts(parts: Vec<ParsedLogPath>) -> HashMap<u32, Vec<ParsedLogPath>> {
+    let mut checkpoints: HashMap<u32, Vec<ParsedLogPath>> = HashMap::new();
+    for part_file in parts {
+        use LogPathFileType::*;
+        match &part_file.file_type {
+            SinglePartCheckpoint
+            | UuidCheckpoint(_)
+            | MultiPartCheckpoint {
+                part_num: 1,
+                num_parts: 1,
+            } => {
+                // All single-file checkpoints are equivalent, just keep one
+                checkpoints.insert(1, vec![part_file]);
+            }
+            MultiPartCheckpoint {
+                part_num: 1,
+                num_parts,
+            } => {
+                // Start a new multi-part checkpoint with at least 2 parts
+                checkpoints.insert(*num_parts, vec![part_file]);
+            }
+            MultiPartCheckpoint {
+                part_num,
+                num_parts,
+            } => {
+                // Continue an existing multi-part checkpoint with at least 2 parts
+                if let Some(part_files) = checkpoints.get_mut(num_parts) {
+                    if *part_num as usize == 1 + part_files.len() {
+                        // Safe to append because all previous parts exist
+                        part_files.push(part_file);
                     }
-                    Ordering::Equal => checkpoint_parts.push(parsed_path),
-                    Ordering::Less => {}
-                },
+                }
             }
+            Commit | CompactedCommit { .. } | Unknown => {}
         }
     }
-
-    Ok((commit_files, checkpoint_parts))
+    checkpoints
 }
 
 /// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all
diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs
index ed029b006..e1c441d7e 100644
--- a/kernel/src/log_segment/tests.rs
+++ b/kernel/src/log_segment/tests.rs
@@ -107,6 +107,73 @@ fn build_log_with_paths_and_checkpoint(
     (Box::new(client), log_root)
 }
 
+#[test]
+fn build_snapshot_with_unsupported_uuid_checkpoint() {
+    let (client, log_root) = build_log_with_paths_and_checkpoint(
+        &[
+            delta_path_for_version(0, "json"),
+            delta_path_for_version(1, "checkpoint.parquet"),
+            delta_path_for_version(2, "json"),
+            delta_path_for_version(3, "checkpoint.parquet"),
+            delta_path_for_version(4, "json"),
+            delta_path_for_version(5, "json"),
+            delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"),
+            delta_path_for_version(6, "json"),
+            delta_path_for_version(7, "json"),
+        ],
+        None,
+    );
+
+    let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
+    let commit_files = log_segment.ascending_commit_files;
+    let checkpoint_parts = log_segment.checkpoint_parts;
+
+    assert_eq!(checkpoint_parts.len(), 1);
+    assert_eq!(checkpoint_parts[0].version, 3);
+
+    let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
+    let expected_versions = vec![4, 5, 6, 7];
+    assert_eq!(versions, expected_versions);
+}
+
+#[test]
+fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() {
+    let (client, log_root) = build_log_with_paths_and_checkpoint(
+        &[
+            delta_path_for_version(0, "json"),
+            delta_path_for_multipart_checkpoint(1, 1, 3),
+            // Part 2 of 3 at version 1 is missing!
+            delta_path_for_multipart_checkpoint(1, 3, 3),
+            delta_path_for_multipart_checkpoint(2, 1, 2),
+            // Part 2 of 2 at version 2 is missing!
+            delta_path_for_version(2, "json"),
+            delta_path_for_multipart_checkpoint(3, 1, 3),
+            // Part 2 of 3 at version 3 is missing!
+            delta_path_for_multipart_checkpoint(3, 3, 3),
+            delta_path_for_multipart_checkpoint(3, 1, 4),
+            delta_path_for_multipart_checkpoint(3, 2, 4),
+            delta_path_for_multipart_checkpoint(3, 3, 4),
+            delta_path_for_multipart_checkpoint(3, 4, 4),
+            delta_path_for_version(4, "json"),
+            delta_path_for_version(5, "json"),
+            delta_path_for_version(6, "json"),
+            delta_path_for_version(7, "json"),
+        ],
+        None,
+    );
+
+    let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
+    let commit_files = log_segment.ascending_commit_files;
+    let checkpoint_parts = log_segment.checkpoint_parts;
+
+    assert_eq!(checkpoint_parts.len(), 4);
+    assert_eq!(checkpoint_parts[0].version, 3);
+
+    let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
+    let expected_versions = vec![4, 5, 6, 7];
+    assert_eq!(versions, expected_versions);
+}
+
 #[test]
 fn build_snapshot_with_out_of_date_last_checkpoint() {
     let checkpoint_metadata = CheckpointMetadata {
@@ -209,7 +276,7 @@ fn build_snapshot_with_missing_checkpoint_part_from_hint_fails() {
             delta_path_for_version(3, "json"),
             delta_path_for_version(4, "json"),
             delta_path_for_multipart_checkpoint(5, 1, 3),
-            // Part 2 is missing!
+            // Part 2 of 3 at version 5 is missing!
             delta_path_for_multipart_checkpoint(5, 3, 3),
             delta_path_for_version(5, "json"),
             delta_path_for_version(6, "json"),
@@ -257,11 +324,8 @@ fn build_snapshot_with_bad_checkpoint_hint_fails() {
     assert!(log_segment.is_err())
 }
 
-#[ignore]
 #[test]
 fn build_snapshot_with_missing_checkpoint_part_no_hint() {
-    // TODO: Handle checkpoints correctly so that this test passes: https://github.com/delta-io/delta-kernel-rs/issues/497
-
     // Part 2 of 3 is missing from checkpoint 5. The Snapshot should be made of checkpoint
     // number 3 and commit files 4 to 7.
     let (client, log_root) = build_log_with_paths_and_checkpoint(
@@ -274,7 +338,7 @@ fn build_snapshot_with_missing_checkpoint_part_no_hint() {
             delta_path_for_version(3, "json"),
             delta_path_for_version(4, "json"),
             delta_path_for_multipart_checkpoint(5, 1, 3),
-            // Part 2 is missing!
+            // Part 2 of 3 at version 5 is missing!
             delta_path_for_multipart_checkpoint(5, 3, 3),
             delta_path_for_version(5, "json"),
             delta_path_for_version(6, "json"),
@@ -296,6 +360,51 @@ fn build_snapshot_with_missing_checkpoint_part_no_hint() {
     assert_eq!(versions, expected_versions);
 }
 
+#[test]
+fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpoint() {
+    // When the _last_checkpoint is out of date and the most recent checkpoint is incomplete, the
+    // Snapshot should be made of the most recent complete checkpoint and the commit files that
+    // follow it.
+    let checkpoint_metadata = CheckpointMetadata {
+        version: 3,
+        size: 10,
+        parts: None,
+        size_in_bytes: None,
+        num_of_add_files: None,
+        checkpoint_schema: None,
+        checksum: None,
+    };
+
+    let (client, log_root) = build_log_with_paths_and_checkpoint(
+        &[
+            delta_path_for_version(0, "json"),
+            delta_path_for_version(1, "checkpoint.parquet"),
+            delta_path_for_version(2, "json"),
+            delta_path_for_version(3, "checkpoint.parquet"),
+            delta_path_for_version(4, "json"),
+            delta_path_for_multipart_checkpoint(5, 1, 3),
+            // Part 2 of 3 at version 5 is missing!
+            delta_path_for_multipart_checkpoint(5, 3, 3),
+            delta_path_for_version(5, "json"),
+            delta_path_for_version(6, "json"),
+            delta_path_for_version(7, "json"),
+        ],
+        Some(&checkpoint_metadata),
+    );
+
+    let log_segment =
+        LogSegment::for_snapshot(client.as_ref(), log_root, checkpoint_metadata, None).unwrap();
+    let commit_files = log_segment.ascending_commit_files;
+    let checkpoint_parts = log_segment.checkpoint_parts;
+
+    assert_eq!(checkpoint_parts.len(), 1);
+    assert_eq!(checkpoint_parts[0].version, 3);
+
+    let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
+    let expected_versions = vec![4, 5, 6, 7];
+    assert_eq!(versions, expected_versions);
+}
+
 #[test]
 fn build_snapshot_without_checkpoints() {
     let (client, log_root) = build_log_with_paths_and_checkpoint(