diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh index 183df6a169b..643c2b74f0f 100755 --- a/integration_tests/run_pyspark_from_build.sh +++ b/integration_tests/run_pyspark_from_build.sh @@ -364,16 +364,18 @@ EOF fi export PYSP_TEST_spark_rapids_memory_gpu_allocSize=${PYSP_TEST_spark_rapids_memory_gpu_allocSize:-'1536m'} - if [[ "$VELOX_TEST" -eq 1 ]]; then - if [ -z "${VELOX_JARS}" ]; then - echo "Error: Environment VELOX_JARS is not set." + # When LOAD_HYBRID_BACKEND is 1, append the jars listed in HYBRID_BACKEND_JARS to + # PYSP_TEST_spark_jars and export the settings that activate the hybrid backend for the tests. + if [[ "$LOAD_HYBRID_BACKEND" -eq 1 ]]; then + if [ -z "${HYBRID_BACKEND_JARS}" ]; then + echo "Error: Environment HYBRID_BACKEND_JARS is not set." exit 1 fi - export PYSP_TEST_spark_jars="${PYSP_TEST_spark_jars},${VELOX_JARS//:/,}" + export PYSP_TEST_spark_jars="${PYSP_TEST_spark_jars},${HYBRID_BACKEND_JARS//:/,}" export PYSP_TEST_spark_memory_offHeap_enabled=true export PYSP_TEST_spark_memory_offHeap_size=512M + export PYSP_TEST_spark_rapids_sql_hybrid_load=true export PYSP_TEST_spark_gluten_loadLibFromJar=true - export PYSP_TEST_spark_rapids_sql_loadVelox=true fi SPARK_SHELL_SMOKE_TEST="${SPARK_SHELL_SMOKE_TEST:-0}" diff --git a/integration_tests/src/main/python/marks.py b/integration_tests/src/main/python/marks.py index 9a0bde11113..6ea57b99423 100644 --- a/integration_tests/src/main/python/marks.py +++ b/integration_tests/src/main/python/marks.py @@ -35,3 +35,4 @@ pyarrow_test = pytest.mark.pyarrow_test datagen_overrides = pytest.mark.datagen_overrides tz_sensitive_test = pytest.mark.tz_sensitive_test +hybrid_test = pytest.mark.hybrid_test diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 233fa0a8125..c1846d664e0 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -1588,3 
+1588,82 @@ def setup_table(spark): with_cpu_session(lambda spark: setup_table(spark)) assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(data_path).select("p"), conf={"spark.rapids.sql.columnSizeBytes": "100"}) + +""" +VeloxScan: +1. DecimalType is NOT fully supported +2. TimestampType can NOT be the KeyType of MapType +3. NestedMap is disabled because it may produce incorrect result (usually occurring when table is very small) +""" +velox_gens = [ + [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, + string_gen, boolean_gen, date_gen, + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), + ArrayGen(byte_gen), + ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen), + ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))), + ArrayGen(ArrayGen(byte_gen)), + StructGen([['child0', ArrayGen(byte_gen)], ['child1', byte_gen], ['child2', float_gen], + ['child3', decimal_gen_64bit]]), + ArrayGen(StructGen([['child0', string_gen], ['child1', double_gen], ['child2', int_gen]])) + ], + [MapGen(f(nullable=False), f()) for f in [ + BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen] + ], + [simple_string_to_string_map_gen, + MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), + max_length=10), + MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10), + # TODO: It seems that Velox Parquet Scan can NOT handle nested Map correctly + # MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen) + ], +] + +@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests") +@pytest.mark.parametrize('parquet_gens', velox_gens, ids=idfn) +@pytest.mark.parametrize('gen_rows', [20, 100, 512, 1024, 4096], ids=idfn) +@hybrid_test +def test_parquet_read_round_trip_hybrid(spark_tmp_path, parquet_gens, gen_rows): + gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] + data_path 
= spark_tmp_path + '/PARQUET_DATA' + with_cpu_session( + lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path), + conf=rebase_write_corrected_conf) + + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.parquet(data_path), + conf={ + 'spark.sql.sources.useV1SourceList': 'parquet', + 'spark.rapids.sql.parquet.useHybridReader': 'true', + }) + +# Creating scenarios in which CoalesceConverter will coalesce several input batches by adjusting +# reader_batch_size and coalesced_batch_size, tests if the CoalesceConverter functions correctly +# when coalescing is needed. +@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests") +@pytest.mark.parametrize('reader_batch_size', [512, 1024, 2048], ids=idfn) +@pytest.mark.parametrize('coalesced_batch_size', [1 << 25, 1 << 27], ids=idfn) +@pytest.mark.parametrize('gen_rows', [8192, 10000], ids=idfn) +@hybrid_test +def test_parquet_read_round_trip_hybrid_multiple_batches(spark_tmp_path, + reader_batch_size, + coalesced_batch_size, + gen_rows): + gens = [] + for g in velox_gens: + gens.extend(g) + + gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)] + data_path = spark_tmp_path + '/PARQUET_DATA' + with_cpu_session( + lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path), + conf=rebase_write_corrected_conf) + + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.parquet(data_path), + conf={ + 'spark.sql.sources.useV1SourceList': 'parquet', + 'spark.rapids.sql.parquet.useHybridReader': 'true', + 'spark.gluten.sql.columnar.maxBatchSize': reader_batch_size, + 'spark.rapids.sql.batchSizeBytes': coalesced_batch_size, + }) diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 831680e4feb..2dd61a05c1a 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -327,3 +327,6 @@ def 
is_hive_available(): if is_at_least_precommit_run(): return True return _spark.conf.get("spark.sql.catalogImplementation") == "hive" + +def is_hybrid_backend_loaded(): + return _spark.conf.get("spark.rapids.sql.hybrid.load", "false") == "true" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index f6882e92bb0..afd9cc029b3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -39,7 +39,6 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.filecache.FileCache import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter} import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl} -import com.nvidia.spark.rapids.velox.VeloxBackendApis import org.apache.commons.io.IOUtils import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} import org.apache.hadoop.conf.Configuration diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 8227e6e57c9..bb76e352bc1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -28,6 +28,7 @@ import com.nvidia.spark.rapids.lore.{LoreId, OutputLoreId} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.{ByteUnit, JavaUtils} +import org.apache.spark.rapids.hybrid.HybridBackend import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.RapidsPrivateUtil @@ -1684,21 +1685,18 @@ val 
GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(false) - val PARQUET_VELOX_READER = conf("spark.rapids.sql.parquet.useVelox") - .doc("Use velox to do ParquetScan (on CPUs)") + val HYBRID_PARQUET_READER = conf("spark.rapids.sql.parquet.useHybridReader") + .doc("Use HybridScan to read Parquet data via CPUs") .internal() .booleanConf - .createWithDefault(true) - - object VeloxFilterPushdownType extends Enumeration { - val ALL_SUPPORTED, NONE, UNCHANGED = Value - } + .createWithDefault(false) - val LOAD_VELOX = conf("spark.rapids.sql.loadVelox") - .doc("Load Velox (through Gluten) as a spark driver plugin") + // spark.rapids.sql.hybrid.loadBackend defined at HybridPluginWrapper of spark-rapids-private + val LOAD_HYBRID_BACKEND = conf(HybridBackend.LOAD_BACKEND_KEY) + .doc("Load hybrid backend as an extra plugin of spark-rapids during launch time") .startupOnly() .booleanConf - .createWithDefault(true) + .createWithDefault(false) val HASH_AGG_REPLACE_MODE = conf("spark.rapids.sql.hashAgg.replaceMode") .doc("Only when hash aggregate exec has these modes (\"all\" by default): " + @@ -2832,9 +2830,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val avroDebugDumpAlways: Boolean = get(AVRO_DEBUG_DUMP_ALWAYS) - lazy val parquetVeloxReader: Boolean = get(PARQUET_VELOX_READER) + lazy val useHybridParquetReader: Boolean = get(HYBRID_PARQUET_READER) - lazy val loadVelox: Boolean = get(LOAD_VELOX) + lazy val loadHybridBackend: Boolean = get(LOAD_HYBRID_BACKEND) lazy val hashAggReplaceMode: String = get(HASH_AGG_REPLACE_MODE) diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala index f9966cdc97b..4835db387cd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala +++ 
b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala @@ -21,7 +21,6 @@ import com.nvidia.spark.rapids.{AcquireFailed, GpuColumnVector, GpuMetric, GpuSe import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.hybrid.{CoalesceBatchConverter => NativeConverter} -import com.nvidia.spark.rapids.hybrid.HybridJniWrapper import com.nvidia.spark.rapids.hybrid.RapidsHostColumn import org.apache.spark.TaskContext @@ -43,8 +42,6 @@ class CoalesceConvertIterator(veloxIter: Iterator[ColumnarBatch], private val converterMetrics = Map( "C2COutputSize" -> GpuMetric.unwrap(metrics("C2COutputSize"))) - @transient private lazy val runtime: HybridJniWrapper = HybridJniWrapper.getOrCreate() - override def hasNext(): Boolean = { // either converter holds data or upstreaming iterator holds data val ret = withResource(new NvtxWithMetrics("VeloxC2CHasNext", NvtxColor.WHITE, @@ -101,7 +98,6 @@ class CoalesceConvertIterator(veloxIter: Iterator[ColumnarBatch], } } if (converterImpl.isEmpty) { - require(runtime != null, "Please setRuntime before fetching the iterator") val converter = NativeConverter( veloxIter.next(), targetBatchSizeInBytes, schema, converterMetrics diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridBackend.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridBackend.scala new file mode 100644 index 00000000000..676fa8252aa --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridBackend.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rapids.hybrid + +import com.nvidia.spark.rapids.hybrid.HybridPluginWrapper + +object HybridBackend { + val LOAD_BACKEND_KEY: String = HybridPluginWrapper.LOAD_BACKEND_KEY +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala index 1592fd6b3e2..f7fbb7197d7 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala @@ -32,19 +32,22 @@ import org.apache.spark.sql.rapids.GpuDataSourceScanExec import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.vectorized.ColumnarBatch +/** + * The SparkPlan for HybridParquetScan which does the same job as GpuFileSourceScanExec but in a + * different approach. Therefore, this class is similar to GpuFileSourceScanExec in a lot of way. + */ case class HybridFileSourceScanExec(originPlan: FileSourceScanExec )(@transient val rapidsConf: RapidsConf) extends GpuDataSourceScanExec with GpuExec { import GpuMetric._ - require(originPlan.relation.fileFormat.isInstanceOf[ParquetFileFormat], + require(originPlan.relation.fileFormat.getClass == classOf[ParquetFileFormat], "HybridScan only supports ParquetFormat") override def relation: BaseRelation = originPlan.relation override def tableIdentifier: Option[TableIdentifier] = originPlan.tableIdentifier - // All expressions are on the CPU. 
override def gpuExpressions: Seq[Expression] = Nil diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/HybridFileSourceScanExecMeta.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/HybridFileSourceScanExecMeta.scala new file mode 100644 index 00000000000..e0d567caf49 --- /dev/null +++ b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/HybridFileSourceScanExecMeta.scala @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.rapids.hybrid.HybridFileSourceScanExec +import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.types.{BinaryType, MapType} + +class HybridFileSourceScanExecMeta(plan: FileSourceScanExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends SparkPlanMeta[FileSourceScanExec](plan, conf, parent, rule) { + + // Replaces SubqueryBroadcastExec inside dynamic pruning filters with native counterpart + // if possible. Instead regarding filters as childExprs of current Meta, we create + // a new meta for SubqueryBroadcastExec. The reason is that the native replacement of + // FileSourceScan is independent from the replacement of the partitionFilters. 
+ private lazy val partitionFilters = { + val convertBroadcast = (bc: SubqueryBroadcastExec) => { + val meta = GpuOverrides.wrapAndTagPlan(bc, conf) + meta.tagForExplain() + meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec] + } + wrapped.partitionFilters.map { filter => + filter.transformDown { + case dpe@DynamicPruningExpression(inSub: InSubqueryExec) => + inSub.plan match { + case bc: SubqueryBroadcastExec => + dpe.copy(inSub.copy(plan = convertBroadcast(bc))) + case reuse@ReusedSubqueryExec(bc: SubqueryBroadcastExec) => + dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc)))) + case _ => + dpe + } + } + } + } + + // partition filters and data filters are not run on the GPU + override val childExprs: Seq[ExprMeta[_]] = Seq.empty + + override def tagPlanForGpu(): Unit = { + val cls = wrapped.relation.fileFormat.getClass + if (cls != classOf[ParquetFileFormat]) { + willNotWorkOnGpu(s"unsupported file format: ${cls.getCanonicalName}") + } + } + + override def convertToGpu(): GpuExec = { + // Modifies the original plan to support DPP + val fixedExec = wrapped.copy(partitionFilters = partitionFilters) + HybridFileSourceScanExec(fixedExec)(conf) + } +} + +object HybridFileSourceScanExecMeta { + + // Decides whether fsse is planned as HybridScan (true) or as the regular GPU scan (false) + def useHybridScan(conf: RapidsConf, fsse: FileSourceScanExec): Boolean = { + val isEnabled = if (conf.useHybridParquetReader) { + require(conf.loadHybridBackend, + "Hybrid backend was NOT loaded during the launch of spark-rapids plugin") + true + } else { + false + } + // For the time being, only support reading Parquet + lazy val isParquet = fsse.relation.fileFormat.getClass == classOf[ParquetFileFormat] + // True only if NO field contains, at any nesting level, a type HybridParquetReader cannot read + lazy val allSupportedTypes = !fsse.requiredSchema.exists { field => + TrampolineUtil.dataTypeExistsRecursively(field.dataType, { + // For the time being, the native backend may return incorrect results over nestedMap + case MapType(kt, 
vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => true + // For the time being, BinaryType is not supported yet + case _: BinaryType => true + case _ => false + }) + } + // TODO: supports BucketedScan + lazy val noBucketedScan = !fsse.bucketedScan + // TODO: supports working along with Alluxio + lazy val noAlluxio = !AlluxioCfgUtils.enabledAlluxioReplacementAlgoConvertTime(conf) + + isEnabled && isParquet && allSupportedTypes && noBucketedScan && noAlluxio + } + +} diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ScanExecShims.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ScanExecShims.scala index b432c989f35..ab8596eeb81 100644 --- a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ScanExecShims.scala +++ b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ScanExecShims.scala @@ -41,7 +41,15 @@ object ScanExecShims { (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY + TypeSig.BINARY + TypeSig.DECIMAL_128).nested(), TypeSig.all), - (fsse, conf, p, r) => new FileSourceScanExecMeta(fsse, conf, p, r)), + (fsse, conf, p, r) => { + // TODO: HybridScan supports DataSourceV2 + // TODO: HybridScan only supports Spark 32X for now. 
+ if (HybridFileSourceScanExecMeta.useHybridScan(conf, fsse)) { + new HybridFileSourceScanExecMeta(fsse, conf, p, r) + } else { + new FileSourceScanExecMeta(fsse, conf, p, r) + } + }), GpuOverrides.exec[BatchScanExec]( "The backend for most file input", ExecChecks( diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/spark320/VeloxFileSourceScanExecMeta.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/spark320/VeloxFileSourceScanExecMeta.scala deleted file mode 100644 index 389b4990355..00000000000 --- a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/spark320/VeloxFileSourceScanExecMeta.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/*** spark-rapids-shim-json-lines -{"spark": "320"} -{"spark": "321"} -{"spark": "321cdh"} -{"spark": "322"} -{"spark": "323"} -{"spark": "324"} -spark-rapids-shim-json-lines ***/ -package com.nvidia.spark.rapids.shims - -import com.nvidia.spark.rapids._ -import org.apache.spark.rapids.hybrid.HybridFileSourceScanExec - -import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression} -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex} - -class VeloxFileSourceScanExecMeta(plan: FileSourceScanExec, - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: DataFromReplacementRule, - pushedFilterSchema: Option[Seq[Attribute]] = None) - extends SparkPlanMeta[FileSourceScanExec](plan, conf, parent, rule) { - - // Replaces SubqueryBroadcastExec inside dynamic pruning filters with GPU counterpart - // if possible. Instead regarding filters as childExprs of current Meta, we create - // a new meta for SubqueryBroadcastExec. The reason is that the GPU replacement of - // FileSourceScan is independent from the replacement of the partitionFilters. It is - // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters - // are on the GPU. And vice versa. 
- private lazy val partitionFilters = { - val convertBroadcast = (bc: SubqueryBroadcastExec) => { - val meta = GpuOverrides.wrapAndTagPlan(bc, conf) - meta.tagForExplain() - meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec] - } - wrapped.partitionFilters.map { filter => - filter.transformDown { - case dpe@DynamicPruningExpression(inSub: InSubqueryExec) => - inSub.plan match { - case bc: SubqueryBroadcastExec => - dpe.copy(inSub.copy(plan = convertBroadcast(bc))) - case reuse@ReusedSubqueryExec(bc: SubqueryBroadcastExec) => - dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc)))) - case _ => - dpe - } - } - } - } - - // partition filters and data filters are not run on the GPU - override val childExprs: Seq[ExprMeta[_]] = Seq.empty - - override def tagPlanForGpu(): Unit = ScanExecShims.tagGpuFileSourceScanExecSupport(this) - - override def convertToGpu(): GpuExec = { - val sparkSession = wrapped.relation.sparkSession - val options = wrapped.relation.options - val (location, _) = - if (AlluxioCfgUtils.enabledAlluxioReplacementAlgoConvertTime(conf)) { - val shouldReadFromS3 = wrapped.relation.location match { - // Only handle InMemoryFileIndex - // - // skip handle `MetadataLogFileIndex`, from the description of this class: - // it's about the files generated by the `FileStreamSink`. - // The streaming data source is not in our scope. - // - // For CatalogFileIndex and FileIndex of `delta` data source, - // need more investigation. - case inMemory: InMemoryFileIndex => - // List all the partitions to reduce overhead, pass in 2 empty filters. - // Subsequent process will do the right partition pruning. - // This operation is fast, because it lists files from the caches and the caches - // already exist in the `InMemoryFileIndex`. 
- val pds = inMemory.listFiles(Seq.empty, Seq.empty) - AlluxioUtils.shouldReadDirectlyFromS3(conf, pds) - case _ => - false - } - - if (!shouldReadFromS3) { - // it's convert time algorithm and some paths are not large tables - AlluxioUtils.replacePathIfNeeded( - conf, - wrapped.relation, - partitionFilters, - wrapped.dataFilters) - } else { - // convert time algorithm and read large files - (wrapped.relation.location, None) - } - } else { - // it's not convert time algorithm or read large files, do not replace - (wrapped.relation.location, None) - } - - val newRelation = HadoopFsRelation( - location, - wrapped.relation.partitionSchema, - wrapped.relation.dataSchema, - wrapped.relation.bucketSpec, - wrapped.relation.fileFormat, - options)(sparkSession) - - HybridFileSourceScanExec( - newRelation, - wrapped.output, - wrapped.requiredSchema, - partitionFilters, - wrapped.optionalBucketSet, - wrapped.optionalNumCoalescedBuckets, - wrapped.dataFilters, - wrapped.tableIdentifier, - wrapped.disableBucketedScan, - queryUsesInputFile = false, - None, - filteredOutput = pushedFilterSchema)(conf) - } -}