From 9a228edf9999edf2624532db28a3224da7aae2f2 Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Mon, 13 Jan 2025 13:23:12 -0800 Subject: [PATCH 01/10] group logs by version & validate checkpoint parts --- kernel/src/log_segment.rs | 83 +++++++++---- .../log_segment/delta_log_group_iterator.rs | 82 +++++++++++++ kernel/src/log_segment/tests.rs | 113 +++++++++++++++++- 3 files changed, 251 insertions(+), 27 deletions(-) create mode 100644 kernel/src/log_segment/delta_log_group_iterator.rs diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 24d78a986..54c45e268 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -2,7 +2,8 @@ //! files. use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; -use crate::path::ParsedLogPath; +use crate::log_segment::delta_log_group_iterator::DeltaLogGroupingIterator; +use crate::path::{LogPathFileType, ParsedLogPath}; use crate::schema::SchemaRef; use crate::snapshot::CheckpointMetadata; use crate::utils::require; @@ -10,12 +11,12 @@ use crate::{ DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version, }; use itertools::Itertools; -use std::cmp::Ordering; use std::convert::identity; use std::sync::{Arc, LazyLock}; use tracing::warn; use url::Url; +mod delta_log_group_iterator; #[cfg(test)] mod tests; @@ -313,27 +314,27 @@ fn list_log_files_with_version( let mut checkpoint_parts = vec![]; let mut max_checkpoint_version = start_version; - for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? { - let parsed_path = parsed_path?; - if parsed_path.is_commit() { - commit_files.push(parsed_path); - } else if parsed_path.is_checkpoint() { - let path_version = parsed_path.version; - match max_checkpoint_version { - None => { - checkpoint_parts.push(parsed_path); - max_checkpoint_version = Some(path_version); - } - Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) { - Ordering::Greater => { - max_checkpoint_version = Some(path_version); - checkpoint_parts.clear(); - checkpoint_parts.push(parsed_path); - } - Ordering::Equal => checkpoint_parts.push(parsed_path), - Ordering::Less => {} - }, - } + let log_iterator = DeltaLogGroupingIterator::new(list_log_files( + fs_client, + log_root, + start_version, + end_version, + )?); + + for (version, files) in log_iterator { + let mut new_checkpoint_parts = Vec::new(); + + files.into_iter().for_each(|file| match file { + f if f.is_commit() => commit_files.push(f), + f if f.is_checkpoint() => new_checkpoint_parts.push(f), + _ => {} + }); + + if validate_checkpoint_parts(version, &new_checkpoint_parts) + && (max_checkpoint_version.is_none() || Some(version) >= max_checkpoint_version) + { + max_checkpoint_version = Some(version); + checkpoint_parts = new_checkpoint_parts; } } @@ -377,3 +378,39 @@ fn list_log_files_with_checkpoint( } Ok((commit_files, checkpoint_parts)) } + +/// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts +/// are present. Returns `true` if we have a complete checkpoint, `false` otherwise. +fn validate_checkpoint_parts(version: u64, checkpoint_parts: &Vec) -> bool { + if checkpoint_parts.is_empty() { + return false; + } + + match checkpoint_parts.last().map(|file| &file.file_type) { + Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => { + if *num_parts as usize != checkpoint_parts.len() { + warn!( + "Found a multi-part checkpoint at version {}. 
Found {} parts, expected {}", + version, + checkpoint_parts.len(), + num_parts + ); + return false; + } + } + Some(LogPathFileType::SinglePartCheckpoint) => { + if checkpoint_parts.len() != 1 { + warn!( + "Found a single-part checkpoint at version {}. Found {} parts", + version, + checkpoint_parts.len() + ); + return false; + } + } + // TODO: Include UuidCheckpoint once we actually support v2 checkpoints + _ => {} + } + + true +} diff --git a/kernel/src/log_segment/delta_log_group_iterator.rs b/kernel/src/log_segment/delta_log_group_iterator.rs new file mode 100644 index 000000000..05c786978 --- /dev/null +++ b/kernel/src/log_segment/delta_log_group_iterator.rs @@ -0,0 +1,82 @@ +use tracing::warn; + +use crate::log_segment::ParsedLogPath; +use crate::DeltaResult; +use crate::Version; +use std::iter::Peekable; + +/** + * An iterator that groups [`ParsedLogPath`]s by version. + * For example for an input iterator: + * - 11.checkpoint.0.1.parquet + * - 11.checkpoint.1.1.parquet + * - 11.json + * - 12.checkpoint.parquet + * - 12.json + * - 13.json + * - 14.json + * - 15.checkpoint.0.1.parquet + * - 15.checkpoint.1.1.parquet + * - 15.checkpoint..parquet + * - 15.json + * This will return: + * - (11, Vec[11.checkpoint.0.1.parquet, 11.checkpoint.1.1.parquet, 11.json]) + * - (12, Vec[12.checkpoint.parquet, 12.json]) + * - (13, Vec[13.json]) + * - (14, Vec[14.json]) + * - (15, Vec[15.checkpoint.0.1.parquet, 15.checkpoint.1.1.parquet, 15.checkpoint..parquet, + * 15.json]) + */ +pub(super) struct DeltaLogGroupingIterator +where + I: Iterator>, +{ + iter: Peekable, +} + +impl DeltaLogGroupingIterator +where + I: Iterator>, +{ + pub(crate) fn new(iter: I) -> Self { + Self { + iter: iter.peekable(), + } + } +} + +impl Iterator for DeltaLogGroupingIterator +where + I: Iterator>, +{ + type Item = (Version, Vec); + + fn next(&mut self) -> Option { + let mut paths = Vec::new(); + let mut version = None; + + while let Some(next) = self.iter.peek() { + match next { + Ok(next_path) => { + if let Some(v) = version { + if next_path.version != v { + break; + } + } else { + version = Some(next_path.version); + } + + if let Ok(path) = self.iter.next().unwrap() { + paths.push(path); + } + } + Err(e) => { + warn!("Error processing path: {:?}", e); + self.iter.next(); // Skip the error + } + } + } + + version.map(|v| (v, paths)) + } +} diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index ed029b006..c68b848ff 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -7,7 +7,7 @@ use url::Url; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreFileSystemClient; use crate::engine::sync::SyncEngine; -use crate::log_segment::LogSegment; +use crate::log_segment::{list_log_files, DeltaLogGroupingIterator, LogSegment}; use crate::snapshot::CheckpointMetadata; use crate::{FileSystemClient, Table}; use test_utils::delta_path_for_version; @@ -107,6 +107,70 @@ fn build_log_with_paths_and_checkpoint( (Box::new(client), log_root) } +#[test] +fn test_delta_log_group_iterator() { + // Test that the DeltaLogGroupingIterator groups log files by version correctly + let (client, log_root) = build_log_with_paths_and_checkpoint( + &[ + delta_path_for_multipart_checkpoint(1, 1, 3), + delta_path_for_version(1, "json"), + delta_path_for_multipart_checkpoint(1, 2, 3), + delta_path_for_version(2, "checkpoint.parquet"), + delta_path_for_version(2, "json"), + delta_path_for_multipart_checkpoint(3, 1, 3), + 
delta_path_for_multipart_checkpoint(3, 2, 3), + delta_path_for_version(4, "json"), + ], + None, + ); + + let log_files: Vec<_> = list_log_files(client.as_ref(), &log_root, None, None) + .unwrap() + .collect(); + + let mut iterator = DeltaLogGroupingIterator::new(log_files.into_iter()); + + if let Some((version, files)) = iterator.next() { + assert_eq!(version, 1, "Expected version 1 but got {}", version); + assert_eq!(files.len(), 3); + assert!(files.iter().all(|file| file.version == 1)); + } else { + panic!("Expected group for version 1, but none was found"); + } + + if let Some((version, files)) = iterator.next() { + assert_eq!(version, 2, "Expected version 2 but got {}", version); + assert_eq!(files.len(), 2); + assert!(files.iter().all(|file| file.version == 2)); + } else { + panic!("Expected group for version 2, but none was found"); + } + + if let Some((version, files)) = iterator.next() { + assert_eq!(version, 3, "Expected version 3 but got {}", version); + assert_eq!(files.len(), 2); + assert!(files.iter().all(|file| file.version == 3)); + } else { + panic!("Expected group for version 3, but none was found"); + } + + if let Some((version, files)) = iterator.next() { + assert_eq!(version, 4, "Expected version 4 but got {}", version); + assert_eq!(files.len(), 1); + assert!(files.iter().all(|file| file.version == 4)); + } else { + panic!("Expected group for version 4, but none was found"); + } + + // Verify that there are no more groups after version 4 + if let Some((version, _)) = iterator.next() { + panic!( + "Expected no more groups, but found group for version {}", + version + ); + } +} + #[test] fn build_snapshot_with_out_of_date_last_checkpoint() { let checkpoint_metadata = CheckpointMetadata { @@ -257,11 +321,8 @@ fn build_snapshot_with_bad_checkpoint_hint_fails() { assert!(log_segment.is_err()) } -#[ignore] #[test] fn build_snapshot_with_missing_checkpoint_part_no_hint() { - // TODO: Handle checkpoints correctly so that this test passes: https://github.com/delta-io/delta-kernel-rs/issues/497 - // Part 2 of 3 is missing from checkpoint 5. The Snapshot should be made of checkpoint // number 3 and commit files 4 to 7. let (client, log_root) = build_log_with_paths_and_checkpoint( @@ -296,6 +357,50 @@ fn build_snapshot_with_missing_checkpoint_part_no_hint() { assert_eq!(versions, expected_versions); } +#[test] +fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpoint() { + // When the _last_checkpoint is out of date and the most recent checkpoint is incomplete, the + // Snapshot should be made of the most recent complete checkpoint and the commit files that + // follow it. + let checkpoint_metadata = CheckpointMetadata { + version: 3, + size: 10, + parts: None, + size_in_bytes: None, + num_of_add_files: None, + checkpoint_schema: None, + checksum: None, + }; + + let (client, log_root) = build_log_with_paths_and_checkpoint( + &[ + delta_path_for_version(0, "json"), + delta_path_for_version(1, "checkpoint.parquet"), + delta_path_for_version(2, "json"), + delta_path_for_version(3, "checkpoint.parquet"), + delta_path_for_version(4, "json"), + delta_path_for_multipart_checkpoint(5, 1, 3), + // Part 2 is missing! 
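+            // (with version 5's checkpoint incomplete, the snapshot below must fall
+            // back to the single-part checkpoint at version 3)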
+ delta_path_for_multipart_checkpoint(5, 3, 3), + delta_path_for_version(5, "json"), + delta_path_for_version(6, "json"), + delta_path_for_version(7, "json"), + ], + Some(&checkpoint_metadata), + ); + + let log_segment = + LogSegment::for_snapshot(client.as_ref(), log_root, checkpoint_metadata, None).unwrap(); + let commit_files = log_segment.ascending_commit_files; + let checkpoint_parts = log_segment.checkpoint_parts; + + assert_eq!(checkpoint_parts.len(), 1); + + let versions = commit_files.into_iter().map(|x| x.version).collect_vec(); + let expected_versions = vec![4, 5, 6, 7]; + assert_eq!(versions, expected_versions); +} + #[test] fn build_snapshot_without_checkpoints() { let (client, log_root) = build_log_with_paths_and_checkpoint( From ad85ff7bf1ab219a7e93cb1e2c3938d0ffdb0e6e Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Mon, 13 Jan 2025 13:29:24 -0800 Subject: [PATCH 02/10] include mod description --- kernel/src/log_segment/delta_log_group_iterator.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kernel/src/log_segment/delta_log_group_iterator.rs b/kernel/src/log_segment/delta_log_group_iterator.rs index 05c786978..d2061f329 100644 --- a/kernel/src/log_segment/delta_log_group_iterator.rs +++ b/kernel/src/log_segment/delta_log_group_iterator.rs @@ -1,3 +1,5 @@ +//! An iterator that groups [`ParsedLogPath`]s by version. + use tracing::warn; use crate::log_segment::ParsedLogPath; @@ -6,7 +8,8 @@ use crate::Version; use std::iter::Peekable; /** - * An iterator that groups [`ParsedLogPath`]s by version. + * The [`DeltaLogGroupingIterator`] is a utility iterator that groups log paths. + * It takes an iterator of [`ParsedLogPath`]s and groups them by version. * For example for an input iterator: * - 11.checkpoint.0.1.parquet * - 11.checkpoint.1.1.parquet From 059c2c38e6bda45c94215c52ef3b39002315698a Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Mon, 13 Jan 2025 15:14:03 -0800 Subject: [PATCH 03/10] use chunk_by instead of new iterator --- kernel/src/log_segment.rs | 35 ++++---- .../log_segment/delta_log_group_iterator.rs | 85 ------------------- kernel/src/log_segment/tests.rs | 66 +------------- 3 files changed, 20 insertions(+), 166 deletions(-) delete mode 100644 kernel/src/log_segment/delta_log_group_iterator.rs diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 54c45e268..4d27c6a38 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -2,7 +2,6 @@ //! files. 
use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; -use crate::log_segment::delta_log_group_iterator::DeltaLogGroupingIterator; use crate::path::{LogPathFileType, ParsedLogPath}; use crate::schema::SchemaRef; use crate::snapshot::CheckpointMetadata; @@ -16,7 +15,6 @@ use std::sync::{Arc, LazyLock}; use tracing::warn; use url::Url; -mod delta_log_group_iterator; #[cfg(test)] mod tests; @@ -314,22 +312,27 @@ fn list_log_files_with_version( let mut checkpoint_parts = vec![]; let mut max_checkpoint_version = start_version; - let log_iterator = DeltaLogGroupingIterator::new(list_log_files( - fs_client, - log_root, - start_version, - end_version, - )?); + let log_files = list_log_files(fs_client, log_root, start_version, end_version)?; - for (version, files) in log_iterator { - let mut new_checkpoint_parts = Vec::new(); - - files.into_iter().for_each(|file| match file { - f if f.is_commit() => commit_files.push(f), - f if f.is_checkpoint() => new_checkpoint_parts.push(f), - _ => {} - }); + for (version, files) in &log_files + .filter_map(|res| match res { + Ok(path) => Some(path), + Err(e) => { + warn!("Error processing path: {:?}", e); + None + } + }) + .chunk_by(|path| path.version) + { + let mut new_checkpoint_parts = vec![]; + for file in files { + if file.is_commit() { + commit_files.push(file); + } else if file.is_checkpoint() { + new_checkpoint_parts.push(file); + } + } if validate_checkpoint_parts(version, &new_checkpoint_parts) && (max_checkpoint_version.is_none() || Some(version) >= max_checkpoint_version) { diff --git a/kernel/src/log_segment/delta_log_group_iterator.rs b/kernel/src/log_segment/delta_log_group_iterator.rs deleted file mode 100644 index d2061f329..000000000 --- a/kernel/src/log_segment/delta_log_group_iterator.rs +++ /dev/null @@ -1,85 +0,0 @@ -//! An iterator that groups [`ParsedLogPath`]s by version. - -use tracing::warn; - -use crate::log_segment::ParsedLogPath; -use crate::DeltaResult; -use crate::Version; -use std::iter::Peekable; - -/** - * The [`DeltaLogGroupingIterator`] is a utility iterator that groups log paths. - * It takes an iterator of [`ParsedLogPath`]s and groups them by version. 
- * For example for an input iterator: - * - 11.checkpoint.0.1.parquet - * - 11.checkpoint.1.1.parquet - * - 11.json - * - 12.checkpoint.parquet - * - 12.json - * - 13.json - * - 14.json - * - 15.checkpoint.0.1.parquet - * - 15.checkpoint.1.1.parquet - * - 15.checkpoint..parquet - * - 15.json - * This will return: - * - (11, Vec[11.checkpoint.0.1.parquet, 11.checkpoint.1.1.parquet, 11.json]) - * - (12, Vec[12.checkpoint.parquet, 12.json]) - * - (13, Vec[13.json]) - * - (14, Vec[14.json]) - * - (15, Vec[15.checkpoint.0.1.parquet, 15.checkpoint.1.1.parquet, 15.checkpoint..parquet, - * 15.json]) - */ -pub(super) struct DeltaLogGroupingIterator -where - I: Iterator>, -{ - iter: Peekable, -} - -impl DeltaLogGroupingIterator -where - I: Iterator>, -{ - pub(crate) fn new(iter: I) -> Self { - Self { - iter: iter.peekable(), - } - } -} - -impl Iterator for DeltaLogGroupingIterator -where - I: Iterator>, -{ - type Item = (Version, Vec); - - fn next(&mut self) -> Option { - let mut paths = Vec::new(); - let mut version = None; - - while let Some(next) = self.iter.peek() { - match next { - Ok(next_path) => { - if let Some(v) = version { - if next_path.version != v { - break; - } - } else { - version = Some(next_path.version); - } - - if let Ok(path) = self.iter.next().unwrap() { - paths.push(path); - } - } - Err(e) => { - warn!("Error processing path: {:?}", e); - self.iter.next(); // Skip the error - } - } - } - - version.map(|v| (v, paths)) - } -} diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index c68b848ff..3cbbae736 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -7,7 +7,7 @@ use url::Url; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreFileSystemClient; use crate::engine::sync::SyncEngine; -use crate::log_segment::{list_log_files, DeltaLogGroupingIterator, LogSegment}; +use crate::log_segment::LogSegment; use crate::snapshot::CheckpointMetadata; use crate::{FileSystemClient, Table}; use test_utils::delta_path_for_version; @@ -107,70 +107,6 @@ fn build_log_with_paths_and_checkpoint( (Box::new(client), log_root) } -#[test] -fn test_delta_log_group_iterator() { - // Test that the DeltaLogGroupingIterator groups log files by version correctly - let (client, log_root) = build_log_with_paths_and_checkpoint( - &[ - delta_path_for_multipart_checkpoint(1, 1, 3), - delta_path_for_version(1, "json"), - delta_path_for_multipart_checkpoint(1, 2, 3), - delta_path_for_version(2, "checkpoint.parquet"), - delta_path_for_version(2, "json"), - delta_path_for_multipart_checkpoint(3, 1, 3), - delta_path_for_multipart_checkpoint(3, 2, 3), - delta_path_for_version(4, "json"), - ], - None, - ); - - let log_files: Vec<_> = list_log_files(client.as_ref(), &log_root, None, None) - .unwrap() - .collect(); - - let mut iterator = DeltaLogGroupingIterator::new(log_files.into_iter()); - - if let Some((version, files)) = iterator.next() { - assert_eq!(version, 1, "Expected version 1 but got {}", version); - assert_eq!(files.len(), 3); - assert!(files.iter().all(|file| file.version == 1)); - } else { - panic!("Expected group for version 1, but none was found"); - } - - if let Some((version, files)) = iterator.next() { - assert_eq!(version, 2, "Expected version 2 but got {}", version); - assert_eq!(files.len(), 2); - assert!(files.iter().all(|file| file.version == 2)); - } else { - panic!("Expected group for version 2, but none was found"); - } - - if let Some((version, files)) = 
iterator.next() { - assert_eq!(version, 3, "Expected version 3 but got {}", version); - assert_eq!(files.len(), 2); - assert!(files.iter().all(|file| file.version == 3)); - } else { - panic!("Expected group for version 3, but none was found"); - } - - if let Some((version, files)) = iterator.next() { - assert_eq!(version, 4, "Expected version 4 but got {}", version); - assert_eq!(files.len(), 1); - assert!(files.iter().all(|file| file.version == 4)); - } else { - panic!("Expected group for version 4, but none was found"); - } - - // Verify that there are no more groups after version 4 - if let Some((version, _)) = iterator.next() { - panic!( - "Expected no more groups, but found group for version {}", - version - ); - } -} - #[test] fn build_snapshot_with_out_of_date_last_checkpoint() { let checkpoint_metadata = CheckpointMetadata { From 39f34d350684ecc039e49cb6d944836db5b7f518 Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Mon, 13 Jan 2025 15:30:30 -0800 Subject: [PATCH 04/10] clippy --- kernel/src/log_segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 4d27c6a38..967707bdb 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -384,7 +384,7 @@ fn list_log_files_with_checkpoint( /// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts /// are present. Returns `true` if we have a complete checkpoint, `false` otherwise. -fn validate_checkpoint_parts(version: u64, checkpoint_parts: &Vec) -> bool { +fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -> bool { if checkpoint_parts.is_empty() { return false; } From 9dd336738f58a1fe02764d0e5a45941d33d67782 Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Tue, 14 Jan 2025 13:07:23 -0800 Subject: [PATCH 05/10] propogate error and add test --- kernel/src/log_segment.rs | 68 ++++++++++++++++++--------------- kernel/src/log_segment/tests.rs | 30 +++++++++++++++ 2 files changed, 67 insertions(+), 31 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 967707bdb..b6d1c5f2f 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -9,7 +9,7 @@ use crate::utils::require; use crate::{ DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version, }; -use itertools::Itertools; +use itertools::{process_results, Itertools}; use std::convert::identity; use std::sync::{Arc, LazyLock}; use tracing::warn; @@ -310,36 +310,25 @@ fn list_log_files_with_version( // on config at some point let mut commit_files = Vec::with_capacity(10); let mut checkpoint_parts = vec![]; - let mut max_checkpoint_version = start_version; let log_files = list_log_files(fs_client, log_root, start_version, end_version)?; - for (version, files) in &log_files - .filter_map(|res| match res { - Ok(path) => Some(path), - Err(e) => { - warn!("Error processing path: {:?}", e); - None + process_results(log_files, |iter| { + let log_files = iter.chunk_by(move |x| x.version); + for (version, files) in &log_files { + let mut new_checkpoint_parts = vec![]; + for file in files { + if file.is_commit() { + commit_files.push(file); + } else if file.is_checkpoint() { + new_checkpoint_parts.push(file); + } } - }) - .chunk_by(|path| path.version) - { - let mut new_checkpoint_parts = vec![]; - - for file in files { - if file.is_commit() { - commit_files.push(file); - } else if file.is_checkpoint() { - new_checkpoint_parts.push(file); + if 
validate_checkpoint_parts(version, &new_checkpoint_parts) { + checkpoint_parts = new_checkpoint_parts; } } - if validate_checkpoint_parts(version, &new_checkpoint_parts) - && (max_checkpoint_version.is_none() || Some(version) >= max_checkpoint_version) - { - max_checkpoint_version = Some(version); - checkpoint_parts = new_checkpoint_parts; - } - } + })?; Ok((commit_files, checkpoint_parts)) } @@ -385,10 +374,6 @@ fn list_log_files_with_checkpoint( /// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts /// are present. Returns `true` if we have a complete checkpoint, `false` otherwise. fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -> bool { - if checkpoint_parts.is_empty() { - return false; - } - match checkpoint_parts.last().map(|file| &file.file_type) { Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => { if *num_parts as usize != checkpoint_parts.len() { @@ -411,8 +396,29 @@ fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) - return false; } } - // TODO: Include UuidCheckpoint once we actually support v2 checkpoints - _ => {} + Some(LogPathFileType::UuidCheckpoint(_)) => { + warn!( + "Found a UUID checkpoint at version {} when it is not supported", + version + ); + return false; + } + Some(LogPathFileType::Commit) | Some(LogPathFileType::CompactedCommit { .. }) => { + warn!( + "Found a commit file at version {} when expecting a checkpoint", + version + ); + return false; + } + Some(LogPathFileType::Unknown) => { + warn!( + "Found an unknown file type at version {} when expecting a checkpoint", + version + ); + return false; + } + // No checkpoint parts + None => return false, } true diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 3cbbae736..1ec753f34 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -107,6 +107,35 @@ fn build_log_with_paths_and_checkpoint( (Box::new(client), log_root) } +#[test] +fn build_snapshot_with_unsupported_uuid_checkpoint() { + let (client, log_root) = build_log_with_paths_and_checkpoint( + &[ + delta_path_for_version(0, "json"), + delta_path_for_version(1, "checkpoint.parquet"), + delta_path_for_version(2, "json"), + delta_path_for_version(3, "checkpoint.parquet"), + delta_path_for_version(4, "json"), + delta_path_for_version(5, "json"), + delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"), + delta_path_for_version(6, "json"), + delta_path_for_version(7, "json"), + ], + None, + ); + + let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap(); + let commit_files = log_segment.ascending_commit_files; + let checkpoint_parts = log_segment.checkpoint_parts; + + assert_eq!(checkpoint_parts.len(), 1); + assert_eq!(checkpoint_parts[0].version, 3); + + let versions = commit_files.into_iter().map(|x| x.version).collect_vec(); + let expected_versions = vec![4, 5, 6, 7]; + assert_eq!(versions, expected_versions); +} + #[test] fn build_snapshot_with_out_of_date_last_checkpoint() { let checkpoint_metadata = CheckpointMetadata { @@ -331,6 +360,7 @@ fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpo let checkpoint_parts = log_segment.checkpoint_parts; assert_eq!(checkpoint_parts.len(), 1); + assert_eq!(checkpoint_parts[0].version, 3); let versions = commit_files.into_iter().map(|x| x.version).collect_vec(); let expected_versions = vec![4, 5, 6, 7]; From 
206250d1a8c0ede8e1ea76ed6675a4f9cbc84bfc Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Tue, 14 Jan 2025 15:27:42 -0800 Subject: [PATCH 06/10] handle multiple incomplete checkpoints of the same version --- kernel/src/log_segment.rs | 51 +++++++++++++++++++++++++++++++-- kernel/src/log_segment/tests.rs | 38 ++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index b6d1c5f2f..69e25a4f5 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -10,6 +10,7 @@ use crate::{ DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version, }; use itertools::{process_results, Itertools}; +use std::collections::HashMap; use std::convert::identity; use std::sync::{Arc, LazyLock}; use tracing::warn; @@ -324,8 +325,54 @@ fn list_log_files_with_version( new_checkpoint_parts.push(file); } } - if validate_checkpoint_parts(version, &new_checkpoint_parts) { - checkpoint_parts = new_checkpoint_parts; + + // Group checkpoint parts by the number of parts they have + let mut checkpoints = HashMap::new(); + for part_file in new_checkpoint_parts { + use LogPathFileType::*; + match &part_file.file_type { + SinglePartCheckpoint + | UuidCheckpoint(_) + | MultiPartCheckpoint { + part_num: 1, + num_parts: 1, + } => { + // All single-file checkpoints are equivalent, just keep one + checkpoints.insert(1, vec![part_file]); + } + MultiPartCheckpoint { + part_num: 1, + num_parts, + } => { + // Start a new multi-part checkpoint with at least 2 parts + checkpoints.insert(*num_parts, vec![part_file]); + } + MultiPartCheckpoint { + part_num, + num_parts, + } => { + // Continue a new multi-part checkpoint with at least 2 parts + if let Some(part_files) = checkpoints.get_mut(num_parts) { + if *part_num == 1 + part_files.len() as u32 { + // Safe to append because all previous parts exist + part_files.push(part_file); + } + } + } + Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing + } + } + + // Find a complete checkpoint (all parts exist) + if let Some((_, complete_checkpoint)) = checkpoints + .into_iter() + .find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts) + { + // Validate the checkpoint before updating state + if validate_checkpoint_parts(version, &complete_checkpoint) { + checkpoint_parts = complete_checkpoint; + commit_files.clear(); // Clear commit files once checkpoint is found + } } } })?; diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 1ec753f34..7939e1909 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -136,6 +136,44 @@ fn build_snapshot_with_unsupported_uuid_checkpoint() { assert_eq!(versions, expected_versions); } +#[test] +fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() { + let (client, log_root) = build_log_with_paths_and_checkpoint( + &[ + delta_path_for_version(0, "json"), + delta_path_for_multipart_checkpoint(1, 1, 3), + // Part 2 is missing! + delta_path_for_multipart_checkpoint(3, 3, 3), + // Part 1 is missing! + delta_path_for_multipart_checkpoint(2, 1, 2), + delta_path_for_version(2, "json"), + delta_path_for_multipart_checkpoint(3, 1, 3), + // Part 2 is missing! 
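+            // (every multi-part attempt so far is missing a part; only the
+            // four-part set added below is complete, so it is the one that must win)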
+ delta_path_for_multipart_checkpoint(3, 3, 3), + delta_path_for_multipart_checkpoint(3, 1, 4), + delta_path_for_multipart_checkpoint(3, 2, 4), + delta_path_for_multipart_checkpoint(3, 3, 4), + delta_path_for_multipart_checkpoint(3, 4, 4), + delta_path_for_version(4, "json"), + delta_path_for_version(5, "json"), + delta_path_for_version(6, "json"), + delta_path_for_version(7, "json"), + ], + None, + ); + + let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap(); + let commit_files = log_segment.ascending_commit_files; + let checkpoint_parts = log_segment.checkpoint_parts; + + assert_eq!(checkpoint_parts.len(), 4); + assert_eq!(checkpoint_parts[0].version, 3); + + let versions = commit_files.into_iter().map(|x| x.version).collect_vec(); + let expected_versions = vec![4, 5, 6, 7]; + assert_eq!(versions, expected_versions); +} + #[test] fn build_snapshot_with_out_of_date_last_checkpoint() { let checkpoint_metadata = CheckpointMetadata { From ec3ca8344a7bd6b47d3911eeda649dcf340c13d6 Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Thu, 16 Jan 2025 13:48:08 -0800 Subject: [PATCH 07/10] remove redundant validation & refactor --- kernel/src/log_segment.rs | 162 ++++++++++++++------------------------ 1 file changed, 59 insertions(+), 103 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 69e25a4f5..6e5123b1a 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -309,13 +309,14 @@ fn list_log_files_with_version( ) -> DeltaResult<(Vec, Vec)> { // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based // on config at some point - let mut commit_files = Vec::with_capacity(10); - let mut checkpoint_parts = vec![]; let log_files = list_log_files(fs_client, log_root, start_version, end_version)?; process_results(log_files, |iter| { - let log_files = iter.chunk_by(move |x| x.version); + let mut commit_files = Vec::with_capacity(10); + let mut checkpoint_parts = vec![]; + + let log_files = iter.chunk_by(|x| x.version); for (version, files) in &log_files { let mut new_checkpoint_parts = vec![]; for file in files { @@ -323,61 +324,69 @@ fn list_log_files_with_version( commit_files.push(file); } else if file.is_checkpoint() { new_checkpoint_parts.push(file); + } else { + warn!( + "Found a file with unknown file type {:?} at version {}", + file.file_type, version + ); } } - // Group checkpoint parts by the number of parts they have - let mut checkpoints = HashMap::new(); - for part_file in new_checkpoint_parts { - use LogPathFileType::*; - match &part_file.file_type { - SinglePartCheckpoint - | UuidCheckpoint(_) - | MultiPartCheckpoint { - part_num: 1, - num_parts: 1, - } => { - // All single-file checkpoints are equivalent, just keep one - checkpoints.insert(1, vec![part_file]); - } - MultiPartCheckpoint { - part_num: 1, - num_parts, - } => { - // Start a new multi-part checkpoint with at least 2 parts - checkpoints.insert(*num_parts, vec![part_file]); - } - MultiPartCheckpoint { - part_num, - num_parts, - } => { - // Continue a new multi-part checkpoint with at least 2 parts - if let Some(part_files) = checkpoints.get_mut(num_parts) { - if *part_num == 1 + part_files.len() as u32 { - // Safe to append because all previous parts exist - part_files.push(part_file); - } - } - } - Commit | CompactedCommit { .. 
} | Unknown => {} // invalid file type => do nothing - } - } - - // Find a complete checkpoint (all parts exist) - if let Some((_, complete_checkpoint)) = checkpoints + // Group and find the first complete checkpoint found for this version, + // no matter how many exist as they are all equivalent. + if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts) .into_iter() - .find(|(num_parts, part_files)| part_files.len() as u32 == *num_parts) + .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize) { - // Validate the checkpoint before updating state - if validate_checkpoint_parts(version, &complete_checkpoint) { - checkpoint_parts = complete_checkpoint; - commit_files.clear(); // Clear commit files once checkpoint is found - } + checkpoint_parts = complete_checkpoint; + commit_files.clear(); // Clear commits when a complete checkpoint is found } } - })?; + (commit_files, checkpoint_parts) + }) +} - Ok((commit_files, checkpoint_parts)) +/// Groups all checkpoint parts according to the size of the checkpoint they belong to. +/// +/// NOTE: There could be a single-part and/or any number of uuid-based checkpoints. They +/// are all equivalent, and this routine keeps only one of them (arbitrarily chosen). +fn group_checkpoint_parts(parts: Vec) -> HashMap> { + let mut checkpoints: HashMap> = HashMap::new(); + for part_file in parts { + use LogPathFileType::*; + match &part_file.file_type { + SinglePartCheckpoint + | UuidCheckpoint(_) + | MultiPartCheckpoint { + part_num: 1, + num_parts: 1, + } => { + // All single-file checkpoints are equivalent, just keep one + checkpoints.insert(1, vec![part_file]); + } + MultiPartCheckpoint { + part_num: 1, + num_parts, + } => { + // Start a new multi-part checkpoint with at least 2 parts + checkpoints.insert(*num_parts, vec![part_file]); + } + MultiPartCheckpoint { + part_num, + num_parts, + } => { + // Continue a new multi-part checkpoint with at least 2 parts + if let Some(part_files) = checkpoints.get_mut(num_parts) { + if *part_num as usize == 1 + part_files.len() { + // Safe to append because all previous parts exist + part_files.push(part_file); + } + } + } + Commit | CompactedCommit { .. } | Unknown => {} + } + } + checkpoints } /// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all @@ -417,56 +426,3 @@ fn list_log_files_with_checkpoint( } Ok((commit_files, checkpoint_parts)) } - -/// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts -/// are present. Returns `true` if we have a complete checkpoint, `false` otherwise. -fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -> bool { - match checkpoint_parts.last().map(|file| &file.file_type) { - Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => { - if *num_parts as usize != checkpoint_parts.len() { - warn!( - "Found a multi-part checkpoint at version {}. Found {} parts, expected {}", - version, - checkpoint_parts.len(), - num_parts - ); - return false; - } - } - Some(LogPathFileType::SinglePartCheckpoint) => { - if checkpoint_parts.len() != 1 { - warn!( - "Found a single-part checkpoint at version {}. Found {} parts", - version, - checkpoint_parts.len() - ); - return false; - } - } - Some(LogPathFileType::UuidCheckpoint(_)) => { - warn!( - "Found a UUID checkpoint at version {} when it is not supported", - version - ); - return false; - } - Some(LogPathFileType::Commit) | Some(LogPathFileType::CompactedCommit { .. 
}) => { - warn!( - "Found a commit file at version {} when expecting a checkpoint", - version - ); - return false; - } - Some(LogPathFileType::Unknown) => { - warn!( - "Found an unknown file type at version {} when expecting a checkpoint", - version - ); - return false; - } - // No checkpoint parts - None => return false, - } - - true -} From 537190a1d55702ef3e6886fe35668f53ccfa9e8b Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Thu, 16 Jan 2025 13:58:28 -0800 Subject: [PATCH 08/10] cleanup doc --- kernel/src/log_segment.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 6e5123b1a..1efd3fdab 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -332,8 +332,8 @@ fn list_log_files_with_version( } } - // Group and find the first complete checkpoint found for this version, - // no matter how many exist as they are all equivalent. + // Group and find the first complete checkpoint for this version. + // All checkpoints for the same version are equivalent, so we only take one. if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts) .into_iter() .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize) From 9b170fbe584513badaa2f51a705022409568e9cc Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Thu, 16 Jan 2025 16:27:42 -0800 Subject: [PATCH 09/10] doc & test cleanup --- kernel/src/log_segment.rs | 2 +- kernel/src/log_segment/tests.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 1efd3fdab..f7c7de3dd 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -339,7 +339,7 @@ fn list_log_files_with_version( .find(|(num_parts, part_files)| part_files.len() == *num_parts as usize) { checkpoint_parts = complete_checkpoint; - commit_files.clear(); // Clear commits when a complete checkpoint is found + commit_files.clear(); // Log replay only uses commits after a complete checkpoint } } (commit_files, checkpoint_parts) diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 7939e1909..3eaedb092 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -142,13 +142,13 @@ fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() { &[ delta_path_for_version(0, "json"), delta_path_for_multipart_checkpoint(1, 1, 3), - // Part 2 is missing! + // Part 2 of 3 at version 1 is missing! delta_path_for_multipart_checkpoint(3, 3, 3), - // Part 1 is missing! delta_path_for_multipart_checkpoint(2, 1, 2), + // Part 2 of 2 at version 2 is missing! delta_path_for_version(2, "json"), delta_path_for_multipart_checkpoint(3, 1, 3), - // Part 2 is missing! + // Part 2 of 3 at version 3 is missing! delta_path_for_multipart_checkpoint(3, 3, 3), delta_path_for_multipart_checkpoint(3, 1, 4), delta_path_for_multipart_checkpoint(3, 2, 4), @@ -276,7 +276,7 @@ fn build_snapshot_with_missing_checkpoint_part_from_hint_fails() { delta_path_for_version(3, "json"), delta_path_for_version(4, "json"), delta_path_for_multipart_checkpoint(5, 1, 3), - // Part 2 is missing! + // Part 2 of 3 at version 5 is missing! 
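+            // (the _last_checkpoint hint still names version 5, so the gap makes
+            // snapshot construction fail rather than fall back)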
delta_path_for_multipart_checkpoint(5, 3, 3), delta_path_for_version(5, "json"), delta_path_for_version(6, "json"), @@ -338,7 +338,7 @@ fn build_snapshot_with_missing_checkpoint_part_no_hint() { delta_path_for_version(3, "json"), delta_path_for_version(4, "json"), delta_path_for_multipart_checkpoint(5, 1, 3), - // Part 2 is missing! + // Part 2 of 3 at version 5 is missing! delta_path_for_multipart_checkpoint(5, 3, 3), delta_path_for_version(5, "json"), delta_path_for_version(6, "json"), @@ -383,7 +383,7 @@ fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpo delta_path_for_version(3, "checkpoint.parquet"), delta_path_for_version(4, "json"), delta_path_for_multipart_checkpoint(5, 1, 3), - // Part 2 is missing! + // Part 2 of 3 at version 5 is missing! delta_path_for_multipart_checkpoint(5, 3, 3), delta_path_for_version(5, "json"), delta_path_for_version(6, "json"), From 4dd4e5b08c6c8c8fd386e2dc2bcaa0d71ff12220 Mon Sep 17 00:00:00 2001 From: sebastian tia Date: Fri, 17 Jan 2025 09:08:39 -0800 Subject: [PATCH 10/10] oops --- kernel/src/log_segment/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 3eaedb092..e1c441d7e 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -143,7 +143,7 @@ fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() { delta_path_for_version(0, "json"), delta_path_for_multipart_checkpoint(1, 1, 3), // Part 2 of 3 at version 1 is missing! - delta_path_for_multipart_checkpoint(3, 3, 3), + delta_path_for_multipart_checkpoint(1, 3, 3), delta_path_for_multipart_checkpoint(2, 1, 2), // Part 2 of 2 at version 2 is missing! delta_path_for_version(2, "json"),
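
A minimal, self-contained sketch of the selection rule this series converges on: group
checkpoint files by version, bucket each version's parts by the part count they advertise,
and keep the newest bucket whose parts are all present. The types and names below are
hypothetical stand-ins for the kernel's ParsedLogPath machinery (a sorted map replaces
itertools' chunk_by over the version-sorted listing), and the contiguity check is a
simplification of the patches' group_checkpoint_parts:

use std::collections::{BTreeMap, HashMap};

#[derive(Debug, Clone)]
struct CheckpointFile {
    version: u64,
    part_num: u32,  // 1-based index of this part within its checkpoint
    num_parts: u32, // 1 for single-part checkpoints
}

// Returns the newest version that has a complete checkpoint, with its parts.
fn newest_complete_checkpoint(files: &[CheckpointFile]) -> Option<(u64, Vec<CheckpointFile>)> {
    // Group by version; a sorted map stands in for chunk_by over a sorted listing.
    let mut by_version: BTreeMap<u64, Vec<CheckpointFile>> = BTreeMap::new();
    for f in files {
        by_version.entry(f.version).or_default().push(f.clone());
    }
    let mut newest = None;
    for (version, parts) in by_version {
        // Competing checkpoints at the same version are told apart by part count.
        let mut by_size: HashMap<u32, Vec<CheckpointFile>> = HashMap::new();
        for p in parts {
            let group = by_size.entry(p.num_parts).or_default();
            // Accept part N only when parts 1..N-1 are already present, so
            // duplicates or gaps never build a false "complete" set.
            if p.part_num as usize == group.len() + 1 {
                group.push(p);
            }
        }
        // A checkpoint is usable only when every advertised part showed up.
        if let Some((_, complete)) = by_size
            .into_iter()
            .find(|(num_parts, group)| group.len() == *num_parts as usize)
        {
            newest = Some((version, complete)); // later versions overwrite earlier ones
        }
    }
    newest
}

fn main() {
    let files = vec![
        CheckpointFile { version: 3, part_num: 1, num_parts: 1 },
        CheckpointFile { version: 5, part_num: 1, num_parts: 3 },
        CheckpointFile { version: 5, part_num: 3, num_parts: 3 }, // part 2 never written
    ];
    // Version 5 is incomplete, so the single-part checkpoint at version 3 wins.
    let (version, parts) = newest_complete_checkpoint(&files).unwrap();
    assert_eq!((version, parts.len()), (3, 1));
    println!("newest complete checkpoint: version {version}");
}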