-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ResidualVisitor to compute residuals #1388
base: main
Are you sure you want to change the base?
Changes from 21 commits
731542e
c6c971e
da18837
3104a2f
c2740ea
90bca84
f7202b9
c7205b3
09f9c10
1e9da22
3ab20d4
091c0af
3cd797d
6b0924e
96cb4e9
212c83b
8bc65fa
8bb039f
0019f92
a372a93
ab4c000
f5a871b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -62,7 +62,7 @@ | |||||||||||||||||||
from pyiceberg.manifest import DataFile, ManifestFile, PartitionFieldSummary | ||||||||||||||||||||
from pyiceberg.partitioning import PartitionSpec | ||||||||||||||||||||
from pyiceberg.schema import Schema | ||||||||||||||||||||
from pyiceberg.typedef import EMPTY_DICT, L, StructProtocol | ||||||||||||||||||||
from pyiceberg.typedef import EMPTY_DICT, L, Record, StructProtocol | ||||||||||||||||||||
from pyiceberg.types import ( | ||||||||||||||||||||
DoubleType, | ||||||||||||||||||||
FloatType, | ||||||||||||||||||||
|
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool: | |||||||||||||||||||
|
||||||||||||||||||||
def _can_contain_nans(self, field_id: int) -> bool: | ||||||||||||||||||||
return (nan_count := self.nan_counts.get(field_id)) is not None and nan_count > 0 | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC): | ||||||||||||||||||||
schema: Schema | ||||||||||||||||||||
spec: PartitionSpec | ||||||||||||||||||||
case_sensitive: bool | ||||||||||||||||||||
|
||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add:
to the class variables as well? |
||||||||||||||||||||
def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression): | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
self.schema = schema | ||||||||||||||||||||
self.spec = spec | ||||||||||||||||||||
self.case_sensitive = case_sensitive | ||||||||||||||||||||
self.expr = expr | ||||||||||||||||||||
|
||||||||||||||||||||
def eval(self, partition_data: Record) -> BooleanExpression: | ||||||||||||||||||||
self.struct = partition_data | ||||||||||||||||||||
return visit(self.expr, visitor=self) | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_true(self) -> BooleanExpression: | ||||||||||||||||||||
return AlwaysTrue() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_false(self) -> BooleanExpression: | ||||||||||||||||||||
return AlwaysFalse() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_not(self, child_result: BooleanExpression) -> BooleanExpression: | ||||||||||||||||||||
return Not(child_result) | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: | ||||||||||||||||||||
return And(left_result, right_result) | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression: | ||||||||||||||||||||
return Or(left_result, right_result) | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) is None: | ||||||||||||||||||||
return AlwaysTrue() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return AlwaysFalse() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) is not None: | ||||||||||||||||||||
return AlwaysTrue() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return AlwaysFalse() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression: | ||||||||||||||||||||
val = term.eval(self.struct) | ||||||||||||||||||||
if val is None: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to Java, I think we can return
Suggested change
|
||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression: | ||||||||||||||||||||
val = term.eval(self.struct) | ||||||||||||||||||||
if val is not None: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
Comment on lines
+1786
to
+1790
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java takes a different approach and checks for
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change is causing a type casting problem as
|
||||||||||||||||||||
|
||||||||||||||||||||
def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) < literal.value: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) <= literal.value: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) > literal.value: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) >= literal.value: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) == literal.value: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) != literal.value: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) in literals: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression: | ||||||||||||||||||||
if term.eval(self.struct) not in literals: | ||||||||||||||||||||
return self.visit_true() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return self.visit_false() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
eval_res = term.eval(self.struct) | ||||||||||||||||||||
if eval_res is not None and str(eval_res).startswith(str(literal.value)): | ||||||||||||||||||||
return AlwaysTrue() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return AlwaysFalse() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression: | ||||||||||||||||||||
if not self.visit_starts_with(term, literal): | ||||||||||||||||||||
return AlwaysTrue() | ||||||||||||||||||||
else: | ||||||||||||||||||||
return AlwaysFalse() | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression: | ||||||||||||||||||||
""" | ||||||||||||||||||||
If there is no strict projection or if it evaluates to false, then return the predicate. | ||||||||||||||||||||
|
||||||||||||||||||||
Get the strict projection and inclusive projection of this predicate in partition data, | ||||||||||||||||||||
then use them to determine whether to return the original predicate. The strict projection | ||||||||||||||||||||
returns true iff the original predicate would have returned true, so the predicate can be | ||||||||||||||||||||
eliminated if the strict projection evaluates to true. Similarly the inclusive projection | ||||||||||||||||||||
returns false iff the original predicate would have returned false, so the predicate can | ||||||||||||||||||||
also be eliminated if the inclusive projection evaluates to false. | ||||||||||||||||||||
|
||||||||||||||||||||
""" | ||||||||||||||||||||
parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id) | ||||||||||||||||||||
if parts == []: | ||||||||||||||||||||
return predicate | ||||||||||||||||||||
|
||||||||||||||||||||
from pyiceberg.types import StructType | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move this import to the top |
||||||||||||||||||||
|
||||||||||||||||||||
def struct_to_schema(struct: StructType) -> Schema: | ||||||||||||||||||||
return Schema(*list(struct.fields)) | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The conversion to a list is not needed:
Suggested change
|
||||||||||||||||||||
|
||||||||||||||||||||
for part in parts: | ||||||||||||||||||||
strict_projection = part.transform.strict_project(part.name, predicate) | ||||||||||||||||||||
strict_result = None | ||||||||||||||||||||
|
||||||||||||||||||||
if strict_projection is not None: | ||||||||||||||||||||
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema))) | ||||||||||||||||||||
if isinstance(bound, BoundPredicate): | ||||||||||||||||||||
strict_result = super().visit_bound_predicate(bound) | ||||||||||||||||||||
else: | ||||||||||||||||||||
strict_result = bound | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's keep the comments from Java in here, I think they are pretty helpful:
Suggested change
|
||||||||||||||||||||
|
||||||||||||||||||||
if strict_result is not None and isinstance(strict_result, AlwaysTrue): | ||||||||||||||||||||
return AlwaysTrue() | ||||||||||||||||||||
|
||||||||||||||||||||
inclusive_projection = part.transform.project(part.name, predicate) | ||||||||||||||||||||
inclusive_result = None | ||||||||||||||||||||
if inclusive_projection is not None: | ||||||||||||||||||||
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema))) | ||||||||||||||||||||
if isinstance(bound_inclusive, BoundPredicate): | ||||||||||||||||||||
# using predicate method specific to inclusive | ||||||||||||||||||||
inclusive_result = super().visit_bound_predicate(bound_inclusive) | ||||||||||||||||||||
else: | ||||||||||||||||||||
# if the result is not a predicate, then it must be a constant like alwaysTrue or | ||||||||||||||||||||
# alwaysFalse | ||||||||||||||||||||
inclusive_result = bound_inclusive | ||||||||||||||||||||
if inclusive_result is not None and isinstance(inclusive_result, AlwaysFalse): | ||||||||||||||||||||
return AlwaysFalse() | ||||||||||||||||||||
|
||||||||||||||||||||
return predicate | ||||||||||||||||||||
|
||||||||||||||||||||
def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression: | ||||||||||||||||||||
bound = predicate.bind(self.schema, case_sensitive=True) | ||||||||||||||||||||
|
||||||||||||||||||||
if isinstance(bound, BoundPredicate): | ||||||||||||||||||||
bound_residual = self.visit_bound_predicate(predicate=bound) | ||||||||||||||||||||
# if isinstance(bound_residual, BooleanExpression): | ||||||||||||||||||||
if bound_residual not in (AlwaysFalse(), AlwaysTrue()): | ||||||||||||||||||||
# replace inclusive original unbound predicate | ||||||||||||||||||||
return predicate | ||||||||||||||||||||
|
||||||||||||||||||||
# use the non-predicate residual (e.g. alwaysTrue) | ||||||||||||||||||||
return bound_residual | ||||||||||||||||||||
|
||||||||||||||||||||
# if binding didn't result in a Predicate, return the expression | ||||||||||||||||||||
return bound | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
class ResidualEvaluator(ResidualVisitor): | ||||||||||||||||||||
def residual_for(self, partition_data: Record) -> BooleanExpression: | ||||||||||||||||||||
return self.eval(partition_data) | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
class UnpartitionedResidualEvaluator(ResidualEvaluator): | ||||||||||||||||||||
# Finds the residuals for an Expression the partitions in the given PartitionSpec | ||||||||||||||||||||
def __init__(self, schema: Schema, expr: BooleanExpression): | ||||||||||||||||||||
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move this import to the top as well 👍 |
||||||||||||||||||||
|
||||||||||||||||||||
super().__init__(schema=schema, spec=UNPARTITIONED_PARTITION_SPEC, expr=expr, case_sensitive=False) | ||||||||||||||||||||
self.expr = expr | ||||||||||||||||||||
|
||||||||||||||||||||
def residual_for(self, partition_data: Record) -> BooleanExpression: | ||||||||||||||||||||
return self.expr | ||||||||||||||||||||
|
||||||||||||||||||||
|
||||||||||||||||||||
def residual_evaluator_of( | ||||||||||||||||||||
spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema | ||||||||||||||||||||
) -> ResidualEvaluator: | ||||||||||||||||||||
if len(spec.fields) != 0: | ||||||||||||||||||||
return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive) | ||||||||||||||||||||
else: | ||||||||||||||||||||
return UnpartitionedResidualEvaluator(schema=schema, expr=expr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we copy this comment?
https://github.com/apache/iceberg/blob/5fd16b5bfeb85e12b5a9ecb4e39504389d7b72ed/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java#L32
I think that would be helpful for the reader.