Skip to content

Commit

Permalink
Upgrade to polars 1.11 in cudf-polars (rapidsai#17154)
Browse files Browse the repository at this point in the history
Polars 1.11 is out, with slight updates to the IR, so we can correctly raise for dynamic groupbys and see inequality joins.

These changes adapt to that and do a first pass at supporting inequality joins (by translating to cross + filter). A followup (rapidsai#17000) will use libcudf's conditional joins.

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Mike Sarahan (https://github.com/msarahan)

URL: rapidsai#17154
  • Loading branch information
wence- authored Oct 24, 2024
1 parent d7cdf44 commit 3a62314
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 29 deletions.
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.8,<1.9
- polars>=1.11,<1.12
- pre-commit
- ptxcompiler
- pyarrow>=14.0.0,<18.0.0a0
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-125_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.8,<1.9
- polars>=1.11,<1.12
- pre-commit
- pyarrow>=14.0.0,<18.0.0a0
- pydata-sphinx-theme!=0.14.2
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/cudf-polars/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ requirements:
run:
- python
- pylibcudf ={{ version }}
- polars >=1.8,<1.9
- polars >=1.11,<1.12
- {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}

test:
Expand Down
2 changes: 1 addition & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ dependencies:
common:
- output_types: [conda, requirements, pyproject]
packages:
- polars>=1.8,<1.9
- polars>=1.11,<1.12
run_dask_cudf:
common:
- output_types: [conda, requirements, pyproject]
Expand Down
17 changes: 8 additions & 9 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,11 +666,11 @@ def __init__(
raise NotImplementedError(
"rolling window/groupby"
) # pragma: no cover; rollingwindow constructor has already raised
if self.options.dynamic:
raise NotImplementedError("dynamic group by")
if any(GroupBy.check_agg(a.value) > 1 for a in self.agg_requests):
raise NotImplementedError("Nested aggregations in groupby")
self.agg_infos = [req.collect_agg(depth=0) for req in self.agg_requests]
if len(self.keys) == 0:
raise NotImplementedError("dynamic groupby")

@staticmethod
def check_agg(agg: expr.Expr) -> int:
Expand Down Expand Up @@ -802,10 +802,10 @@ class Join(IR):
right_on: tuple[expr.NamedExpr, ...]
"""List of expressions used as keys in the right frame."""
options: tuple[
Literal["inner", "left", "right", "full", "leftsemi", "leftanti", "cross"],
Literal["inner", "left", "right", "full", "semi", "anti", "cross"],
bool,
tuple[int, int] | None,
str | None,
str,
bool,
]
"""
Expand Down Expand Up @@ -840,7 +840,7 @@ def __init__(
@staticmethod
@cache
def _joiners(
how: Literal["inner", "left", "right", "full", "leftsemi", "leftanti"],
how: Literal["inner", "left", "right", "full", "semi", "anti"],
) -> tuple[
Callable, plc.copying.OutOfBoundsPolicy, plc.copying.OutOfBoundsPolicy | None
]:
Expand All @@ -862,13 +862,13 @@ def _joiners(
plc.copying.OutOfBoundsPolicy.NULLIFY,
plc.copying.OutOfBoundsPolicy.NULLIFY,
)
elif how == "leftsemi":
elif how == "semi":
return (
plc.join.left_semi_join,
plc.copying.OutOfBoundsPolicy.DONT_CHECK,
None,
)
elif how == "leftanti":
elif how == "anti":
return (
plc.join.left_anti_join,
plc.copying.OutOfBoundsPolicy.DONT_CHECK,
Expand Down Expand Up @@ -933,7 +933,6 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
"""Evaluate and return a dataframe."""
left, right = (c.evaluate(cache=cache) for c in self.children)
how, join_nulls, zlice, suffix, coalesce = self.options
suffix = "_right" if suffix is None else suffix
if how == "cross":
# Separate implementation, since cross_join returns the
# result, not the gather maps
Expand All @@ -955,7 +954,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
columns[left.num_columns :], right.column_names, strict=True
)
]
return DataFrame([*left_cols, *right_cols])
return DataFrame([*left_cols, *right_cols]).slice(zlice)
# TODO: Waiting on clarity based on https://github.com/pola-rs/polars/issues/17184
left_on = DataFrame(broadcast(*(e.evaluate(left) for e in self.left_on)))
right_on = DataFrame(broadcast(*(e.evaluate(right) for e in self.right_on)))
Expand Down
76 changes: 72 additions & 4 deletions python/cudf_polars/cudf_polars/dsl/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

from __future__ import annotations

import functools
import json
from contextlib import AbstractContextManager, nullcontext
from functools import singledispatch
from typing import Any
from typing import TYPE_CHECKING, Any

import pyarrow as pa
import pylibcudf as plc
Expand All @@ -19,9 +20,13 @@
from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir

from cudf_polars.dsl import expr, ir
from cudf_polars.dsl.traversal import make_recursive, reuse_if_unchanged
from cudf_polars.typing import NodeTraverser
from cudf_polars.utils import dtypes, sorting

if TYPE_CHECKING:
from cudf_polars.typing import ExprTransformer

__all__ = ["translate_ir", "translate_named_expr"]


Expand Down Expand Up @@ -182,7 +187,71 @@ def _(
with set_node(visitor, node.input_right):
inp_right = translate_ir(visitor, n=None)
right_on = [translate_named_expr(visitor, n=e) for e in node.right_on]
return ir.Join(schema, left_on, right_on, node.options, inp_left, inp_right)
if (how := node.options[0]) in {
"inner",
"left",
"right",
"full",
"cross",
"semi",
"anti",
}:
return ir.Join(schema, left_on, right_on, node.options, inp_left, inp_right)
else:
how, op1, op2 = how
if how != "ie_join":
raise NotImplementedError(
f"Unsupported join type {how}"
) # pragma: no cover; asof joins not yet exposed
# No exposure of mixed/conditional joins in pylibcudf yet, so in
# the first instance, implement by doing a cross join followed by
# a filter.
_, join_nulls, zlice, suffix, coalesce = node.options
cross = ir.Join(
schema,
[],
[],
("cross", join_nulls, None, suffix, coalesce),
inp_left,
inp_right,
)
dtype = plc.DataType(plc.TypeId.BOOL8)
if op2 is None:
ops = [op1]
else:
ops = [op1, op2]
suffix = cross.options[3]

# Column references in the right table refer to the post-join
# names, so with suffixes.
def _rename(e: expr.Expr, rec: ExprTransformer) -> expr.Expr:
if isinstance(e, expr.Col) and e.name in inp_left.schema:
return type(e)(e.dtype, f"{e.name}{suffix}")
return reuse_if_unchanged(e, rec)

mapper = make_recursive(_rename)
right_on = [
expr.NamedExpr(
f"{old.name}{suffix}" if old.name in inp_left.schema else old.name, new
)
for new, old in zip(
(mapper(e.value) for e in right_on), right_on, strict=True
)
]
mask = functools.reduce(
functools.partial(
expr.BinOp, dtype, plc.binaryop.BinaryOperator.LOGICAL_AND
),
(
expr.BinOp(dtype, expr.BinOp._MAPPING[op], left.value, right.value)
for op, left, right in zip(ops, left_on, right_on, strict=True)
),
)
filtered = ir.Filter(schema, expr.NamedExpr("mask", mask), cross)
if zlice is not None:
offset, length = zlice
return ir.Slice(schema, offset, length, filtered)
return filtered


@_translate_ir.register
Expand Down Expand Up @@ -319,8 +388,7 @@ def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR:
# IR is versioned with major.minor, minor is bumped for backwards
# compatible changes (e.g. adding new nodes), major is bumped for
# incompatible changes (e.g. renaming nodes).
# Polars 1.7 changes definition of the CSV reader options schema name.
if (version := visitor.version()) >= (3, 0):
if (version := visitor.version()) >= (4, 0):
raise NotImplementedError(
f"No support for polars IR {version=}"
) # pragma: no cover; no such version for now.
Expand Down
38 changes: 31 additions & 7 deletions python/cudf_polars/cudf_polars/testing/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,34 @@ def pytest_configure(config: pytest.Config):
"tests/unit/io/test_lazy_parquet.py::test_parquet_is_in_statistics": "Debug output on stderr doesn't match",
"tests/unit/io/test_lazy_parquet.py::test_parquet_statistics": "Debug output on stderr doesn't match",
"tests/unit/io/test_lazy_parquet.py::test_parquet_different_schema[False]": "Needs cudf#16394",
"tests/unit/io/test_lazy_parquet.py::test_parquet_schema_arg[False-columns]": "Correctly raises but different error",
"tests/unit/io/test_lazy_parquet.py::test_parquet_schema_arg[False-row_groups]": "Correctly raises but different error",
"tests/unit/io/test_lazy_parquet.py::test_parquet_schema_arg[False-prefiltered]": "Correctly raises but different error",
"tests/unit/io/test_lazy_parquet.py::test_parquet_schema_arg[False-none]": "Correctly raises but different error",
"tests/unit/io/test_lazy_parquet.py::test_parquet_schema_mismatch_panic_17067[False]": "Needs cudf#16394",
"tests/unit/io/test_lazy_parquet.py::test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249[False-False]": "Needs some variant of cudf#16394",
"tests/unit/io/test_lazy_parquet.py::test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249[True-False]": "Needs some variant of cudf#16394",
"tests/unit/io/test_lazy_parquet.py::test_parquet_slice_pushdown_non_zero_offset[False]": "Thrift data not handled correctly/slice pushdown wrong?",
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394",
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception",
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception",
"tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-prefiltered]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-prefiltered]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-row_groups]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-row_groups]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-columns]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-columns]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-True-none]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-True-none]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-True-prefiltered]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-True-prefiltered]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-True-row_groups]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-True-row_groups]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-True-columns]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-True-columns]": "Mismatching column read cudf#16394",
"tests/unit/io/test_scan.py::test_scan[single-csv-async]": "Debug output on stderr doesn't match",
"tests/unit/io/test_scan.py::test_scan_with_limit[single-csv-async]": "Debug output on stderr doesn't match",
"tests/unit/io/test_scan.py::test_scan_with_filter[single-csv-async]": "Debug output on stderr doesn't match",
Expand Down Expand Up @@ -107,6 +129,14 @@ def pytest_configure(config: pytest.Config):
"tests/unit/operations/aggregation/test_aggregations.py::test_sum_empty_and_null_set": "libcudf sums column of all nulls to null, not zero",
"tests/unit/operations/aggregation/test_aggregations.py::test_binary_op_agg_context_no_simplify_expr_12423": "groupby-agg of just literals should not produce collect_list",
"tests/unit/operations/aggregation/test_aggregations.py::test_nan_inf_aggregation": "treatment of nans and nulls together is different in libcudf and polars in groupby-agg context",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func0-func0-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func0-func1-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func0-func2-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func0-func3-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func1-func0-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func1-func1-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func1-func2-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/arithmetic/test_list_arithmetic.py::test_list_arithmetic_values[func1-func3-none]": "cudf-polars doesn't nullify division by zero",
"tests/unit/operations/test_abs.py::test_abs_duration": "Need to raise for unsupported uops on timelike values",
"tests/unit/operations/test_group_by.py::test_group_by_mean_by_dtype[input7-expected7-Float32-Float32]": "Mismatching dtypes, needs cudf#15852",
"tests/unit/operations/test_group_by.py::test_group_by_mean_by_dtype[input10-expected10-Date-output_dtype10]": "Unsupported groupby-agg for a particular dtype",
Expand All @@ -124,13 +154,6 @@ def pytest_configure(config: pytest.Config):
"tests/unit/operations/test_group_by.py::test_group_by_binary_agg_with_literal": "Incorrect broadcasting of literals in groupby-agg",
"tests/unit/operations/test_group_by.py::test_aggregated_scalar_elementwise_15602": "Unsupported boolean function/dtype combination in groupby-agg",
"tests/unit/operations/test_group_by.py::test_schemas[data1-expr1-expected_select1-expected_gb1]": "Mismatching dtypes, needs cudf#15852",
"tests/unit/operations/test_group_by_dynamic.py::test_group_by_dynamic_by_monday_and_offset_5444": "IR needs to expose groupby-dynamic information",
"tests/unit/operations/test_group_by_dynamic.py::test_group_by_dynamic_label[left-expected0]": "IR needs to expose groupby-dynamic information",
"tests/unit/operations/test_group_by_dynamic.py::test_group_by_dynamic_label[right-expected1]": "IR needs to expose groupby-dynamic information",
"tests/unit/operations/test_group_by_dynamic.py::test_group_by_dynamic_label[datapoint-expected2]": "IR needs to expose groupby-dynamic information",
"tests/unit/operations/test_group_by_dynamic.py::test_rolling_dynamic_sortedness_check": "IR needs to expose groupby-dynamic information",
"tests/unit/operations/test_group_by_dynamic.py::test_group_by_dynamic_validation": "IR needs to expose groupby-dynamic information",
"tests/unit/operations/test_group_by_dynamic.py::test_group_by_dynamic_15225": "IR needs to expose groupby-dynamic information",
"tests/unit/operations/test_join.py::test_cross_join_slice_pushdown": "Need to implement slice pushdown for cross joins",
"tests/unit/sql/test_cast.py::test_cast_errors[values0-values::uint8-conversion from `f64` to `u64` failed]": "Casting that raises not supported on GPU",
"tests/unit/sql/test_cast.py::test_cast_errors[values1-values::uint4-conversion from `i64` to `u32` failed]": "Casting that raises not supported on GPU",
Expand All @@ -140,6 +163,7 @@ def pytest_configure(config: pytest.Config):
"tests/unit/streaming/test_streaming_io.py::test_parquet_eq_statistics": "Debug output on stderr doesn't match",
"tests/unit/test_cse.py::test_cse_predicate_self_join": "Debug output on stderr doesn't match",
"tests/unit/test_empty.py::test_empty_9137": "Mismatching dtypes, needs cudf#15852",
"tests/unit/test_errors.py::test_error_on_empty_group_by": "Incorrect exception raised",
# Maybe flaky, order-dependent?
"tests/unit/test_projections.py::test_schema_full_outer_join_projection_pd_13287": "Order-specific result check, query is correct but in different order",
"tests/unit/test_queries.py::test_group_by_agg_equals_zero_3535": "libcudf sums all nulls to null, not zero",
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ authors = [
license = { text = "Apache 2.0" }
requires-python = ">=3.10"
dependencies = [
"polars>=1.8,<1.9",
"polars>=1.11,<1.12",
"pylibcudf==24.12.*,>=0.0.0a0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
classifiers = [
Expand Down
Loading

0 comments on commit 3a62314

Please sign in to comment.