Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove test checks for Spark versions before 3.2.0 #11316

Open
wants to merge 5 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions integration_tests/src/main/python/arithmetic_ops_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,7 @@ def test_mod_pmod_long_min_value():
'cast(-12 as {}) % cast(0 as {})'], ids=idfn)
def test_mod_pmod_by_zero(data_gen, overflow_exp):
string_type = to_cast_string(data_gen.data_type)
if is_before_spark_320():
exception_str = 'java.lang.ArithmeticException: divide by zero'
elif is_before_spark_330():
if is_before_spark_330():
exception_str = 'SparkArithmeticException: divide by zero'
elif is_before_spark_340() and not is_databricks113_or_later():
exception_str = 'SparkArithmeticException: Division by zero'
Expand Down Expand Up @@ -571,7 +569,6 @@ def test_abs_ansi_no_overflow_decimal128(data_gen):

# Only run this test for Spark v3.2.0 and later to verify abs will
# throw exceptions for overflow when ANSI mode is enabled.
@pytest.mark.skipif(is_before_spark_320(), reason='SPARK-33275')
@pytest.mark.parametrize('data_type,value', [
(LongType(), LONG_MIN),
(IntegerType(), INT_MIN),
Expand Down Expand Up @@ -1049,9 +1046,7 @@ def _test_div_by_zero(ansi_mode, expr, is_lit=False):
ansi_conf = {'spark.sql.ansi.enabled': ansi_mode == 'ansi'}
data_gen = lambda spark: two_col_df(spark, IntegerGen(), IntegerGen(min_val=0, max_val=0), length=1)
div_by_zero_func = lambda spark: data_gen(spark).selectExpr(expr)
if is_before_spark_320():
err_message = 'java.lang.ArithmeticException: divide by zero'
elif is_before_spark_330():
if is_before_spark_330():
err_message = 'SparkArithmeticException: divide by zero'
elif is_before_spark_340() and not is_databricks113_or_later():
err_message = 'SparkArithmeticException: Division by zero'
Expand Down Expand Up @@ -1105,7 +1100,6 @@ def _div_overflow_exception_when(expr, ansi_enabled, is_lit=False):

# Only run this test for Spark v3.2.0 and later to verify IntegralDivide will
# throw exceptions for overflow when ANSI mode is enabled.
@pytest.mark.skipif(is_before_spark_320(), reason='https://github.com/apache/spark/pull/32260')
@pytest.mark.parametrize('expr', ['a DIV CAST(-1 AS INT)', 'a DIV b'])
@pytest.mark.parametrize('ansi_enabled', [False, True])
def test_div_overflow_exception_when_ansi(expr, ansi_enabled):
Expand All @@ -1115,15 +1109,13 @@ def test_div_overflow_exception_when_ansi(expr, ansi_enabled):
# throw exceptions for overflow when ANSI mode is enabled.
# We have split this test from test_div_overflow_exception_when_ansi because Spark 3.4
# throws a different exception for literals
@pytest.mark.skipif(is_before_spark_320(), reason='https://github.com/apache/spark/pull/32260')
@pytest.mark.parametrize('expr', ['CAST(-9223372036854775808L as LONG) DIV -1'])
@pytest.mark.parametrize('ansi_enabled', [False, True])
def test_div_overflow_exception_when_ansi_literal(expr, ansi_enabled):
_div_overflow_exception_when(expr, ansi_enabled, is_lit=True)

# Only run this test before Spark v3.2.0 to verify IntegralDivide will NOT
# throw exceptions for overflow even when ANSI mode is enabled.
@pytest.mark.skipif(not is_before_spark_320(), reason='https://github.com/apache/spark/pull/32260')
@pytest.mark.parametrize('expr', ['CAST(-9223372036854775808L as LONG) DIV -1', 'a DIV CAST(-1 AS INT)', 'a DIV b'])
@pytest.mark.parametrize('ansi_enabled', ['false', 'true'])
def test_div_overflow_no_exception_when_ansi(expr, ansi_enabled):
Expand Down
9 changes: 1 addition & 8 deletions integration_tests/src/main/python/array_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ def q1(spark):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn)
@pytest.mark.skipif(is_before_spark_313() or is_spark_330() or is_spark_330cdh(), reason="NaN equality is only handled in Spark 3.1.3+ and SPARK-39976 issue with null and ArrayIntersect in Spark 3.3.0")
@pytest.mark.skipif(is_spark_330() or is_spark_330cdh(), reason="SPARK-39976 issue with null and ArrayIntersect in Spark 3.3.0")
def test_array_intersect(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand Down Expand Up @@ -570,7 +570,6 @@ def test_array_intersect_spark330(data_gen):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn)
@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+")
def test_array_intersect_before_spark313(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand All @@ -590,7 +589,6 @@ def test_array_intersect_before_spark313(data_gen):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn)
@pytest.mark.skipif(is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+")
def test_array_union(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand All @@ -610,7 +608,6 @@ def test_array_union(data_gen):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn)
@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+")
def test_array_union_before_spark313(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand All @@ -630,7 +627,6 @@ def test_array_union_before_spark313(data_gen):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn)
@pytest.mark.skipif(is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+")
def test_array_except(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand All @@ -650,7 +646,6 @@ def test_array_except(data_gen):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn)
@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+")
def test_array_except_before_spark313(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand All @@ -670,7 +665,6 @@ def test_array_except_before_spark313(data_gen):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens + decimal_gens, ids=idfn)
@pytest.mark.skipif(is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+")
def test_arrays_overlap(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand All @@ -691,7 +685,6 @@ def test_arrays_overlap(data_gen):

@incompat
@pytest.mark.parametrize('data_gen', no_neg_zero_all_basic_gens_no_nans + decimal_gens, ids=idfn)
@pytest.mark.skipif(not is_before_spark_313(), reason="NaN equality is only handled in Spark 3.1.3+")
def test_arrays_overlap_before_spark313(data_gen):
gen = StructGen(
[('a', ArrayGen(data_gen, nullable=True)),
Expand Down
17 changes: 1 addition & 16 deletions integration_tests/src/main/python/cast_test.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_neg_dec_scale_bug_version should also be updated, as it's using spark_version() comparisons directly against 3.1.1 and 3.1.3.

Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,7 @@ def test_cast_string_date_valid_format():
]
values_string_to_data = invalid_values_string_to_date + valid_values_string_to_date

# Spark 320+ and Databricks support ANSI mode when casting string to date
# This means an exception will be thrown when casting an invalid string to date on Spark 320+ or Databricks
# test Spark versions < 3.2.0 and non databricks, ANSI mode
@pytest.mark.skipif(not is_before_spark_320(), reason="ansi cast(string as date) throws exception only in 3.2.0+ or db")
def test_cast_string_date_invalid_ansi_before_320():
data_rows = [(v,) for v in values_string_to_data]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.createDataFrame(data_rows, "a string").select(f.col('a').cast(DateType())),
conf={'spark.rapids.sql.hasExtendedYearValues': 'false',
'spark.sql.ansi.enabled': 'true'}, )

# test Spark versions >= 320 and databricks, ANSI mode, valid values
@pytest.mark.skipif(is_before_spark_320(), reason="Spark versions(< 320) not support Ansi mode when casting string to date")
def test_cast_string_date_valid_ansi():
data_rows = [(v,) for v in valid_values_string_to_date]
assert_gpu_and_cpu_are_equal_collect(
Expand All @@ -107,7 +95,6 @@ def test_cast_string_date_valid_ansi():
'spark.sql.ansi.enabled': 'true'})

# test Spark versions >= 320, ANSI mode
@pytest.mark.skipif(is_before_spark_320(), reason="ansi cast(string as date) throws exception only in 3.2.0+")
@pytest.mark.parametrize('invalid', invalid_values_string_to_date)
def test_cast_string_date_invalid_ansi(invalid):
assert_gpu_and_cpu_error(
Expand All @@ -118,7 +105,7 @@ def test_cast_string_date_invalid_ansi(invalid):


# test try_cast in Spark versions >= 320 and < 340
@pytest.mark.skipif(is_before_spark_320() or is_spark_340_or_later() or is_databricks113_or_later(), reason="try_cast only in Spark 3.2+")
@pytest.mark.skipif(is_before_spark_340() or is_databricks113_or_later(), reason="try_cast only in Spark 3.2+")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this supposed to be is_spark_340_or_later instead of is_before_spark_340?

@allow_non_gpu('ProjectExec', 'TryCast')
@pytest.mark.parametrize('invalid', invalid_values_string_to_date)
def test_try_cast_fallback(invalid):
Expand Down Expand Up @@ -162,15 +149,13 @@ def test_cast_string_ts_valid_format(data_gen):
'spark.rapids.sql.castStringToTimestamp.enabled': 'true'})

@allow_non_gpu('ProjectExec', 'Cast', 'Alias')
@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0+ do we have issues with extended years")
def test_cast_string_date_fallback():
assert_gpu_fallback_collect(
# Cast back to String because this goes beyond what python can support for years
lambda spark : unary_op_df(spark, StringGen('([0-9]|-|\\+){4,12}')).select(f.col('a').cast(DateType()).cast(StringType())),
'Cast')

@allow_non_gpu('ProjectExec', 'Cast', 'Alias')
@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0+ do we have issues with extended years")
def test_cast_string_timestamp_fallback():
assert_gpu_fallback_collect(
# Cast back to String because this goes beyond what python can support for years
Expand Down
10 changes: 2 additions & 8 deletions integration_tests/src/main/python/cmp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from asserts import assert_gpu_and_cpu_are_equal_collect
from conftest import is_not_utc
from data_gen import *
from spark_session import with_cpu_session, is_before_spark_313, is_before_spark_330
from spark_session import with_cpu_session, is_before_spark_330
from pyspark.sql.types import *
from marks import datagen_overrides, allow_non_gpu
import pyspark.sql.functions as f
Expand Down Expand Up @@ -335,16 +335,10 @@ def test_in(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))

# We avoid testing inset with NaN in Spark < 3.1.3 since it has issues with NaN comparisons.
# See https://github.com/NVIDIA/spark-rapids/issues/9687.
test_inset_data_gen = [gen for gen in eq_gens_with_decimal_gen if gen != float_gen if gen != double_gen] + \
[FloatGen(no_nans=True), DoubleGen(no_nans=True)] \
if is_before_spark_313() else eq_gens_with_decimal_gen

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries over that value.
@allow_non_gpu(*non_utc_allow)
@pytest.mark.parametrize('data_gen', test_inset_data_gen, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in_set(data_gen):
# nulls are not supported for in on the GPU yet
num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1
Expand Down
3 changes: 1 addition & 2 deletions integration_tests/src/main/python/conditionals_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from spark_session import is_before_spark_320, is_jvm_charset_utf8
from spark_session import is_jvm_charset_utf8
from pyspark.sql.types import *
from marks import datagen_overrides, allow_non_gpu
import pyspark.sql.functions as f
Expand Down Expand Up @@ -242,7 +242,6 @@ def test_conditional_with_side_effects_sequence(data_gen):
ELSE null END'),
conf = ansi_enabled_conf)

@pytest.mark.skipif(is_before_spark_320(), reason='Earlier versions of Spark cannot cast sequence to string')
@pytest.mark.parametrize('data_gen', [mk_str_gen('[a-z]{0,3}')], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_conditional_with_side_effects_sequence_cast(data_gen):
Expand Down
7 changes: 1 addition & 6 deletions integration_tests/src/main/python/delta_lake_delete_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from data_gen import *
from delta_lake_utils import *
from marks import *
from spark_session import is_before_spark_320, is_databricks_runtime, supports_delta_lake_deletion_vectors, \
from spark_session import is_databricks_runtime, supports_delta_lake_deletion_vectors, \
with_cpu_session, with_gpu_session

delta_delete_enabled_conf = copy_and_update(delta_writes_enabled_conf,
Expand Down Expand Up @@ -72,7 +72,6 @@ def checker(data_path, do_delete):
{"spark.rapids.sql.command.DeleteCommand": "false"},
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
def test_delta_delete_disabled_fallback(spark_tmp_path, disable_conf):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
Expand Down Expand Up @@ -113,7 +112,6 @@ def write_func(spark, path):
@ignore_order
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
def test_delta_delete_entire_table(spark_tmp_path, use_cdf, partition_columns):
def generate_dest_data(spark):
return three_col_df(spark,
Expand All @@ -134,7 +132,6 @@ def generate_dest_data(spark):
@ignore_order
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [["a"], ["a", "b"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
def test_delta_delete_partitions(spark_tmp_path, use_cdf, partition_columns):
def generate_dest_data(spark):
return three_col_df(spark,
Expand All @@ -155,7 +152,6 @@ def generate_dest_data(spark):
@ignore_order
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
def test_delta_delete_rows(spark_tmp_path, use_cdf, partition_columns):
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
Expand All @@ -174,7 +170,6 @@ def generate_dest_data(spark):
@ignore_order
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@datagen_overrides(seed=0, permanent=True, reason='https://github.com/NVIDIA/spark-rapids/issues/9884')
def test_delta_delete_dataframe_api(spark_tmp_path, use_cdf, partition_columns):
from delta.tables import DeltaTable
Expand Down
11 changes: 1 addition & 10 deletions integration_tests/src/main/python/delta_lake_merge_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from delta_lake_merge_common import *
from marks import *
from pyspark.sql.types import *
from spark_session import is_before_spark_320, is_databricks_runtime, spark_version
from spark_session import is_databricks_runtime, spark_version


delta_merge_enabled_conf = copy_and_update(delta_writes_enabled_conf,
Expand All @@ -36,7 +36,6 @@
{"spark.rapids.sql.command.MergeIntoCommand": "false"},
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
def test_delta_merge_disabled_fallback(spark_tmp_path, spark_tmp_table_factory, disable_conf):
def checker(data_path, do_merge):
assert_gpu_fallback_write(do_merge, read_delta_path, data_path,
Expand Down Expand Up @@ -77,7 +76,6 @@ def checker(data_path, do_merge):
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"], ["b"], ["a", "b"]], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
Expand All @@ -103,7 +101,6 @@ def test_delta_merge_partial_fallback_via_conf(spark_tmp_path, spark_tmp_table_f
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("table_ranges", [(range(20), range(10)), # partial insert of source
(range(5), range(5)), # no-op insert
(range(10), range(20, 30)) # full insert of source
Expand All @@ -120,7 +117,6 @@ def test_delta_merge_not_match_insert_only(spark_tmp_path, spark_tmp_table_facto
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("table_ranges", [(range(10), range(20)), # partial delete of target
(range(5), range(5)), # full delete of target
(range(10), range(20, 30)) # no-op delete
Expand All @@ -137,7 +133,6 @@ def test_delta_merge_match_delete_only(spark_tmp_path, spark_tmp_table_factory,
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
Expand All @@ -148,7 +143,6 @@ def test_delta_merge_standard_upsert(spark_tmp_path, spark_tmp_table_factory, us
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("merge_sql", [
"MERGE INTO {dest_table} d USING {src_table} s ON d.a == s.a" \
Expand All @@ -171,7 +165,6 @@ def test_delta_merge_upsert_with_condition(spark_tmp_path, spark_tmp_table_facto
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spark_tmp_table_factory, use_cdf, num_slices):
Expand All @@ -183,7 +176,6 @@ def test_delta_merge_upsert_with_unmatchable_match_condition(spark_tmp_path, spa
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf):
do_test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_factory, use_cdf,
Expand All @@ -192,7 +184,6 @@ def test_delta_merge_update_with_aggregation(spark_tmp_path, spark_tmp_table_fac
@allow_non_gpu(*delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(not is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/7573")
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.parametrize("num_slices", num_slices_to_test, ids=idfn)
Expand Down
Loading