Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResidualVisitor to compute residuals #1388

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 247 additions & 0 deletions pyiceberg/expressions/residual_evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import ABC
from typing import Any, Set

from pyiceberg.expressions import And, Or
from pyiceberg.expressions.literals import Literal
from pyiceberg.expressions.visitors import (
AlwaysFalse,
AlwaysTrue,
BooleanExpression,
BoundBooleanExpressionVisitor,
BoundPredicate,
BoundTerm,
Not,
UnboundPredicate,
visit,
)
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.types import L


class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
schema: Schema
spec: PartitionSpec
case_sensitive: bool

def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression):
self.schema = schema
self.spec = spec
self.case_sensitive = case_sensitive
self.expr = expr

def eval(self, partition_data: Record) -> BooleanExpression:
self.struct = partition_data
return visit(self.expr, visitor=self)

def visit_true(self) -> BooleanExpression:
return AlwaysTrue()

def visit_false(self) -> BooleanExpression:
return AlwaysFalse()

def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
return Not(child_result)

def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
return And(left_result, right_result)

def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
return Or(left_result, right_result)

def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
if term.eval(self.struct) is None:
return AlwaysTrue()
else:
return AlwaysFalse()

def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
if term.eval(self.struct) is not None:
return AlwaysTrue()
else:
return AlwaysFalse()

def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
val = term.eval(self.struct)
if val is None:
return self.visit_true()
else:
return self.visit_false()

def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
val = term.eval(self.struct)
if val is not None:
return self.visit_true()
else:
return self.visit_false()

def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if term.eval(self.struct) < literal.value:
return self.visit_true()
else:
return self.visit_false()

def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if term.eval(self.struct) <= literal.value:
return self.visit_true()
else:
return self.visit_false()

def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if term.eval(self.struct) > literal.value:
return self.visit_true()
else:
return self.visit_false()

def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if term.eval(self.struct) >= literal.value:
return self.visit_true()
else:
return self.visit_false()

def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if term.eval(self.struct) == literal.value:
return self.visit_true()
else:
return self.visit_false()

def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if term.eval(self.struct) != literal.value:
return self.visit_true()
else:
return self.visit_false()

def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
if term.eval(self.struct) in literals:
return self.visit_true()
else:
return self.visit_false()

def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
if term.eval(self.struct) not in literals:
return self.visit_true()
else:
return self.visit_false()

def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
eval_res = term.eval(self.struct)
if eval_res is not None and str(eval_res).startswith(str(literal.value)):
return AlwaysTrue()
else:
return AlwaysFalse()

def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
if not self.visit_starts_with(term, literal):
return AlwaysTrue()
else:
return AlwaysFalse()

def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
"""
If there is no strict projection or if it evaluates to false, then return the predicate.

Get the strict projection and inclusive projection of this predicate in partition data,
then use them to determine whether to return the original predicate. The strict projection
returns true iff the original predicate would have returned true, so the predicate can be
eliminated if the strict projection evaluates to true. Similarly the inclusive projection
returns false iff the original predicate would have returned false, so the predicate can
also be eliminated if the inclusive projection evaluates to false.

"""
parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
if parts == []:
return predicate

from pyiceberg.types import StructType

def struct_to_schema(struct: StructType) -> Schema:
return Schema(*[f for f in struct.fields])

for part in parts:
strict_projection = part.transform.strict_project(part.name, predicate)
strict_result = None

if strict_projection is not None:
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
if isinstance(bound, BoundPredicate):
strict_result = super().visit_bound_predicate(bound)
else:
strict_result = bound

if strict_result is not None and isinstance(strict_result, AlwaysTrue):
return AlwaysTrue()

inclusive_projection = part.transform.project(part.name, predicate)
inclusive_result = None
if inclusive_projection is not None:
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
if isinstance(bound_inclusive, BoundPredicate):
# using predicate method specific to inclusive
inclusive_result = super().visit_bound_predicate(bound_inclusive)
else:
# if the result is not a predicate, then it must be a constant like alwaysTrue or
# alwaysFalse
inclusive_result = bound_inclusive
if inclusive_result is not None and isinstance(inclusive_result, AlwaysFalse):
return AlwaysFalse()

return predicate

def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
bound = predicate.bind(self.schema, case_sensitive=True)

if isinstance(bound, BoundPredicate):
bound_residual = self.visit_bound_predicate(predicate=bound)
# if isinstance(bound_residual, BooleanExpression):
if bound_residual not in (AlwaysFalse(), AlwaysTrue()):
# replace inclusive original unbound predicate
return predicate

# use the non-predicate residual (e.g. alwaysTrue)
return bound_residual

# if binding didn't result in a Predicate, return the expression
return bound


class ResidualEvaluator(ResidualVisitor):
def residual_for(self, partition_data: Record) -> BooleanExpression:
return self.eval(partition_data)


class UnpartitionedResidualEvaluator(ResidualEvaluator):
# Finds the residuals for an Expression the partitions in the given PartitionSpec
def __init__(self, schema: Schema, expr: BooleanExpression):
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC

super().__init__(schema=schema, spec=UNPARTITIONED_PARTITION_SPEC, expr=expr, case_sensitive=False)
self.expr = expr

def residual_for(self, partition_data: Record) -> BooleanExpression:
return self.expr


def residual_evaluator_of(
spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
) -> ResidualEvaluator:
if len(spec.fields) != 0:
return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
else:
return UnpartitionedResidualEvaluator(schema=schema, expr=expr)
42 changes: 40 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,9 @@ def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
return self.update(case_sensitive=case_sensitive)

@abstractmethod
def count(self) -> int: ...


class ScanTask(ABC):
pass
Expand All @@ -1341,19 +1344,21 @@ class FileScanTask(ScanTask):
delete_files: Set[DataFile]
start: int
length: int
residual: BooleanExpression

def __init__(
self,
data_file: DataFile,
delete_files: Optional[Set[DataFile]] = None,
start: Optional[int] = None,
length: Optional[int] = None,
residual: BooleanExpression = None
) -> None:
self.file = data_file
self.delete_files = delete_files or set()
self.start = start or 0
self.length = length or data_file.file_size_in_bytes

self.residual = residual

def _open_manifest(
io: FileIO,
Expand Down Expand Up @@ -1513,13 +1518,23 @@ def plan_files(self) -> Iterable[FileScanTask]:
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")



from pyiceberg.expressions.residual_evaluator import residual_evaluator_of
residual_evaluator = residual_evaluator_of(
spec=self.table_metadata.spec(),
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema()
)
return [
FileScanTask(
data_entry.data_file,
data_file=data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
),
residual=residual_evaluator.residual_for(data_entry.data_file.partition)
)
for data_entry in data_entries
]
Expand Down Expand Up @@ -1594,6 +1609,29 @@ def to_ray(self) -> ray.data.dataset.Dataset:

return ray.data.from_arrow(self.to_arrow())

def count(self) -> int:
"""
Usage: calutates the total number of records in a Scan that haven't had positional deletes
"""
res = 0
# every task is a FileScanTask
tasks = self.plan_files()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this count will not be accurate when there are deletes files

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kevinjqliu thank you for the review. I am trying to account for positional deletes, do you have a suggestion on how that can be achieved?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this can be widely off, not just because the merge-on-read deletes, but because plan_files returns all the files that (might) contain relevant rows. For example, if it cannot be determined if has relevant data, it will be returned by plan_files.

I think there are two ways forward:

  • One is similar to how we handle deletes. For deletes, we check if the whole file matches, if this is the case, then we can simply drop the file from the metadata. You can find the code here. If a file fully matches, is is valid to use task.file.record_count. We would need to extend this to also see if there are also merge-on-read deletes as Kevin already mentioned, or just fail when there are positional deletes.
  • A cleaner option, but this is a bit more work, but pretty exciting, would be to include the residual-predicate in the FileScanTask. When we run a query, like day_added = 2024-12-01 and user_id = 10, then the day_added = 2024-12-01 might be satisfied with the partitioning already. This is the case when the table is partitioned by day, and we know that all the data in the file evaluates true for day_added = 2024-12-01, then we need to open the file, and filter for user_id = 10. If we would leave out the user_id = 10, then it would be ALWAYS_TRUE, and then we know that we can just use task.file.record_count. This way we could very easily loop over the .plan_files():
def count(self) -> int:
    res = 0
    tasks = self.plan_files()
    for task in tasks:
        if task.residual == ALWAYS_TRUE and len(task.delete_files):
            res += task.file.record_count
            else:
            # pseudocode, open the table, and apply the filter and deletes
            res += len(_table_from_scan_task(task))
    return res

To get to the second step, we first have to port the ResidualEvaluator. The java code can be found here, including some excellent tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Fokko I have added Residual Evaluator with Tests.
Now I am trying to create the breaking tests for count where delete has occurred and the counts should differ

@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_delete_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
    identifier = "default.table_partitioned_delete"

    run_spark_commands(
        spark,
        [
            f"DROP TABLE IF EXISTS {identifier}",
            f"""
            CREATE TABLE {identifier} (
                number_partitioned  int,
                number              int
            )
            USING iceberg
            PARTITIONED BY (number_partitioned)
            TBLPROPERTIES(
                'format-version' = 2,
                'write.delete.mode'='merge-on-read',
                'write.update.mode'='merge-on-read',
                'write.merge.mode'='merge-on-read'
            )
        """,
            f"""
            INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40)
        """,
            # Generate a positional delete
            f"""
            DELETE FROM {identifier} WHERE number = 30
        """,
        ],
    )

    tbl = session_catalog.load_table(identifier)

    # Assert that there is just a single Parquet file, that has one merge on read file
    files = list(tbl.scan().plan_files())
    assert len(files) == 1
    assert len(files[0].delete_files) == 1
    # Will rewrite a data file without the positional delete
    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10], "number": [20, 40]}
    assert tbl.scan().count() == 2
    assert tbl.scan().count() == 1

    tbl.delete(EqualTo("number", 40))

    # One positional delete has been added, but an OVERWRITE status is set
    # https://github.com/apache/iceberg/issues/10122
    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"]
    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}
    assert tbl.scan().count() == 1


for task in tasks:
# task.residual is a Boolean Expression if the fiter condition is fully satisfied by the
# partition value and task.delete_files represents that positional delete haven't been merged yet
# hence those files have to read as a pyarrow table applying the filter and deletes
if task.residual == AlwaysTrue() and not len(task.delete_files):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a bit more explicit:

Suggested change
if task.residual == AlwaysTrue() and not len(task.delete_files):
if task.residual == AlwaysTrue() and len(task.delete_files) == 0:

# Every File has a metadata stat that stores the file record count
res += task.file.record_count
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the positional deletes are missing from the else: branch. How about re-using _task_to_record_batches for now?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko I have added test of positional delete and it works. ArrowScan.to_record_batches calls `_task_to_record_batches' internally. Can you please take another look.

from pyiceberg.io.pyarrow import ArrowScan
tbl = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table([task])
res += len(tbl)
return res
Copy link

@gli-chris-hao gli-chris-hao Dec 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this approach! My only concern is about loading too much data into memory at once, although this is loading just one file at a time, in the worst case some file could potentially be very large? Shall we define a threshold and check, for example, if file size < XXX, load entire file, otherwise turn it into pa.RecordBatchReader and read stream of record batches for counting.

target_schema = schema_to_pyarrow(self.projection())

batches = ArrowScan(
    self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches([task])

reader = pa.RecordBatchReader.from_batches(
    target_schema,
    batches,
)

count = 0
for batch in reader:
    count += batch.num_rows
return count

https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L1541-L1564



@dataclass(frozen=True)
class WriteTask:
Expand Down
52 changes: 52 additions & 0 deletions tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,58 @@ def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, table_id
assert df == table.scan().to_arrow()


@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
lazy_fixture("catalog_sqlite_fsspec"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
lazy_fixture("random_table_identifier_with_catalog"),
],
)
def test_count_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
table_identifier_nocatalog = catalog._identifier_to_tuple_without_catalog(table_identifier)
namespace = Catalog.namespace_from(table_identifier_nocatalog)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_simple)

df = pa.Table.from_pydict(
{
"foo": ["a"],
"bar": [1],
"baz": [True],
},
schema=schema_to_pyarrow(table_schema_simple),
)

table.append(df)

# new snapshot is written in APPEND mode
assert len(table.metadata.snapshots) == 1
assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id
assert table.metadata.snapshots[0].parent_snapshot_id is None
assert table.metadata.snapshots[0].sequence_number == 1
assert table.metadata.snapshots[0].summary is not None
assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
assert table.metadata.snapshots[0].summary["added-data-files"] == "1"
assert table.metadata.snapshots[0].summary["added-records"] == "1"
assert table.metadata.snapshots[0].summary["total-data-files"] == "1"
assert table.metadata.snapshots[0].summary["total-records"] == "1"
assert len(table.metadata.metadata_log) == 1

# read back the data
assert df == table.scan().to_arrow()
assert len(table.scan().to_arrow()) == table.scan().count()


@pytest.mark.parametrize(
"catalog",
[
Expand Down
Loading