Skip to content

Commit

Permalink
Add table statistics update
Browse files Browse the repository at this point in the history
  • Loading branch information
ndrluis committed Jan 8, 2025
1 parent 3b58011 commit adfc971
Show file tree
Hide file tree
Showing 11 changed files with 495 additions and 2 deletions.
24 changes: 24 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,27 @@
)
spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id")
spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'")

# Table exercised by the table-statistics integration tests: a minimal
# format-version 2 Iceberg table with a single integer column.
spark.sql(
f"""
CREATE OR REPLACE TABLE {catalog_name}.default.test_table_statistics_operations (
number integer
)
USING iceberg
TBLPROPERTIES (
'format-version'='2'
);
"""
)
# NOTE(review): two separate single-row INSERTs — presumably so the table ends
# up with two distinct snapshots that statistics can be attached to and removed
# from independently; confirm against the statistics test suite.
spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_statistics_operations
VALUES (1)
"""
)
spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_statistics_operations
VALUES (2)
"""
)
23 changes: 23 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,29 @@ with table.manage_snapshots() as ms:
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
```

## Table Statistics Management

Manage table statistics with operations through the `Table` API:

```python
# To run a specific operation
table.update_statistics().set_statistics(snapshot_id, statistics_file).commit()
# To run multiple operations
(
    table.update_statistics()
    .set_statistics(snapshot_id1, statistics_file1)
    .remove_statistics(snapshot_id2)
    .commit()
)
# Operations are applied on commit.
```

You can also use context managers to make more changes:

```python
with table.update_statistics() as update:
    update.set_statistics(snapshot_id1, statistics_file)
update.remove_statistics(snapshot_id2)
```

## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
Expand Down
18 changes: 18 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
_FastAppendFiles,
)
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import (
EMPTY_DICT,
Expand Down Expand Up @@ -1035,6 +1036,23 @@ def manage_snapshots(self) -> ManageSnapshots:
"""
return ManageSnapshots(transaction=Transaction(self, autocommit=True))

def update_statistics(self) -> UpdateStatistics:
    """Create an UpdateStatistics for statistics management operations on this table.

    Supports running a single operation:
        table.update_statistics().<operation>().commit()
    or chaining several before a single commit:
        table.update_statistics().<operation-one>().<operation-two>().commit()
    Pending changes are only applied on commit.

    A context manager commits automatically on exit:
        with table.update_statistics() as update:
            update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
            update.remove_statistics(snapshot_id=2)
    """
    # Autocommit transaction: the UpdateStatistics commits itself when done.
    txn = Transaction(self, autocommit=True)
    return UpdateStatistics(transaction=txn)

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
"""Create a new UpdateSchema to alter the columns of this table.
Expand Down
9 changes: 9 additions & 0 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
SortOrder,
assign_fresh_sort_order_ids,
)
from pyiceberg.table.statistics import StatisticsFile
from pyiceberg.typedef import (
EMPTY_DICT,
IcebergBaseModel,
Expand Down Expand Up @@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel):
There is always a main branch reference pointing to the
current-snapshot-id even if the refs map is null."""

statistics: List[StatisticsFile] = Field(default_factory=list)
"""A optional list of table statistics files.
Table statistics files are valid Puffin files. Statistics are
informational. A reader can choose to ignore statistics
information. Statistics support is not required to read the
table correctly. A table can contain many statistics files
associated with different table snapshots."""

# validators
@field_validator("properties", mode="before")
def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]:
Expand Down
49 changes: 49 additions & 0 deletions pyiceberg/table/statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import (
Dict,
List,
Optional,
)

from pydantic import Field

from pyiceberg.typedef import IcebergBaseModel


class BlobMetadata(IcebergBaseModel):
    """Metadata for one blob inside a statistics (Puffin) file."""

    # Blob type identifier; stored as an opaque string, not validated here.
    type: str
    # Snapshot the blob was computed from (serialized as "snapshot-id").
    snapshot_id: int = Field(alias="snapshot-id")
    # Sequence number of that snapshot (serialized as "sequence-number").
    sequence_number: int = Field(alias="sequence-number")
    # IDs of the table fields the blob covers.
    fields: List[int]
    # Optional extra key/value properties for the blob.
    properties: Optional[Dict[str, str]] = None


class StatisticsFile(IcebergBaseModel):
    """A table statistics file associated with a single snapshot.

    Aliases map the Python field names to the kebab-case keys used in
    table metadata JSON.
    """

    # Snapshot this statistics file belongs to.
    snapshot_id: int = Field(alias="snapshot-id")
    # Location of the statistics file.
    statistics_path: str = Field(alias="statistics-path")
    # Total size of the file in bytes.
    file_size_in_bytes: int = Field(alias="file-size-in-bytes")
    # Size of the file's footer in bytes.
    file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes")
    # Optional encryption key metadata; absent for unencrypted files.
    key_metadata: Optional[str] = Field(alias="key-metadata", default=None)
    # Metadata for each blob contained in the file.
    blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata")


def filter_statistics_by_snapshot_id(
    statistics: List[StatisticsFile],
    reject_snapshot_id: int,
) -> List[StatisticsFile]:
    """Return a new list of statistics files, dropping entries for *reject_snapshot_id*."""
    kept: List[StatisticsFile] = []
    for statistics_file in statistics:
        if statistics_file.snapshot_id != reject_snapshot_id:
            kept.append(statistics_file)
    return kept
36 changes: 36 additions & 0 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
SnapshotLogEntry,
)
from pyiceberg.table.sorting import SortOrder
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
from pyiceberg.typedef import (
IcebergBaseModel,
Properties,
Expand Down Expand Up @@ -174,6 +175,17 @@ class RemovePropertiesUpdate(IcebergBaseModel):
removals: List[str]


class SetStatisticsUpdate(IcebergBaseModel):
    """Metadata update that registers a statistics file for a snapshot."""

    # Discriminator value for the TableUpdate union.
    action: Literal["set-statistics"] = Field(default="set-statistics")
    # Snapshot the statistics apply to; must match statistics.snapshot_id.
    snapshot_id: int = Field(alias="snapshot-id")
    # The statistics file being registered.
    statistics: StatisticsFile


class RemoveStatisticsUpdate(IcebergBaseModel):
    """Metadata update that removes the statistics file of a snapshot."""

    # Discriminator value for the TableUpdate union.
    action: Literal["remove-statistics"] = Field(default="remove-statistics")
    # Snapshot whose statistics file should be removed.
    snapshot_id: int = Field(alias="snapshot-id")


TableUpdate = Annotated[
Union[
AssignUUIDUpdate,
Expand All @@ -191,6 +203,8 @@ class RemovePropertiesUpdate(IcebergBaseModel):
SetLocationUpdate,
SetPropertiesUpdate,
RemovePropertiesUpdate,
SetStatisticsUpdate,
RemoveStatisticsUpdate,
],
Field(discriminator="action"),
]
Expand Down Expand Up @@ -475,6 +489,28 @@ def _(
return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id})


@_apply_table_update.register(SetStatisticsUpdate)
def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a set-statistics update, replacing any existing file for the snapshot."""
    if update.statistics.snapshot_id != update.snapshot_id:
        raise ValueError("Snapshot id in statistics does not match the snapshot id in the update")

    # Drop any statistics already registered for this snapshot, then append the new file.
    remaining = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id)
    context.add_update(update)

    return base_metadata.model_copy(update={"statistics": [*remaining, update.statistics]})


@_apply_table_update.register(RemoveStatisticsUpdate)
def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a remove-statistics update; fails if the snapshot has no statistics."""
    if all(stat.snapshot_id != update.snapshot_id for stat in base_metadata.statistics):
        raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist")

    remaining = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id)
    context.add_update(update)

    return base_metadata.model_copy(update={"statistics": remaining})


def update_table_metadata(
base_metadata: TableMetadata,
updates: Tuple[TableUpdate, ...],
Expand Down
75 changes: 75 additions & 0 deletions pyiceberg/table/update/statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Tuple

from pyiceberg.table.statistics import StatisticsFile
from pyiceberg.table.update import (
RemoveStatisticsUpdate,
SetStatisticsUpdate,
TableUpdate,
UpdatesAndRequirements,
UpdateTableMetadata,
)

if TYPE_CHECKING:
from pyiceberg.table import Transaction


class UpdateStatistics(UpdateTableMetadata["UpdateStatistics"]):
"""
Run statistics management operations using APIs.
APIs include set_statistics and remove statistics operations.
Use table.update_statistics().<operation>().commit() to run a specific operation.
Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.
Pending changes are applied on commit.
We can also use context managers to make more changes. For example:
with table.update_statistics() as update:
update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
update.remove_statistics(snapshot_id=2)
"""

_updates: Tuple[TableUpdate, ...] = ()

def __init__(self, transaction: "Transaction") -> None:
super().__init__(transaction)

def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> "UpdateStatistics":
self._updates += (
SetStatisticsUpdate(
snapshot_id=snapshot_id,
statistics=statistics_file,
),
)

return self

def remove_statistics(self, snapshot_id: int) -> "UpdateStatistics":
self._updates = (
RemoveStatisticsUpdate(
snapshot_id=snapshot_id,
),
)

return self

def _commit(self) -> UpdatesAndRequirements:
return self._updates, ()
Loading

0 comments on commit adfc971

Please sign in to comment.