Skip to content

Commit

Permalink
Merge branch 'main' into cdf_delta_spark_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi authored Jan 14, 2025
2 parents 820a384 + c1c1dbe commit 8156f9b
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 13 deletions.
2 changes: 1 addition & 1 deletion kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ pub struct Add {
/// in the added file must be contained in one or more remove actions in the same version.
pub data_change: bool,

/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file.
/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file encoded as a JSON string.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
Expand Down
54 changes: 45 additions & 9 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ impl RemoveVisitor {
let extended_file_metadata: Option<bool> =
getters[3].get_opt(row_index, "remove.extendedFileMetadata")?;

// TODO(nick) handle partition values in getters[4]
let partition_values: Option<HashMap<_, _>> =
getters[4].get_opt(row_index, "remove.partitionValues")?;

let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?;

Expand All @@ -284,7 +285,7 @@ impl RemoveVisitor {
data_change,
deletion_timestamp,
extended_file_metadata,
partition_values: None,
partition_values,
size,
tags: None,
deletion_vector,
Expand All @@ -305,10 +306,9 @@ impl RowVisitor for RemoveVisitor {
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
for i in 0..row_count {
// Since path column is required, use it to detect presence of an Remove action
// Since path column is required, use it to detect presence of a Remove action
if let Some(path) = getters[0].get_opt(i, "remove.path")? {
self.removes.push(Self::visit_remove(i, path, getters)?);
break;
}
}
Ok(())
Expand Down Expand Up @@ -603,11 +603,7 @@ mod tests {
modification_time: 1670892998135,
data_change: true,
stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()),
tags: None,
deletion_vector: None,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
..Default::default()
};
let add2 = Add {
path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(),
Expand All @@ -630,11 +626,51 @@ mod tests {
..add1.clone()
};
let expected = vec![add1, add2, add3];
assert_eq!(add_visitor.adds.len(), expected.len());
for (add, expected) in add_visitor.adds.into_iter().zip(expected.into_iter()) {
assert_eq!(add, expected);
}
}

#[test]
fn test_parse_remove_partitioned() {
let engine = SyncEngine::new();
let json_handler = engine.get_json_handler();
let json_strings: StringArray = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#,
]
.into();
let output_schema = get_log_schema().clone();
let batch = json_handler
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap();
let mut remove_visitor = RemoveVisitor::default();
remove_visitor.visit_rows_of(batch.as_ref()).unwrap();
let expected_remove = Remove {
path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet"
.into(),
deletion_timestamp: Some(1670892998135),
data_change: true,
partition_values: Some(HashMap::from([
("c1".to_string(), "4".to_string()),
("c2".to_string(), "c".to_string()),
])),
size: Some(452),
..Default::default()
};
assert_eq!(
remove_visitor.removes.len(),
1,
"Unexpected number of remove actions"
);
assert_eq!(
remove_visitor.removes[0], expected_remove,
"Unexpected remove action"
);
}

#[test]
fn test_parse_txn() {
let engine = SyncEngine::new();
Expand Down
16 changes: 14 additions & 2 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,20 @@ pub trait JsonHandler: AsAny {
output_schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>>;

/// Read and parse the JSON format file at given locations and return
/// the data as EngineData with the columns requested by physical schema.
/// Read and parse the JSON format file at given locations and return the data as EngineData with
/// the columns requested by physical schema. Note: The [`FileDataReadResultIterator`] must emit
/// data from files in the order that `files` is given. For example if files ["a", "b"] is provided,
/// then the engine data iterator must first return all the engine data from file "a", _then_ all
/// the engine data from file "b". Moreover, for a given file, all of its [`EngineData`] and
/// constituent rows must be in order that they occur in the file. Consider a file with rows
/// (1, 2, 3). The following are legal iterator batches:
/// iter: [EngineData(1, 2), EngineData(3)]
/// iter: [EngineData(1), EngineData(2, 3)]
/// iter: [EngineData(1, 2, 3)]
/// The following are illegal batches:
/// iter: [EngineData(3), EngineData(1, 2)]
/// iter: [EngineData(1), EngineData(3, 2)]
/// iter: [EngineData(2, 1, 3)]
///
/// # Parameters
///
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub struct StructType {
pub type_name: String,
/// The type of element stored in this array
// We use indexmap to preserve the order of fields as they are defined in the schema
// while also allowing for fast lookup by name. The atlerative to do a liner search
// while also allowing for fast lookup by name. The alternative is to do a linear search
// for each field by name would be potentially quite expensive for large schemas.
pub fields: IndexMap<String, StructField>,
}
Expand Down
1 change: 1 addition & 0 deletions kernel/src/table_changes/log_replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ async fn dv() {
assert_eq!(sv, &[false, true, true]);
}

// Note: Data skipping does not work on Remove actions.
#[tokio::test]
async fn data_skipping_filter() {
let engine = Arc::new(SyncEngine::new());
Expand Down

0 comments on commit 8156f9b

Please sign in to comment.