Make eval_sql_where available to DefaultPredicateEvaluator (#627)
## What changes are proposed in this pull request?

Parquet footer skipping code includes (and uses) a helpful
`eval_sql_where` method that handles NULL values in comparisons
gracefully, by injecting null checking automatically into the
predicate's evaluation. It turns out that capability is also useful for
the other predicate evaluator implementations (especially now that
partition pruning will likely rely on the default predicate evaluator).
So we generalize the logic as the provided method
`PredicateEvaluator::eval_sql_where`. In order to support that method,
we also declare a new `eval_scalar_is_null` trait method, with
appropriate implementations. This has the side effect of adding support for
literal null checks -- previously, only columns could be null-checked.
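
The null-check injection described above can be illustrated with a small, self-contained sketch. This is not the kernel's code: `Value`, `Pred`, `Row`, `combine`, `is_not_null`, `lt`, and `eval` are hypothetical stand-ins, and only the name `eval_sql_where` mirrors the method introduced in this commit. The sketch assumes that `None` means "unknown" (a NULL comparison result or a missing input) and that a column absent from the row is "missing" rather than known to be NULL.

```rust
use std::collections::HashMap;

/// Toy scalar (hypothetical): a known NULL or an integer.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Value {
    Null,
    Int(i64),
}

/// Toy predicate (hypothetical): `column < literal`, plus AND / OR.
enum Pred {
    Lt(&'static str, Value),
    And(Vec<Pred>),
    Or(Vec<Pred>),
}

/// Columns absent from the map are "missing", which is distinct from a known NULL.
type Row = HashMap<&'static str, Value>;

/// Kleene AND/OR: the dominator wins outright; otherwise any unknown input gives unknown.
fn combine(dominator: bool, inputs: impl IntoIterator<Item = Option<bool>>) -> Option<bool> {
    let mut unknown = false;
    for input in inputs {
        match input {
            Some(v) if v == dominator => return Some(dominator),
            Some(_) => {}
            None => unknown = true,
        }
    }
    if unknown { None } else { Some(!dominator) }
}

/// `IS NOT NULL` over a resolved value; a missing value stays unknown.
fn is_not_null(v: Option<Value>) -> Option<bool> {
    v.map(|v| v != Value::Null)
}

/// Null-intolerant comparison: NULL or missing input produces unknown output.
fn lt(a: Option<Value>, b: Value) -> Option<bool> {
    match (a?, b) {
        (Value::Int(a), Value::Int(b)) => Some(a < b),
        _ => None,
    }
}

/// Plain evaluation: NULL and missing inputs both yield `None`.
fn eval(row: &Row, pred: &Pred) -> Option<bool> {
    match pred {
        Pred::Lt(col, lit) => lt(row.get(col).copied(), *lit),
        Pred::And(ps) => combine(false, ps.iter().map(|p| eval(row, p))),
        Pred::Or(ps) => combine(true, ps.iter().map(|p| eval(row, p))),
    }
}

/// SQL WHERE evaluation: each comparison is ANDed with null checks on its operands.
fn eval_sql_where(row: &Row, pred: &Pred) -> Option<bool> {
    match pred {
        Pred::Lt(col, lit) => {
            let col = row.get(col).copied();
            let checks = [is_not_null(col), is_not_null(Some(*lit)), lt(col, *lit)];
            combine(false, checks)
        }
        Pred::And(ps) => combine(false, ps.iter().map(|p| eval_sql_where(row, p))),
        Pred::Or(ps) => combine(true, ps.iter().map(|p| eval_sql_where(row, p))),
    }
}
```

In this sketch, a row where `a` is a known NULL makes `eval` return `None` for `a < 10`, while `eval_sql_where` returns `Some(false)`. A row that lacks `a` entirely yields `None` from both, so a missing value is never mistaken for FALSE.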

## How was this change tested?

Replace the existing unit test for the parquet skipping evaluator with
adapted versions for the default and stats skipping predicate evaluators,
which respectively verify that the provided method works correctly in
both bool-output and expression-output cases. The parquet skipping
module version is removed because it is redundant -- the default
evaluator exercises boolean output, and the data skipping evaluator
exercises column resolution.
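
As a rough illustration of the bool-output and expression-output cases mentioned above, the sketch below uses a trait with an associated `Output` type. All names here (`MiniEvaluator`, `Direct`, `Rewriter`) are hypothetical and far simpler than the real evaluators; in particular, the expression-output flavor just emits SQL-like text rather than a real data skipping expression.

```rust
use std::collections::HashMap;

/// Hypothetical miniature of a predicate evaluator with a parameterized output type.
trait MiniEvaluator {
    type Output;
    /// A (possibly inverted) NULL test over a column; `None` means "unknown".
    fn eval_is_null(&self, col: &str, inverted: bool) -> Option<Self::Output>;
}

/// Bool-output flavor: looks the column up and answers immediately.
/// A map value of `None` stands for a known NULL; an absent key cannot be resolved.
struct Direct(HashMap<String, Option<i64>>);

impl MiniEvaluator for Direct {
    type Output = bool;
    fn eval_is_null(&self, col: &str, inverted: bool) -> Option<bool> {
        let value = self.0.get(col)?; // unresolvable column => unknown
        Some(value.is_none() != inverted)
    }
}

/// Expression-output flavor: emits predicate text to be evaluated later.
struct Rewriter;

impl MiniEvaluator for Rewriter {
    type Output = String;
    fn eval_is_null(&self, col: &str, inverted: bool) -> Option<String> {
        let not = if inverted { "NOT " } else { "" };
        Some(format!("{col} IS {not}NULL"))
    }
}
```

A test written against the trait can then check the same call on both flavors: one asserts on a `bool`, the other on the produced expression.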
scovich authored Jan 16, 2025
1 parent 76c65c8 commit 8494126
Showing 9 changed files with 393 additions and 294 deletions.
5 changes: 2 additions & 3 deletions kernel/src/engine/parquet_row_group_skipping.rs
@@ -1,8 +1,6 @@
//! An implementation of parquet row group skipping using data skipping predicates over footer stats.
use crate::predicates::parquet_stats_skipping::{
ParquetStatsProvider, ParquetStatsSkippingFilter as _,
};
use crate::expressions::{ColumnName, Expression, Scalar, UnaryExpression, BinaryExpression, VariadicExpression};
use crate::predicates::parquet_stats_skipping::ParquetStatsProvider;
use crate::schema::{DataType, PrimitiveType};
use chrono::{DateTime, Days};
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
@@ -57,6 +55,7 @@ impl<'a> RowGroupFilter<'a> {

/// Applies a filtering predicate to a row group. Return value false means to skip it.
fn apply(row_group: &'a RowGroupMetaData, predicate: &Expression) -> bool {
use crate::predicates::PredicateEvaluator as _;
RowGroupFilter::new(row_group, predicate).eval_sql_where(predicate) != Some(false)
}

11 changes: 11 additions & 0 deletions kernel/src/expressions/mod.rs
@@ -47,6 +47,17 @@ pub enum BinaryOperator {
}

impl BinaryOperator {
/// True if this is a comparison for which NULL input always produces NULL output
pub(crate) fn is_null_intolerant_comparison(&self) -> bool {
use BinaryOperator::*;
match self {
Plus | Minus | Multiply | Divide => false, // not a comparison
LessThan | LessThanOrEqual | GreaterThan | GreaterThanOrEqual => true,
Equal | NotEqual => true,
Distinct | In | NotIn => false, // tolerates NULL input
}
}

/// Returns `<op2>` (if any) such that `B <op2> A` is equivalent to `A <op> B`.
pub(crate) fn commute(&self) -> Option<BinaryOperator> {
use BinaryOperator::*;
192 changes: 176 additions & 16 deletions kernel/src/predicates/mod.rs
@@ -20,7 +20,21 @@ mod tests;
///
/// Because inversion (`NOT` operator) has special semantics and can often be optimized away by
/// pushing it down, most methods take an `inverted` flag. That allows operations like
/// [`UnaryOperator::Not`] to simply evaluate their operand with a flipped `inverted` flag,
/// [`UnaryOperator::Not`] to simply evaluate their operand with a flipped `inverted` flag, and
/// greatly simplifies the implementations of most operators (other than those which have to
/// directly implement NOT semantics, which are unavoidably complex in that regard).
///
/// # Parameterized output type
///
/// The types involved in predicate evaluation are parameterized and implementation-specific. For
/// example, [`crate::engine::parquet_stats_skipping::ParquetStatsProvider`] directly evaluates the
/// predicate over parquet footer stats and returns boolean results, while
/// [`crate::scan::data_skipping::DataSkippingPredicateCreator`] instead transforms the input
/// predicate expression to a data skipping predicate expression that the engine can evaluate
/// directly against Delta data skipping stats during log replay. Although this approach is harder
/// to read and reason about at first, the majority of expressions can be implemented generically,
/// which greatly reduces redundancy and ensures that all flavors of predicate evaluation have the
/// same semantics.
///
/// # NULL and error semantics
///
@@ -44,6 +58,9 @@ mod tests;
pub(crate) trait PredicateEvaluator {
type Output;

/// A (possibly inverted) scalar NULL test, e.g. `<value> IS [NOT] NULL`.
fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

/// A (possibly inverted) boolean scalar value, e.g. `[NOT] <value>`.
fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

@@ -123,14 +140,19 @@ pub(crate) trait PredicateEvaluator {
fn eval_unary(&self, op: UnaryOperator, expr: &Expr, inverted: bool) -> Option<Self::Output> {
match op {
UnaryOperator::Not => self.eval_expr(expr, !inverted),
UnaryOperator::IsNull => {
// Data skipping only supports IS [NOT] NULL over columns (not expressions)
let Expr::Column(col) = expr else {
UnaryOperator::IsNull => match expr {
// WARNING: Only literals and columns can be safely null-checked. Attempting to
// null-check an expression such as `a < 10` could wrongly produce FALSE in case
// `a` is just plain missing (rather than known to be NULL). A missing value can
// arise e.g. if data skipping encounters a column with missing stats, or if
// partition pruning encounters a non-partition column.
Expr::Literal(val) => self.eval_scalar_is_null(val, inverted),
Expr::Column(col) => self.eval_is_null(col, inverted),
_ => {
debug!("Unsupported operand: IS [NOT] NULL: {expr:?}");
return None;
};
self.eval_is_null(col, inverted)
}
None
}
},
}
}

@@ -229,12 +251,137 @@ pub(crate) trait PredicateEvaluator {
Variadic(VariadicExpression { op, exprs }) => self.eval_variadic(*op, exprs, inverted),
}
}

/// Evaluates a predicate with SQL WHERE semantics.
///
/// By default, [`eval_expr`] behaves badly for comparisons involving NULL columns (e.g. `a <
/// 10` when `a` is NULL), because the comparison correctly evaluates to NULL, but NULL
/// expressions are interpreted as "stats missing" (= cannot skip). This ambiguity can "poison"
/// the entire expression, causing it to return NULL instead of the FALSE that would allow skipping:
///
/// ```text
/// WHERE a < 10 -- NULL (can't skip file)
/// WHERE a < 10 AND TRUE -- NULL (can't skip file)
/// WHERE a < 10 OR FALSE -- NULL (can't skip file)
/// ```
///
/// Meanwhile, SQL WHERE semantics only keeps rows for which the filter evaluates to
/// TRUE (discarding rows that evaluate to FALSE or NULL):
///
/// ```text
/// WHERE a < 10 -- NULL (discard row)
/// WHERE a < 10 AND TRUE -- NULL (discard row)
/// WHERE a < 10 OR FALSE -- NULL (discard row)
/// ```
///
/// Conceptually, the behavior difference between data skipping and SQL WHERE semantics can be
/// addressed by evaluating with null-safe semantics, as if by `<expr> IS NOT NULL AND <expr>`:
///
/// ```text
/// WHERE (a < 10) IS NOT NULL AND (a < 10) -- FALSE (skip file)
/// WHERE (a < 10 AND TRUE) IS NOT NULL AND (a < 10 AND TRUE) -- FALSE (skip file)
/// WHERE (a < 10 OR FALSE) IS NOT NULL AND (a < 10 OR FALSE) -- FALSE (skip file)
/// ```
///
/// HOWEVER, we cannot safely NULL-check the result of an arbitrary data skipping predicate
/// because an expression will also produce NULL if the value is just plain missing (e.g. data
/// skipping over a column that lacks stats), and if that NULL should propagate all the way to
/// top-level, it would be wrongly interpreted as FALSE (= skippable).
///
/// To prevent wrong data skipping, the predicate evaluator always returns NULL for a NULL check
/// over anything except for literals and columns with known values. So we must push the NULL
/// check down through supported operations (AND as well as null-intolerant comparisons like
/// `<`, `!=`, etc) until it reaches columns and literals where it can do some good, e.g.:
///
/// ```text
/// WHERE a < 10 AND (b < 20 OR c < 30)
/// ```
///
/// would conceptually be interpreted as
///
/// ```text
/// WHERE
/// (a < 10 AND (b < 20 OR c < 30)) IS NOT NULL AND
/// (a < 10 AND (b < 20 OR c < 30))
/// ```
///
/// We then push the NULL check down through the top-level AND:
///
/// ```text
/// WHERE
/// (a < 10 IS NOT NULL AND a < 10) AND
/// ((b < 20 OR c < 30) IS NOT NULL AND (b < 20 OR c < 30))
/// ```
///
/// and attempt to push it further into the `a < 10` and `OR` clauses:
///
/// ```text
/// WHERE
/// (a IS NOT NULL AND 10 IS NOT NULL AND a < 10) AND
/// (b < 20 OR c < 30)
/// ```
///
/// Any time the push-down reaches an operator that does not support push-down (such as OR), we
/// simply drop the NULL check. This way, the top-level NULL check only applies to
/// sub-expressions that can safely implement it, while ignoring other sub-expressions. The
/// unsupported sub-expressions could produce nulls at runtime that prevent skipping, but false
/// positives are OK -- the query will still correctly filter out the unwanted rows that result.
///
/// At expression evaluation time, a NULL value of `a` (from our example) would evaluate as:
///
/// ```text
/// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...)
/// AND(..., AND(FALSE, TRUE, NULL), ...)
/// AND(..., FALSE, ...)
/// FALSE
/// ```
///
/// While a non-NULL value of `a` would instead evaluate as:
///
/// ```text
/// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...)
/// AND(..., AND(TRUE, TRUE, <result>), ...)
/// AND(..., <result>, ...)
/// ```
///
/// And a missing value for `a` would safely disable the clause:
///
/// ```text
/// AND(..., AND(a IS NOT NULL, 10 IS NOT NULL, a < 10), ...)
/// AND(..., AND(NULL, TRUE, NULL), ...)
/// AND(..., NULL, ...)
/// ```
fn eval_sql_where(&self, filter: &Expr) -> Option<Self::Output> {
use Expr::{Binary, Variadic};
match filter {
Variadic(v) => {
// Recursively invoke `eval_sql_where` instead of the usual `eval_expr` for AND/OR.
let exprs = v.exprs.iter().map(|expr| self.eval_sql_where(expr));
self.finish_eval_variadic(v.op, exprs, false)
}
Binary(BinaryExpression { op, left, right }) if op.is_null_intolerant_comparison() => {
// Perform a nullsafe comparison instead of the usual `eval_binary`
let exprs = [
self.eval_unary(UnaryOperator::IsNull, left, true),
self.eval_unary(UnaryOperator::IsNull, right, true),
self.eval_binary(*op, left, right, false),
];
self.finish_eval_variadic(VariadicOperator::And, exprs, false)
}
_ => self.eval_expr(filter, false),
}
}
}

/// A collection of provided methods from the [`PredicateEvaluator`] trait, factored out to allow
/// reuse by the different predicate evaluator implementations.
/// reuse by multiple bool-output predicate evaluator implementations.
pub(crate) struct PredicateEvaluatorDefaults;
impl PredicateEvaluatorDefaults {
/// Directly null-tests a scalar. See [`PredicateEvaluator::eval_scalar_is_null`].
pub(crate) fn eval_scalar_is_null(val: &Scalar, inverted: bool) -> Option<bool> {
Some(val.is_null() != inverted)
}

/// Directly evaluates a boolean scalar. See [`PredicateEvaluator::eval_scalar`].
pub(crate) fn eval_scalar(val: &Scalar, inverted: bool) -> Option<bool> {
match val {
@@ -326,6 +473,14 @@ impl ResolveColumnAsScalar for UnimplementedColumnResolver {
}
}

// Used internally and by some tests
pub(crate) struct EmptyColumnResolver;
impl ResolveColumnAsScalar for EmptyColumnResolver {
fn resolve_column(&self, _col: &ColumnName) -> Option<Scalar> {
None
}
}

// In testing, it is convenient to just build a hashmap of scalar values.
#[cfg(test)]
impl ResolveColumnAsScalar for std::collections::HashMap<ColumnName, Scalar> {
@@ -358,13 +513,17 @@ impl<R: ResolveColumnAsScalar + 'static> From<R> for DefaultPredicateEvaluator<R
impl<R: ResolveColumnAsScalar> PredicateEvaluator for DefaultPredicateEvaluator<R> {
type Output = bool;

fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<bool> {
PredicateEvaluatorDefaults::eval_scalar_is_null(val, inverted)
}

fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<bool> {
PredicateEvaluatorDefaults::eval_scalar(val, inverted)
}

fn eval_is_null(&self, col: &ColumnName, inverted: bool) -> Option<bool> {
let col = self.resolve_column(col)?;
Some(matches!(col, Scalar::Null(_)) != inverted)
self.eval_scalar_is_null(&col, inverted)
}

fn eval_lt(&self, col: &ColumnName, val: &Scalar) -> Option<bool> {
@@ -428,12 +587,6 @@ impl<R: ResolveColumnAsScalar> PredicateEvaluator for DefaultPredicateEvaluator<
/// example, comparisons involving a column are converted into comparisons over that column's
/// min/max stats, and NULL checks are converted into comparisons involving the column's nullcount
/// and rowcount stats.
///
/// The types involved in these operations are parameterized and implementation-specific. For
/// example, [`crate::engine::parquet_stats_skipping::ParquetStatsProvider`] directly evaluates data
/// skipping expressions and returnss boolean results, while
/// [`crate::scan::data_skipping::DataSkippingPredicateCreator`] instead converts the input
/// predicate to a data skipping predicate that can be evaluated directly later.
pub(crate) trait DataSkippingPredicateEvaluator {
/// The output type produced by this expression evaluator
type Output;
@@ -454,6 +607,9 @@ pub(crate) trait DataSkippingPredicateEvaluator {
/// Retrieves the row count of a column (parquet footers always include this stat).
fn get_rowcount_stat(&self) -> Option<Self::IntStat>;

/// See [`PredicateEvaluator::eval_scalar_is_null`]
fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

/// See [`PredicateEvaluator::eval_scalar`]
fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output>;

@@ -589,6 +745,10 @@ pub(crate) trait DataSkippingPredicateEvaluator {
impl<T: DataSkippingPredicateEvaluator> PredicateEvaluator for T {
type Output = T::Output;

fn eval_scalar_is_null(&self, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_scalar_is_null(val, inverted)
}

fn eval_scalar(&self, val: &Scalar, inverted: bool) -> Option<Self::Output> {
self.eval_scalar(val, inverted)
}