Skip to content

Commit

Permalink
Refactor it a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Oct 23, 2023
1 parent 760c0d4 commit 3dba41a
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 126 deletions.
85 changes: 47 additions & 38 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ def __repr__(self) -> str:
repr_properties = f", **{repr(self._additional_properties)}" if self._additional_properties else ""
return f"Summary({repr(self.operation)}{repr_properties})"

def __eq__(self, other: Any) -> bool:
"""Compare if the summary is equal to another summary."""
return (
self.operation == other.operation and self.additional_properties == other.additional_properties
if isinstance(other, Summary)
else False
)


class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
Expand Down Expand Up @@ -207,13 +215,13 @@ def removed_file(self, data_file: DataFile) -> None:

def added_manifest(self, manifest: ManifestFile) -> None:
if manifest.content == ManifestContent.DATA:
self.added_files += manifest.added_files_count
self.added_records += manifest.added_rows_count
self.removed_files += manifest.deleted_files_count
self.deleted_records += manifest.deleted_rows_count
self.added_files += manifest.added_files_count or 0
self.added_records += manifest.added_rows_count or 0
self.removed_files += manifest.deleted_files_count or 0
self.deleted_records += manifest.deleted_rows_count or 0
elif manifest.content == ManifestContent.DELETES:
self.added_delete_files += manifest.added_files_count
self.removed_delete_files += manifest.deleted_files_count
self.added_delete_files += manifest.added_files_count or 0
self.removed_delete_files += manifest.deleted_files_count or 0
else:
raise ValueError(f"Unknown manifest file content: {manifest.content}")

Expand Down Expand Up @@ -246,38 +254,39 @@ def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> No
properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes']


def truncate_table_summary(summary: Dict[str, str]) -> Dict[str, str]:
truncated_metrics = {
'total-data-files': '0',
'total-delete-files': '0',
'total-records': '0',
'total-files-size': '0',
'total-position-deletes': '0',
'total-equality-deletes': '0',
}

def _cast_to_int(value: str) -> Optional[int]:
return int(value) if value is not None else None
def truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
for prop in {
'total-data-files',
'total-delete-files',
'total-records',
'total-files-size',
'total-position-deletes',
'total-equality-deletes',
}:
summary[prop] = '0'

if value := previous_summary.get('total-data-files'):
summary['deleted-data-files'] = value
if value := previous_summary.get('total-delete-files'):
summary['removed-delete-files'] = value
if value := previous_summary.get('total-records'):
summary['deleted-records'] = value
if value := previous_summary.get('total-files-size'):
summary['removed-files-size'] = value
if value := previous_summary.get('total-position-deletes'):
summary['removed-position-deletes'] = value
if value := previous_summary.get('total-equality-deletes'):
summary['removed-equality-deletes'] = value

if value := _cast_to_int(summary.get('total-data-files')):
truncated_metrics['deleted-data-files'] = str(value)
if value := _cast_to_int(summary.get('total-delete-files')):
truncated_metrics['removed-delete-files'] = str(value)
if value := _cast_to_int(summary.get('total-records')):
truncated_metrics['deleted-records'] = str(value)
if value := _cast_to_int(summary.get('total-files-size')):
truncated_metrics['removed-files-size'] = str(value)
if value := _cast_to_int(summary.get('total-position-deletes')):
truncated_metrics['removed-position-deletes'] = str(value)
if value := _cast_to_int(summary.get('total-equality-deletes')):
truncated_metrics['removed-equality-deletes'] = str(value)

return truncated_metrics
return summary


def merge_snapshot_summaries(operation: Operation, previous_summary: Optional[Mapping], summary: Mapping) -> Dict[str, str]:
if operation == Operation.OVERWRITE:
summary.update(truncate_table_summary(previous_summary))
def merge_snapshot_summaries(
summary: Summary,
previous_summary: Optional[Mapping[str, str]] = None,
) -> Summary:
if summary.operation == Operation.OVERWRITE and previous_summary is not None:
summary = truncate_table_summary(summary, previous_summary)

if not previous_summary:
previous_summary = {
Expand All @@ -289,9 +298,9 @@ def merge_snapshot_summaries(operation: Operation, previous_summary: Optional[Ma
'total-equality-deletes': '0',
}

def _update_totals(total_property: str, added_property: str, removed_property: str):
if new_total := previous_summary.get(total_property):
new_total = int(new_total)
def _update_totals(total_property: str, added_property: str, removed_property: str) -> None:
if new_total_str := previous_summary.get(total_property):
new_total = int(new_total_str)
if new_total >= 0 and (added := summary.get(added_property)):
new_total += int(added)
if new_total >= 0 and (removed := summary.get(removed_property)):
Expand Down
166 changes: 78 additions & 88 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,120 +157,110 @@ def test_snapshot_summary_collector(manifest_file: ManifestFile, data_file: Data

ssc.add_file(data_file)

assert ssc.build() == {'added-data-files': '2', 'added-records': '200', 'deleted-records': '120', 'removed-data-files': '3'}


def test_merge_snapshot_summaries_empty():
assert merge_snapshot_summaries(None, {}) == {
'total-data-files': '0',
'total-delete-files': '0',
'total-equality-deletes': '0',
'total-files-size': '0',
'total-position-deletes': '0',
'total-records': '0',
assert ssc.build() == {
'added-data-files': '2',
'added-files-size': '1234',
'added-records': '200',
'deleted-records': '120',
'removed-data-files': '3',
}


def test_merge_snapshot_summaries_new_summary():
actual = merge_snapshot_summaries(
None,
{
def test_merge_snapshot_summaries_empty() -> None:
assert merge_snapshot_summaries(Summary(Operation.APPEND)) == Summary(
operation=Operation.APPEND,
**{
'total-data-files': '0',
'added-data-files': '2',
'removed-data-files': '1',
'total-delete-files': '0',
'added-delete-files': '2',
'removed-delete-files': '1',
'total-equality-deletes': '0',
'added-equality-deletes': '2',
'removed-equality-deletes': '1',
'total-records': '0',
'total-files-size': '0',
'added-files-size': '2',
'removed-files-size': '1',
'total-position-deletes': '0',
'added-position-deletes': '2',
'removed-position-deletes': '1',
'total-records': '0',
'added-records': '2',
'removed-records': '1',
'total-equality-deletes': '0',
},
)

expected = {
'added-data-files': '2',
'added-delete-files': '2',
'added-equality-deletes': '2',
'added-files-size': '2',
'added-position-deletes': '2',
'added-records': '2',
'removed-data-files': '1',
'removed-delete-files': '1',
'removed-equality-deletes': '1',
'removed-files-size': '1',
'removed-position-deletes': '1',
'removed-records': '1',
'total-data-files': '2',
'total-delete-files': '2',
'total-equality-deletes': '2',
'total-files-size': '2',
'total-position-deletes': '2',
'total-records': '2',
}

def test_merge_snapshot_summaries_new_summary() -> None:
actual = merge_snapshot_summaries(
summary=Summary(
operation=Operation.APPEND,
**{
'added-data-files': '1',
'added-delete-files': '2',
'added-equality-deletes': '3',
'added-files-size': '4',
'added-position-deletes': '5',
'added-records': '6',
},
)
)

expected = Summary(
operation=Operation.APPEND,
**{
'added-data-files': '1',
'added-delete-files': '2',
'added-equality-deletes': '3',
'added-files-size': '4',
'added-position-deletes': '5',
'added-records': '6',
'total-data-files': '1',
'total-delete-files': '2',
'total-records': '6',
'total-files-size': '4',
'total-position-deletes': '5',
'total-equality-deletes': '3',
},
)

assert actual == expected


def test_merge_snapshot_summaries_old_and_new_summary():
def test_merge_snapshot_summaries_overwrite_summary() -> None:
actual = merge_snapshot_summaries(
{
summary=Summary(
operation=Operation.OVERWRITE,
**{
'added-data-files': '1',
'added-delete-files': '2',
'added-equality-deletes': '3',
'added-files-size': '4',
'added-position-deletes': '5',
'added-records': '6',
},
),
previous_summary={
'total-data-files': '1',
'total-delete-files': '1',
'total-equality-deletes': '1',
'total-files-size': '1',
'total-position-deletes': '1',
'total-records': '1',
},
{
'total-data-files': '0',
'added-data-files': '2',
'removed-data-files': '1',
'total-delete-files': '0',
)

expected = Summary(
operation=Operation.OVERWRITE,
**{
'added-data-files': '1',
'added-delete-files': '2',
'added-equality-deletes': '3',
'added-files-size': '4',
'added-position-deletes': '5',
'added-records': '6',
'total-position-deletes': '5',
'total-files-size': '5',
'total-records': '6',
'total-delete-files': '2',
'total-data-files': '1',
'total-equality-deletes': '3',
'deleted-data-files': '1',
'removed-delete-files': '1',
'total-equality-deletes': '0',
'added-equality-deletes': '2',
'removed-equality-deletes': '1',
'total-files-size': '0',
'added-files-size': '2',
'deleted-records': '1',
'removed-files-size': '1',
'total-position-deletes': '0',
'added-position-deletes': '2',
'removed-position-deletes': '1',
'total-records': '0',
'added-records': '2',
'removed-records': '1',
'removed-equality-deletes': '1',
},
)

expected = {
'added-data-files': '2',
'added-delete-files': '2',
'added-equality-deletes': '2',
'added-files-size': '2',
'added-position-deletes': '2',
'added-records': '2',
'removed-data-files': '1',
'removed-delete-files': '1',
'removed-equality-deletes': '1',
'removed-files-size': '1',
'removed-position-deletes': '1',
'removed-records': '1',
'total-data-files': '3',
'total-delete-files': '3',
'total-equality-deletes': '3',
'total-files-size': '3',
'total-position-deletes': '3',
'total-records': '3',
}

assert actual == expected
assert actual.additional_properties == expected.additional_properties

0 comments on commit 3dba41a

Please sign in to comment.