Add table statistics (#1285)
* Add table statistics update

* Update pyiceberg/table/statistics.py

Co-authored-by: Fokko Driesprong <[email protected]>

* Update mkdocs/docs/api.md

Co-authored-by: Fokko Driesprong <[email protected]>

* Update mkdocs/docs/api.md

Co-authored-by: Fokko Driesprong <[email protected]>

* Add Literal import

* Rewrite tests

---------

Co-authored-by: Fokko Driesprong <[email protected]>
ndrluis and Fokko authored Jan 16, 2025
1 parent f4caa3a commit 0a3a886
Showing 10 changed files with 488 additions and 2 deletions.
23 changes: 23 additions & 0 deletions mkdocs/docs/api.md
@@ -1258,6 +1258,29 @@ with table.manage_snapshots() as ms:
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
```

## Table Statistics Management

Manage table statistics through operations on the `Table` API:

```python
# To run a specific operation
table.update_statistics().set_statistics(snapshot_id=1, statistics_file=statistics_file).commit()

# To run multiple operations, wrap the chained calls in parentheses
(
    table.update_statistics()
    .set_statistics(snapshot_id1, statistics_file1)
    .remove_statistics(snapshot_id2)
    .commit()
)
# Operations are applied on commit.
```

You can also use a context manager to make changes; they are applied when the block exits:

```python
with table.update_statistics() as update:
update.set_statistics(snapshot_id1, statistics_file1)
update.remove_statistics(snapshot_id2)
```
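
The `statistics_file` argument is a `pyiceberg.table.statistics.StatisticsFile`, a Pydantic model describing a Puffin statistics file. A minimal construction sketch — the path, sizes, and IDs are placeholder values mirroring the test fixture in this change, and it assumes the models accept Python field names as keywords:

```python
from pyiceberg.table.statistics import BlobMetadata, StatisticsFile

statistics_file = StatisticsFile(
    snapshot_id=3051729675574597004,
    statistics_path="s3://bucket/table/stats.puffin",
    file_size_in_bytes=413,
    file_footer_size_in_bytes=42,
    blob_metadata=[
        BlobMetadata(
            type="apache-datasketches-theta-v1",
            snapshot_id=3051729675574597004,
            sequence_number=1,
            fields=[1],
        )
    ],
)
```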

## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
18 changes: 18 additions & 0 deletions pyiceberg/table/__init__.py
@@ -118,6 +118,7 @@
_FastAppendFiles,
)
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import (
EMPTY_DICT,
@@ -1043,6 +1044,23 @@ def manage_snapshots(self) -> ManageSnapshots:
"""
return ManageSnapshots(transaction=Transaction(self, autocommit=True))

def update_statistics(self) -> UpdateStatistics:
"""
Shorthand to run statistics management operations, such as setting and removing statistics.
Use table.update_statistics().<operation>().commit() to run a specific operation.
Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.
Pending changes are applied on commit.
A context manager can also be used, in which case pending changes are committed when it exits. For example:
with table.update_statistics() as update:
update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
update.remove_statistics(snapshot_id=2)
"""
return UpdateStatistics(transaction=Transaction(self, autocommit=True))

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
"""Create a new UpdateSchema to alter the columns of this table.
9 changes: 9 additions & 0 deletions pyiceberg/table/metadata.py
@@ -44,6 +44,7 @@
SortOrder,
assign_fresh_sort_order_ids,
)
from pyiceberg.table.statistics import StatisticsFile
from pyiceberg.typedef import (
EMPTY_DICT,
IcebergBaseModel,
@@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel):
There is always a main branch reference pointing to the
current-snapshot-id even if the refs map is null."""

statistics: List[StatisticsFile] = Field(default_factory=list)
"""A optional list of table statistics files.
Table statistics files are valid Puffin files. Statistics are
informational. A reader can choose to ignore statistics
information. Statistics support is not required to read the
table correctly. A table can contain many statistics files
associated with different table snapshots."""
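
For illustration, the new field makes statistics directly inspectable on loaded metadata; a sketch assuming a `table` bound to such metadata:

```python
# Hypothetical: list the statistics files recorded for a table
for statistics_file in table.metadata.statistics:
    print(statistics_file.snapshot_id, statistics_file.statistics_path)
```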

# validators
@field_validator("properties", mode="before")
def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]:
45 changes: 45 additions & 0 deletions pyiceberg/table/statistics.py
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Dict, List, Literal, Optional

from pydantic import Field

from pyiceberg.typedef import IcebergBaseModel


class BlobMetadata(IcebergBaseModel):
type: Literal["apache-datasketches-theta-v1", "deletion-vector-v1"]
snapshot_id: int = Field(alias="snapshot-id")
sequence_number: int = Field(alias="sequence-number")
fields: List[int]
properties: Optional[Dict[str, str]] = None


class StatisticsFile(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
statistics_path: str = Field(alias="statistics-path")
file_size_in_bytes: int = Field(alias="file-size-in-bytes")
file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes")
key_metadata: Optional[str] = Field(alias="key-metadata", default=None)
blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata")


def filter_statistics_by_snapshot_id(
statistics: List[StatisticsFile],
reject_snapshot_id: int,
) -> List[StatisticsFile]:
return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]
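
A quick sketch of the helper's behavior, using a snapshot ID from the conftest fixture later in this diff (`metadata` is an assumed `TableMetadata` instance):

```python
# Reject the statistics file tied to one snapshot; all other entries are kept.
remaining = filter_statistics_by_snapshot_id(
    statistics=metadata.statistics,
    reject_snapshot_id=3051729675574597004,
)
assert all(stat.snapshot_id != 3051729675574597004 for stat in remaining)
```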
36 changes: 36 additions & 0 deletions pyiceberg/table/update/__init__.py
@@ -36,6 +36,7 @@
SnapshotLogEntry,
)
from pyiceberg.table.sorting import SortOrder
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
from pyiceberg.typedef import (
IcebergBaseModel,
Properties,
@@ -174,6 +175,17 @@ class RemovePropertiesUpdate(IcebergBaseModel):
removals: List[str]


class SetStatisticsUpdate(IcebergBaseModel):
action: Literal["set-statistics"] = Field(default="set-statistics")
snapshot_id: int = Field(alias="snapshot-id")
statistics: StatisticsFile


class RemoveStatisticsUpdate(IcebergBaseModel):
action: Literal["remove-statistics"] = Field(default="remove-statistics")
snapshot_id: int = Field(alias="snapshot-id")


TableUpdate = Annotated[
Union[
AssignUUIDUpdate,
@@ -191,6 +203,8 @@ class RemovePropertiesUpdate(IcebergBaseModel):
SetLocationUpdate,
SetPropertiesUpdate,
RemovePropertiesUpdate,
SetStatisticsUpdate,
RemoveStatisticsUpdate,
],
Field(discriminator="action"),
]
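
Because the union is discriminated on `action`, a serialized update can be parsed back by its tag. A sketch using pydantic's `TypeAdapter` (the adapter usage is an assumption, not part of this diff):

```python
from pydantic import TypeAdapter

update = TypeAdapter(TableUpdate).validate_python(
    {"action": "remove-statistics", "snapshot-id": 123}
)
assert isinstance(update, RemoveStatisticsUpdate)
```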
@@ -475,6 +489,28 @@ def _(
return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id})


@_apply_table_update.register(SetStatisticsUpdate)
def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
if update.snapshot_id != update.statistics.snapshot_id:
raise ValueError("Snapshot id in statistics does not match the snapshot id in the update")

statistics = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id)
context.add_update(update)

return base_metadata.model_copy(update={"statistics": statistics + [update.statistics]})


@_apply_table_update.register(RemoveStatisticsUpdate)
def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
if not any(stat.snapshot_id == update.snapshot_id for stat in base_metadata.statistics):
raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist")

statistics = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id)
context.add_update(update)

return base_metadata.model_copy(update={"statistics": statistics})
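
These handlers are exercised through `update_table_metadata`; a minimal sketch, where `base_metadata` and `statistics_file` are assumed objects and the statistics file carries the matching snapshot ID:

```python
new_metadata = update_table_metadata(
    base_metadata=base_metadata,
    updates=(SetStatisticsUpdate(snapshot_id=1, statistics=statistics_file),),
)
assert any(stat.snapshot_id == 1 for stat in new_metadata.statistics)
```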


def update_table_metadata(
base_metadata: TableMetadata,
updates: Tuple[TableUpdate, ...],
75 changes: 75 additions & 0 deletions pyiceberg/table/update/statistics.py
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Tuple

from pyiceberg.table.statistics import StatisticsFile
from pyiceberg.table.update import (
RemoveStatisticsUpdate,
SetStatisticsUpdate,
TableUpdate,
UpdatesAndRequirements,
UpdateTableMetadata,
)

if TYPE_CHECKING:
from pyiceberg.table import Transaction


class UpdateStatistics(UpdateTableMetadata["UpdateStatistics"]):
"""
Run statistics management operations through the table API.
The API includes the set_statistics and remove_statistics operations.
Use table.update_statistics().<operation>().commit() to run a specific operation.
Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.
Pending changes are applied on commit.
A context manager can also be used, in which case pending changes are committed when it exits. For example:
with table.update_statistics() as update:
update.set_statistics(snapshot_id=1, statistics_file=statistics_file)
update.remove_statistics(snapshot_id=2)
"""

_updates: Tuple[TableUpdate, ...] = ()

def __init__(self, transaction: "Transaction") -> None:
super().__init__(transaction)

def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> "UpdateStatistics":
self._updates += (
SetStatisticsUpdate(
snapshot_id=snapshot_id,
statistics=statistics_file,
),
)

return self

def remove_statistics(self, snapshot_id: int) -> "UpdateStatistics":
self._updates += (
RemoveStatisticsUpdate(
snapshot_id=snapshot_id,
),
)

return self

def _commit(self) -> UpdatesAndRequirements:
return self._updates, ()
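
A sketch of the builder in use once `remove_statistics` accumulates rather than overwrites: chaining both operations yields two pending updates applied by a single commit (`table` and `statistics_file` are assumed objects):

```python
update = (
    table.update_statistics()
    .set_statistics(snapshot_id=1, statistics_file=statistics_file)
    .remove_statistics(snapshot_id=2)
)
update.commit()  # sends SetStatisticsUpdate and RemoveStatisticsUpdate together
```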
98 changes: 98 additions & 0 deletions tests/conftest.py
@@ -955,6 +955,87 @@ def generate_snapshot(
"refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}},
}

TABLE_METADATA_V2_WITH_STATISTICS = {
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 34,
"last-updated-ms": 1602638573590,
"last-column-id": 3,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{
"id": 1,
"name": "x",
"required": True,
"type": "long",
}
],
}
],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"last-partition-id": 1000,
"default-sort-order-id": 0,
"sort-orders": [{"order-id": 0, "fields": []}],
"properties": {},
"current-snapshot-id": 3055729675574597004,
"snapshots": [
{
"snapshot-id": 3051729675574597004,
"timestamp-ms": 1515100955770,
"sequence-number": 0,
"summary": {"operation": "append"},
"manifest-list": "s3://a/b/1.avro",
},
{
"snapshot-id": 3055729675574597004,
"parent-snapshot-id": 3051729675574597004,
"timestamp-ms": 1555100955770,
"sequence-number": 1,
"summary": {"operation": "append"},
"manifest-list": "s3://a/b/2.avro",
"schema-id": 1,
},
],
"statistics": [
{
"snapshot-id": 3051729675574597004,
"statistics-path": "s3://a/b/stats.puffin",
"file-size-in-bytes": 413,
"file-footer-size-in-bytes": 42,
"blob-metadata": [
{
"type": "apache-datasketches-theta-v1",
"snapshot-id": 3051729675574597004,
"sequence-number": 1,
"fields": [1],
}
],
},
{
"snapshot-id": 3055729675574597004,
"statistics-path": "s3://a/b/stats.puffin",
"file-size-in-bytes": 413,
"file-footer-size-in-bytes": 42,
"blob-metadata": [
{
"type": "deletion-vector-v1",
"snapshot-id": 3055729675574597004,
"sequence-number": 1,
"fields": [1],
}
],
},
],
"snapshot-log": [],
"metadata-log": [],
}


@pytest.fixture
def example_table_metadata_v2() -> Dict[str, Any]:
@@ -966,6 +1047,11 @@ def table_metadata_v2_with_fixed_and_decimal_types() -> Dict[str, Any]:
return TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES


@pytest.fixture
def table_metadata_v2_with_statistics() -> Dict[str, Any]:
return TABLE_METADATA_V2_WITH_STATISTICS


@pytest.fixture(scope="session")
def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -2199,6 +2285,18 @@ def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_s
)


@pytest.fixture
def table_v2_with_statistics(table_metadata_v2_with_statistics: Dict[str, Any]) -> Table:
table_metadata = TableMetadataV2(**table_metadata_v2_with_statistics)
return Table(
identifier=("database", "table"),
metadata=table_metadata,
metadata_location=f"{table_metadata.location}/uuid.metadata.json",
io=load_file_io(),
catalog=NoopCatalog("NoopCatalog"),
)
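
A hypothetical test in the spirit of the rewritten tests from this commit, driving the fixture through the metadata-update path (imports of `update_table_metadata` and `RemoveStatisticsUpdate` from `pyiceberg.table.update` are assumed):

```python
def test_remove_statistics_update(table_v2_with_statistics: Table) -> None:
    new_metadata = update_table_metadata(
        table_v2_with_statistics.metadata,
        (RemoveStatisticsUpdate(snapshot_id=3051729675574597004),),
    )
    # One of the two statistics files in the fixture remains.
    assert len(new_metadata.statistics) == 1
```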


@pytest.fixture
def bound_reference_str() -> BoundReference[str]:
return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None))