Check datasource is v1
Chong Gao committed Dec 10, 2024
1 parent 5e972d6 commit c6fa249
Showing 3 changed files with 29 additions and 11 deletions.
@@ -133,22 +133,24 @@ object HybridFileSourceScanExecMeta {
   /**
    * Check if runtimes are satisfied, including:
    * - Spark distribution is not CDH or Databricks
-   * - Hybrid jar in the plasspath
+   * - Hybrid jar in the classpath
    * - Java version is 1.8
    * - Scala version is 2.12
+   * - Parquet V1 data source
    */
-  def checkRuntimes(): Unit = {
-    checkNotRuningCDHorDatabricks()
+  def checkRuntimes(v1DataSourceList: String): Unit = {
+    checkNotRunningCDHorDatabricks()
     checkHybridJarInClassPath()
     checkJavaVersion()
     checkScalaVersion()
+    checkV1Datasource(v1DataSourceList)
   }

   /**
    * Check Spark distribution is not CDH or Databricks,
    * report error if it is
    */
-  def checkNotRuningCDHorDatabricks(): Unit = {
+  private def checkNotRunningCDHorDatabricks(): Unit = {
     if (VersionUtils.isCloudera || VersionUtils.isDataBricks) {
       throw new RuntimeException("Hybrid feature does not support Cloudera/Databricks " +
         "Spark releases, Please disable Hybrid feature by setting " +
@@ -160,7 +162,7 @@ object HybridFileSourceScanExecMeta {
    * Check if the Hybrid jar is in the classpath,
    * report error if not
    */
-  def checkHybridJarInClassPath(): Unit = {
+  private def checkHybridJarInClassPath(): Unit = {
     try {
       Class.forName(HYBRID_JAR_PLUGIN_CLASS_NAME)
     } catch {
@@ -175,7 +177,7 @@ object HybridFileSourceScanExecMeta {
    * Hybrid feature only supports 1.8 Java version,
    * report error if not
    */
-  def checkJavaVersion(): Unit = {
+  private def checkJavaVersion(): Unit = {
     val javaVersion = System.getProperty("java.version")
     if (javaVersion == null) {
       throw new RuntimeException("Hybrid feature: Can not read java.version, get null")
@@ -192,11 +194,23 @@ object HybridFileSourceScanExecMeta {
    * Hybrid feature only supports Scala 2.12 version,
    * report error if not
    */
-  def checkScalaVersion(): Unit = {
+  private def checkScalaVersion(): Unit = {
     val scalaVersion = scala.util.Properties.versionString
     if (!scalaVersion.startsWith("version 2.12")) {
       throw new RuntimeException(s"Hybrid feature only supports Scala 2.12 version, " +
         s"but got $scalaVersion")
     }
   }
+
+  /**
+   * Hybrid feature only supports v1 datasource,
+   * report error if it's not satisfied
+   */
+  private def checkV1Datasource(v1SourceList: String): Unit = {
+    // check spark.sql.sources.useV1SourceList contains parquet
+    if(!v1SourceList.contains("parquet")) {
+      throw new RuntimeException(s"Hybrid feature only supports v1 datasource, " +
+        s"please set spark.sql.sources.useV1SourceList=parquet")
+    }
+  }
 }
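
For context, the new check only inspects the value of spark.sql.sources.useV1SourceList, whose Spark default already includes "parquet", so it fails only when a deployment has removed Parquet from that list. Below is a minimal sketch, not part of this commit (the session-building code and names are illustrative), of a configuration that satisfies the check:

import org.apache.spark.sql.SparkSession

// Keep Parquet on the V1 (FileSourceScanExec) read path so the Hybrid check passes.
val spark = SparkSession.builder()
  .appName("hybrid-scan-v1-demo")
  .config("spark.sql.sources.useV1SourceList", "parquet")
  .getOrCreate()

// Mirrors the string that ScanExecShims passes into checkRuntimes(v1SourceList).
val v1SourceList = spark.conf.get("spark.sql.sources.useV1SourceList", "")
assert(v1SourceList.contains("parquet"))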
@@ -45,8 +45,10 @@ object ScanExecShims {
     // TODO: HybridScan supports DataSourceV2
     if (HybridFileSourceScanExecMeta.useHybridScan(conf, fsse)) {
       // Check if runtimes are satisfied: Spark is not Databricks or CDH; Java version is 1.8;
-      // Scala version is 2.12; Hybrid jar is in the classpath
-      HybridFileSourceScanExecMeta.checkRuntimes()
+      // Scala version is 2.12; Hybrid jar is in the classpath; parquet v1 datasource
+      val sqlConf = fsse.relation.sparkSession.sessionState.conf
+      val v1SourceList = sqlConf.getConfString("spark.sql.sources.useV1SourceList", "")
+      HybridFileSourceScanExecMeta.checkRuntimes(v1SourceList)
       new HybridFileSourceScanExecMeta(fsse, conf, p, r)
     } else {
       new FileSourceScanExecMeta(fsse, conf, p, r)
@@ -77,8 +77,10 @@ object ScanExecShims {
     // TODO: HybridScan supports DataSourceV2
     if (HybridFileSourceScanExecMeta.useHybridScan(conf, fsse)) {
       // Check if runtimes are satisfied: Spark is not Databricks or CDH; Java version is 1.8;
-      // Scala version is 2.12; Hybrid jar is in the classpath
-      HybridFileSourceScanExecMeta.checkRuntimes()
+      // Scala version is 2.12; Hybrid jar is in the classpath; parquet v1 datasource
+      val sqlConf = fsse.relation.sparkSession.sessionState.conf
+      val v1SourceList = sqlConf.getConfString("spark.sql.sources.useV1SourceList", "")
+      HybridFileSourceScanExecMeta.checkRuntimes(v1SourceList)
       new HybridFileSourceScanExecMeta(fsse, conf, p, r)
     } else {
       new FileSourceScanExecMeta(fsse, conf, p, r)
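
Since checkV1Datasource is private, the guard above is only reachable through checkRuntimes. A standalone sketch of the same guard, with illustrative names that are not from the repository, for a setup script or test that wants to fail fast before enabling the Hybrid feature:

import org.apache.spark.sql.SparkSession

// Same substring test as the commit adds: refuse to proceed unless Parquet is on the V1 list.
def requireParquetV1(spark: SparkSession): Unit = {
  val v1SourceList = spark.conf.get("spark.sql.sources.useV1SourceList", "")
  require(v1SourceList.contains("parquet"),
    "Hybrid feature only supports the Parquet V1 data source; " +
      s"spark.sql.sources.useV1SourceList is currently '$v1SourceList'")
}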
