Comet shuffle read size is larger than Spark shuffle #1268

Open · kazuyukitanimura opened this issue Jan 11, 2025 · 0 comments
Labels: bug (Something isn't working), performance
Milestone: 0.6.0

kazuyukitanimura (Contributor) commented Jan 11, 2025

Describe the bug

The attached test is adapted from the Spark test "ordered distribution and sort with same exprs: append" in WriteDistributionAndOrderingSuite.

It looks like the Comet shuffle read size is reported as much larger than the Spark shuffle size, which causes AQE to coalesce into more partitions.

Steps to reproduce

package org.apache.spark.sql

import java.sql.Date
import java.util.Collections

import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.LogicalExpressions.sort
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructType}
import org.apache.spark.sql.util.QueryExecutionListener

import org.apache.comet.CometConf

class CSuite extends CometTestBase {
  import testImplicits._

  // Run every test in this suite with Comet native shuffle enabled.
  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    super.test(testName, testTags: _*) {
      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
        testFun
      }
    }
  }

  test("a") {

    def catalog: InMemoryCatalog = {
      spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
      val catalog = spark.sessionState.catalogManager.catalog("testcat")
      catalog.asTableCatalog.asInstanceOf[InMemoryCatalog]
    }
    val namespace = Array("ns1")
    val ident = Identifier.of(namespace, "test_table")
    val tableNameAsString = "testcat." + ident.toString
    val emptyProps = Collections.emptyMap[String, String]
    val schema = new StructType()
      .add("id", IntegerType)
      .add("data", StringType)
      .add("day", DateType)
    val tableOrdering = Array[SortOrder](
      sort(FieldReference("data"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
    val tableDistribution = Distributions.ordered(tableOrdering)
    val writeTransform: DataFrame => DataFrame = df => df

    // The table requires an ordered distribution and sets a 1000-byte advisory
    // partition size, which overrides the SQL conf set below.
    catalog.createTable(
      ident = ident,
      schema = schema,
      partitions = Array.empty,
      properties = emptyProps,
      distribution = tableDistribution,
      ordering = tableOrdering,
      requiredNumPartitions = None,
      advisoryPartitionSize = Some(1000),
      distributionStrictlyRequired = true)

    val df =
      spark.sparkContext
        .parallelize(
          (1 to 10).map { i =>
            (if (i > 4) 5 else i, i.toString, Date.valueOf(s"${2020 + i}-$i-$i"))
          },
          3)
        .toDF("id", "data", "day")
    val writer = writeTransform(df).writeTo(tableNameAsString)

    def execute(writeFunc: => Unit): SparkPlan = {
      var executedPlan: SparkPlan = null

      val listener = new QueryExecutionListener {
        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
          executedPlan = qe.executedPlan
        }
        override def onFailure(
            funcName: String,
            qe: QueryExecution,
            exception: Exception): Unit = {}
      }
      spark.listenerManager.register(listener)

      writeFunc

      sparkContext.listenerBus.waitUntilEmpty()

      executedPlan match {
        case w: V2TableWriteExec =>
          stripAQEPlan(w.query)
        case _ =>
          fail("expected V2TableWriteExec")
      }
    }

    def executeCommand(): SparkPlan = execute(writer.append())

    // if the partition size is configured for the table, set the SQL conf to something small
    // so that the overriding behavior is tested
    val defaultAdvisoryPartitionSize = "15"
    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> defaultAdvisoryPartitionSize,
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {

      val executedPlan = executeCommand()
      val read = collect(executedPlan) { case r: AQEShuffleReadExec =>
        r
      }
      assert(read.size == 1)
      println(read.head.partitionSpecs)
      // Passes with Spark shuffle (one coalesced partition); with Comet shuffle
      // four partition specs are reported, so this assertion fails.
      assert(read.head.partitionSpecs.size == 1)
    }
  }
}
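
For debugging, the per-partition byte counts that AQE actually sees can be printed from the shuffle stage's map output statistics. A minimal sketch, assuming the executed plan still contains the ShuffleQueryStageExec nodes (printShuffleSizes is a hypothetical helper, not part of the suite):

import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec

// Hypothetical debugging helper: print the per-reducer byte sizes that AQE
// uses when coalescing. mapStats is Option[MapOutputStatistics].
def printShuffleSizes(plan: SparkPlan): Unit = {
  plan.collect { case s: ShuffleQueryStageExec => s }.foreach { stage =>
    stage.mapStats.foreach { stats =>
      println(stats.bytesByPartitionId.mkString("bytesByPartitionId = [", ", ", "]"))
    }
  }
}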

Expected behavior

Spark shuffle partition specs
ArrayBuffer(CoalescedPartitionSpec(0,5,Some(394)))

Comet shuffle partition specs
ArrayBuffer(CoalescedPartitionSpec(0,1,Some(890)), CoalescedPartitionSpec(1,3,Some(890)), CoalescedPartitionSpec(3,4,Some(890)), CoalescedPartitionSpec(4,5,Some(445)))
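
The partition counts follow from the reported sizes and the table's 1000-byte advisoryPartitionSize (which overrides the 15-byte SQL conf): Spark's reported total of 394 bytes fits in a single group, while Comet's reported total of roughly 3115 bytes can only be packed into groups of at most about 1000 bytes each. A minimal sketch of size-based grouping, approximating Spark's ShufflePartitionsUtil rather than reproducing its exact logic:

// Greedy size-based coalescing sketch (an approximation of what AQE does,
// not the exact ShufflePartitionsUtil code). Each result tuple mirrors
// CoalescedPartitionSpec(startReducerIndex, endReducerIndex, dataSize).
// Assumes sizes is non-empty.
def coalesce(sizes: Seq[Long], advisorySize: Long): Seq[(Int, Int, Long)] = {
  val specs = Seq.newBuilder[(Int, Int, Long)]
  var start = 0
  var acc = 0L
  for ((size, i) <- sizes.zipWithIndex) {
    // Close the current group once adding the next reducer partition
    // would exceed the advisory size.
    if (acc > 0 && acc + size > advisorySize) {
      specs += ((start, i, acc))
      start = i
      acc = 0L
    }
    acc += size
  }
  specs += ((start, sizes.length, acc))
  specs.result()
}

With the Spark-reported sizes (summing to 394 bytes across 5 reducer partitions) this yields the single spec (0, 5, 394); with the larger Comet-reported sizes the same grouping produces four specs like the ones above.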

Additional context

The above test may require Spark 3.5+, or a backport of https://issues.apache.org/jira/browse/SPARK-42779

Currently, WriteDistributionAndOrderingSuite is disabled for Spark 3.5+ by #834

kazuyukitanimura added the bug (Something isn't working) label on Jan 11, 2025
kazuyukitanimura added this to the 0.6.0 milestone on Jan 14, 2025