diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index c62486873..630eedc81 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -375,7 +375,7 @@ pub struct Add { /// in the added file must be contained in one or more remove actions in the same version. pub data_change: bool, - /// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file. + /// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file encoded as a JSON string. /// /// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))] diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 0cd12ce50..957befe80 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -267,7 +267,8 @@ impl RemoveVisitor { let extended_file_metadata: Option<bool> = getters[3].get_opt(row_index, "remove.extendedFileMetadata")?; - // TODO(nick) handle partition values in getters[4] + let partition_values: Option<HashMap<String, String>> = + getters[4].get_opt(row_index, "remove.partitionValues")?; let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?; @@ -284,7 +285,7 @@ data_change, deletion_timestamp, extended_file_metadata, - partition_values: None, + partition_values, size, tags: None, deletion_vector, @@ -305,10 +306,9 @@ impl RowVisitor for RemoveVisitor { } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { for i in 0..row_count { - // Since path column is required, use it to detect presence of an Remove action + // Since path column is required, use it to detect presence of a Remove action if let Some(path) = getters[0].get_opt(i, "remove.path")? 
{ self.removes.push(Self::visit_remove(i, path, getters)?); - break; } } Ok(()) @@ -603,11 +603,7 @@ mod tests { modification_time: 1670892998135, data_change: true, stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()), - tags: None, - deletion_vector: None, - base_row_id: None, - default_row_commit_version: None, - clustering_provider: None, + ..Default::default() }; let add2 = Add { path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(), @@ -630,11 +626,51 @@ mod tests { ..add1.clone() }; let expected = vec![add1, add2, add3]; + assert_eq!(add_visitor.adds.len(), expected.len()); for (add, expected) in add_visitor.adds.into_iter().zip(expected.into_iter()) { assert_eq!(add, expected); } } + #[test] + fn test_parse_remove_partitioned() { + let engine = SyncEngine::new(); + let json_handler = engine.get_json_handler(); + let json_strings: StringArray = vec![ + r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#, + r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, + r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#, + ] + .into(); + let output_schema = get_log_schema().clone(); + let batch = json_handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap(); + let mut remove_visitor = RemoveVisitor::default(); + remove_visitor.visit_rows_of(batch.as_ref()).unwrap(); + let 
expected_remove = Remove { + path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet" + .into(), + deletion_timestamp: Some(1670892998135), + data_change: true, + partition_values: Some(HashMap::from([ + ("c1".to_string(), "4".to_string()), + ("c2".to_string(), "c".to_string()), + ])), + size: Some(452), + ..Default::default() + }; + assert_eq!( + remove_visitor.removes.len(), + 1, + "Unexpected number of remove actions" + ); + assert_eq!( + remove_visitor.removes[0], expected_remove, + "Unexpected remove action" + ); + } + #[test] fn test_parse_txn() { let engine = SyncEngine::new(); diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index fa88e7afa..49dceea75 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -371,8 +371,20 @@ pub trait JsonHandler: AsAny { output_schema: SchemaRef, ) -> DeltaResult<Box<dyn EngineData>>; - /// Read and parse the JSON format file at given locations and return - /// the data as EngineData with the columns requested by physical schema. + /// Read and parse the JSON format file at given locations and return the data as EngineData with + /// the columns requested by physical schema. Note: The [`FileDataReadResultIterator`] must emit + /// data from files in the order that `files` is given. For example if files ["a", "b"] is provided, + /// then the engine data iterator must first return all the engine data from file "a", _then_ all + /// the engine data from file "b". Moreover, for a given file, all of its [`EngineData`] and + /// constituent rows must be in the order that they occur in the file. Consider a file with rows + /// (1, 2, 3). 
The following are legal iterator batches: + /// iter: [EngineData(1, 2), EngineData(3)] + /// iter: [EngineData(1), EngineData(2, 3)] + /// iter: [EngineData(1, 2, 3)] + /// The following are illegal batches: + /// iter: [EngineData(3), EngineData(1, 2)] + /// iter: [EngineData(1), EngineData(3, 2)] + /// iter: [EngineData(2, 1, 3)] /// /// # Parameters /// diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 9cb6769f9..d2ff65193 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -223,7 +223,7 @@ pub struct StructType { pub type_name: String, /// The type of element stored in this array // We use indexmap to preserve the order of fields as they are defined in the schema - // while also allowing for fast lookup by name. The atlerative to do a liner search + // while also allowing for fast lookup by name. The alternative of doing a linear search // for each field by name would be potentially quite expensive for large schemas. pub fields: IndexMap<String, StructField>, } diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index f2dbdd956..29e076c07 100644 @@ -469,6 +469,7 @@ async fn dv() { assert_eq!(sv, &[false, true, true]); } +// Note: Data skipping does not work on Remove actions. #[tokio::test] async fn data_skipping_filter() { let engine = Arc::new(SyncEngine::new());