Add support for dumping write data to try and reproduce error cases
Signed-off-by: Robert (Bobby) Evans <[email protected]>
revans2 committed Dec 11, 2024
1 parent e22a7ca commit 0e27736
Showing 27 changed files with 216 additions and 93 deletions.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -219,8 +219,9 @@ class GpuOptimisticTransaction
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -219,8 +219,9 @@ class GpuOptimisticTransaction
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -241,8 +241,9 @@ class GpuOptimisticTransaction
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -241,8 +241,9 @@ class GpuOptimisticTransaction
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -243,8 +243,9 @@ class GpuOptimisticTransaction
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -261,8 +261,9 @@ class GpuOptimisticTransaction(
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -258,8 +258,9 @@ class GpuOptimisticTransaction(
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -259,8 +259,9 @@ class GpuOptimisticTransaction(
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -259,8 +259,9 @@ class GpuOptimisticTransaction(
bucketSpec = None,
statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
options = options,
-rapidsConf.stableSort,
-rapidsConf.concurrentWriterPartitionFlushSize)
+useStableSort = rapidsConf.stableSort,
+concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
} catch {
case s: SparkException =>
// Pull an InvariantViolationException up to the top level if it was the root cause.
@@ -16,11 +16,11 @@

package com.nvidia.spark.rapids

-import java.io.OutputStream
+import java.io.{BufferedOutputStream, DataOutputStream, OutputStream}

import scala.collection.mutable

-import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, TableWriter}
+import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange, TableWriter}
import com.nvidia.spark.Retryable
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -61,7 +61,8 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
def newInstance(
path: String,
dataSchema: StructType,
-context: TaskAttemptContext): ColumnarOutputWriter
+context: TaskAttemptContext,
+debugOutputPath: Option[String]): ColumnarOutputWriter
}

/**
@@ -73,9 +74,43 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
dataSchema: StructType,
rangeName: String,
includeRetry: Boolean,
+debugDumpPath: Option[String],
holdGpuBetweenBatches: Boolean = false) extends HostBufferConsumer with Logging {

protected val tableWriter: TableWriter
+private lazy val debugDumpOutputStream: Option[OutputStream] = try {
+  debugDumpPath.map { path =>
+    val tc = TaskContext.get()
+    logWarning(s"DEBUG FILE OUTPUT $rangeName FOR " +
+      s"STAGE ${tc.stageId()} TASK ${tc.taskAttemptId()} is $path")
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(conf)
+    new DataOutputStream(new BufferedOutputStream(fs.create(hadoopPath, false)))
+  }
+} catch {
+  case e: Exception =>
+    logError(s"Could not create debug output stream at $debugDumpPath", e)
+    None
+}

+/**
+ * Write a batch to the debug output stream, if one is configured;
+ * otherwise this is a noop. Any exception is logged and then ignored so
+ * that the debug dump cannot fail the real write.
+ */
+private def debugWriteBatch(batch: ColumnarBatch): Unit = {
+  debugDumpOutputStream.foreach { output =>
+    try {
+      withResource(GpuColumnVector.from(batch)) { table =>
+        JCudfSerialization.writeToStream(table, output, 0, table.getRowCount)
+      }
+      output.flush()
+    } catch {
+      case t: Throwable =>
+        logError(s"Could not write debug table to $debugDumpPath", t)
+    }
+  }
+}

protected val conf: Configuration = context.getConfiguration

@@ -219,6 +254,7 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
// where corrupt files can be written if nothing is encoded via the writer.
anythingWritten = true

+debugWriteBatch(batch)
// tableWriter.write() serializes the table into the HostMemoryBuffer, and buffers it
// by calling handleBuffer() on the ColumnarOutputWriter. It may not write to the
// output stream just yet.
@@ -239,6 +275,9 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
GpuSemaphore.releaseIfNecessary(TaskContext.get())
writeBufferedData()
outputStream.close()
+debugDumpOutputStream.foreach { os =>
+  os.close()
+}
}

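The dump written by debugWriteBatch above is just a concatenation of JCudfSerialization tables, one per batch, so a failing write can in principle be replayed against the same writer offline. A minimal replay sketch in Scala follows; it is not part of this commit, and the replayDump helper, the local-file assumption, and the end-of-stream behavior of readTableFrom are assumptions:

import java.io.{BufferedInputStream, DataInputStream, FileInputStream}

import ai.rapids.cudf.{JCudfSerialization, Table}

// Hypothetical helper: iterate the tables serialized into a debug dump and
// hand each one to a callback, e.g. the writer being debugged. Assumes the
// dump was copied to a local file and that readTableFrom returns a pair
// whose table is null once the stream is exhausted.
def replayDump(localPath: String)(handle: Table => Unit): Unit = {
  val in = new DataInputStream(new BufferedInputStream(new FileInputStream(localPath)))
  try {
    var done = false
    while (!done) {
      val pair = JCudfSerialization.readTableFrom(in)
      try {
        Option(pair.getTable) match {
          case Some(table) => handle(table) // one table per batch that was written
          case None => done = true          // end of the dump
        }
      } finally {
        pair.close()
      }
    }
  } finally {
    in.close()
  }
}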
@@ -368,7 +368,8 @@ final class InsertIntoHadoopFsRelationCommandMeta(
cmd.fileIndex,
cmd.outputColumnNames,
conf.stableSort,
-conf.concurrentWriterPartitionFlushSize)
+conf.concurrentWriterPartitionFlushSize,
+conf.outputDebugDumpPrefix)
}
}

@@ -280,10 +280,11 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
override def newInstance(
path: String,
dataSchema: StructType,
-context: TaskAttemptContext): ColumnarOutputWriter = {
+context: TaskAttemptContext,
+debugOutputPath: Option[String]): ColumnarOutputWriter = {
new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString,
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled,
-holdGpuBetweenBatches)
+holdGpuBetweenBatches, debugOutputPath)
}

override def getFileExtension(context: TaskAttemptContext): String = {
@@ -306,8 +307,10 @@ class GpuParquetWriter(
timestampRebaseMode: DateTimeRebaseMode,
context: TaskAttemptContext,
parquetFieldIdEnabled: Boolean,
-holdGpuBetweenBatches: Boolean)
-extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, holdGpuBetweenBatches) {
+holdGpuBetweenBatches: Boolean,
+debugDumpPath: Option[String])
+extends ColumnarOutputWriter(context, dataSchema, "Parquet", true,
+  debugDumpPath, holdGpuBetweenBatches) {
override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = {
val cols = GpuColumnVector.extractBases(batch)
cols.foreach { col =>
11 changes: 11 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1682,6 +1682,15 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(false)

+val OUTPUT_DEBUG_DUMP_PREFIX = conf("spark.rapids.sql.output.debug.dumpPrefix")
+  .doc("A path prefix under which data that is about to be written out by a " +
+    "query is also dumped for debugging. The dump format is based on " +
+    "JCudfSerialization and captures the underlying table, so that if the " +
+    "written output is bad the writer input can be recreated and replayed.")
+  .internal()
+  .stringConf
+  .createOptional

val PARQUET_DEBUG_DUMP_PREFIX = conf("spark.rapids.sql.parquet.debug.dumpPrefix")
.doc("A path prefix where Parquet split file data is dumped for debugging.")
.internal()
@@ -2883,6 +2892,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val maxGpuColumnSizeBytes: Long = get(MAX_GPU_COLUMN_SIZE_BYTES)

+lazy val outputDebugDumpPrefix: Option[String] = get(OUTPUT_DEBUG_DUMP_PREFIX)

lazy val parquetDebugDumpPrefix: Option[String] = get(PARQUET_DEBUG_DUMP_PREFIX)

lazy val parquetDebugDumpAlways: Boolean = get(PARQUET_DEBUG_DUMP_ALWAYS)
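Since the new config is optional and internal, turning the dump on is just a matter of setting the prefix on the session. A hypothetical usage sketch follows; the session setup, prefix value, and output path are illustrative, not from this commit:

import org.apache.spark.sql.SparkSession

// Hypothetical: enable the write-data debug dump on a session that already
// has the RAPIDS Accelerator jars on its classpath. Columnar writes after
// this are expected to also dump what they write, serialized with
// JCudfSerialization, under the configured prefix.
val spark = SparkSession.builder()
  .appName("output-debug-dump-example") // illustrative
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.output.debug.dumpPrefix", "/tmp/rapids-write-dump/") // assumed prefix
  .getOrCreate()

spark.range(1000).toDF("id").write.mode("overwrite").parquet("/tmp/debug-out") // illustrative write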
@@ -181,16 +181,18 @@ class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFormat

override def newInstance(path: String,
dataSchema: StructType,
-context: TaskAttemptContext): ColumnarOutputWriter = {
-new GpuHiveParquetWriter(path, dataSchema, context, compressionType)
+context: TaskAttemptContext,
+debugOutputPath: Option[String]): ColumnarOutputWriter = {
+new GpuHiveParquetWriter(path, dataSchema, context, compressionType, debugOutputPath)
}
}
}
}

class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
-context: TaskAttemptContext, compType: CompressionType)
-extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true) {
+context: TaskAttemptContext, compType: CompressionType,
+debugOutputPath: Option[String])
+extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true, debugOutputPath) {

override protected val tableWriter: CudfTableWriter = {
val optionsBuilder = SchemaUtils
@@ -217,17 +219,19 @@ class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging {

override def newInstance(path: String,
dataSchema: StructType,
-context: TaskAttemptContext): ColumnarOutputWriter = {
-new GpuHiveTextWriter(path, dataSchema, context)
+context: TaskAttemptContext,
+debugOutputPath: Option[String]): ColumnarOutputWriter = {
+new GpuHiveTextWriter(path, dataSchema, context, debugOutputPath)
}
}
}
}

class GpuHiveTextWriter(override val path: String,
dataSchema: StructType,
-context: TaskAttemptContext)
-extends ColumnarOutputWriter(context, dataSchema, "HiveText", false) {
+context: TaskAttemptContext,
+debugOutputPath: Option[String])
+extends ColumnarOutputWriter(context, dataSchema, "HiveText", false, debugOutputPath) {

/**
* This reformats columns, to iron out inconsistencies between
@@ -43,6 +43,7 @@ private[hive] trait GpuSaveAsHiveFile extends GpuDataWritingCommand with SaveAsHiveFile
fileFormat: ColumnarFileFormat,
outputLocation: String,
forceHiveHashForBucketing: Boolean,
+baseDebugOutputPath: Option[String],
customPartitionLocations: Map[TablePartitionSpec,String] = Map.empty,
partitionAttributes: Seq[Attribute] = Nil,
bucketSpec: Option[BucketSpec] = None,
@@ -67,7 +68,8 @@ private[hive] trait GpuSaveAsHiveFile extends GpuDataWritingCommand with SaveAsHiveFile
options = options,
useStableSort = false, // TODO: Fetch from RapidsConf.
forceHiveHashForBucketing = forceHiveHashForBucketing,
-concurrentWriterPartitionFlushSize = 0L // TODO: Fetch from RapidsConf.
+concurrentWriterPartitionFlushSize = 0L, // TODO: Fetch from RapidsConf.
+baseDebugOutputPath = baseDebugOutputPath
)
}
}
