From 47c8e9351faa21cc52e3e0c28cd118caca4fcbde Mon Sep 17 00:00:00 2001 From: cbynum202 Date: Wed, 8 Feb 2017 19:29:02 -0700 Subject: [PATCH 1/7] Add method for conversion of RDD[GenericRecord] to DataFrame --- .../spark/avro/SchemaConverters.scala | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index aa634d4c..a21496f9 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -18,15 +18,15 @@ package com.databricks.spark.avro import java.nio.ByteBuffer import scala.collection.JavaConverters._ - import org.apache.avro.generic.GenericData.Fixed import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.{Schema, SchemaBuilder} import org.apache.avro.SchemaBuilder._ import org.apache.avro.Schema.Type._ - import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.rdd.RDD /** * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice @@ -38,6 +38,26 @@ object SchemaConverters { case class SchemaType(dataType: DataType, nullable: Boolean) + /** + * Convert a [[RDD]] of [[GenericRecord]]s to a [[DataFrame]] + * + * @param rdd the [[RDD]] + * @return the [[DataFrame]] + */ + def rddToDataFrame(rdd: RDD[GenericRecord]): DataFrame = { + val spark = SparkSession + .builder + .config(rdd.sparkContext.getConf) + .getOrCreate() + + val avroSchema = rdd.take(1)(0).getSchema + val dataFrameSchema = toSqlType(avroSchema).dataType.asInstanceOf[StructType] + val converter = createConverterToSQL(avroSchema, dataFrameSchema) + val rows = rdd.map(converter(_).asInstanceOf[Row]) + + spark.createDataFrame(rows, dataFrameSchema) + } + /** * This function takes an avro schema and returns a sql schema. */ From 58349ee48632fd08fc1eb13b120a4039355f0bb4 Mon Sep 17 00:00:00 2001 From: cbynum202 Date: Thu, 9 Feb 2017 11:32:04 -0700 Subject: [PATCH 2/7] Take SparkSession as a parameter --- .../com/databricks/spark/avro/SchemaConverters.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index a21496f9..9b4e0b7b 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -18,11 +18,13 @@ package com.databricks.spark.avro import java.nio.ByteBuffer import scala.collection.JavaConverters._ + import org.apache.avro.generic.GenericData.Fixed import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.{Schema, SchemaBuilder} import org.apache.avro.SchemaBuilder._ import org.apache.avro.Schema.Type._ + import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -42,14 +44,11 @@ object SchemaConverters { * Convert a [[RDD]] of [[GenericRecord]]s to a [[DataFrame]] * * @param rdd the [[RDD]] + * @param spark the [[SparkSession]] * @return the [[DataFrame]] */ - def rddToDataFrame(rdd: RDD[GenericRecord]): DataFrame = { - val spark = SparkSession - .builder - .config(rdd.sparkContext.getConf) - .getOrCreate() - + def rddToDataFrame(rdd: RDD[GenericRecord], + spark: SparkSession): DataFrame = { val avroSchema = rdd.take(1)(0).getSchema val dataFrameSchema = toSqlType(avroSchema).dataType.asInstanceOf[StructType] val converter = createConverterToSQL(avroSchema, dataFrameSchema) From 1f03753bcf70eb4b14f48be5f163ee644f573b31 Mon Sep 17 00:00:00 2001 From: cbynum202 Date: Thu, 9 Feb 2017 11:36:48 -0700 Subject: [PATCH 3/7] Test SchemaConverters.rddToDataFrame --- .../com/databricks/spark/avro/AvroSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index 1b5d07aa..11a0f68e 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -30,8 +30,10 @@ import org.apache.avro.Schema.{Field, Type} import org.apache.avro.file.DataFileWriter import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed} import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord} +import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} import org.apache.commons.io.FileUtils +import org.apache.hadoop.io.NullWritable import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.types._ import org.scalatest.{BeforeAndAfterAll, FunSuite} @@ -48,6 +50,7 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { .master("local[2]") .appName("AvroSuite") .config("spark.sql.files.maxPartitionBytes", 1024) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .getOrCreate() } @@ -59,6 +62,20 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { } } + test("converting rdd to dataframe") { + val rdd = spark.sparkContext.hadoopFile[ + AvroWrapper[GenericRecord], + NullWritable, + AvroInputFormat[GenericRecord] + ](testFile).map(_._1.datum) + + val df1 = SchemaConverters.rddToDataFrame(rdd, spark) + val df2 = spark.read.avro(testFile) + + assert(df1.schema.simpleString === df2.schema.simpleString) + assert(df1.orderBy("string").collect === df2.orderBy("string").collect) + } + test("reading and writing partitioned data") { val df = spark.read.avro(episodesFile) val fields = List("title", "air_date", "doctor") From 3f53e7afbd4e6441b6da28ac384ca5e04b5d5dbf Mon Sep 17 00:00:00 2001 From: cbynum202 Date: Fri, 10 Feb 2017 07:50:01 -0700 Subject: [PATCH 4/7] Make DataFrame conversion implicit --- .../spark/avro/SchemaConverters.scala | 39 +++++++++++++------ .../com/databricks/spark/avro/AvroSuite.scala | 3 +- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 9b4e0b7b..ec90388d 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -41,20 +41,35 @@ object SchemaConverters { case class SchemaType(dataType: DataType, nullable: Boolean) /** - * Convert a [[RDD]] of [[GenericRecord]]s to a [[DataFrame]] + * Extensions to [[RDD]]s of [[GenericRecord]]s. * - * @param rdd the [[RDD]] - * @param spark the [[SparkSession]] - * @return the [[DataFrame]] + * @param rdd the [[RDD]] to decorate with additional functionality. */ - def rddToDataFrame(rdd: RDD[GenericRecord], - spark: SparkSession): DataFrame = { - val avroSchema = rdd.take(1)(0).getSchema - val dataFrameSchema = toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val converter = createConverterToSQL(avroSchema, dataFrameSchema) - val rows = rdd.map(converter(_).asInstanceOf[Row]) - - spark.createDataFrame(rows, dataFrameSchema) + implicit class RddToDataFrame(val rdd: RDD[GenericRecord]) { + /** + * Convert a [[RDD]] of [[GenericRecord]]s to a [[DataFrame]] + * + * @return the [[DataFrame]] + */ + def toDF(): DataFrame = { + val spark = SparkSession + .builder + .config(rdd.sparkContext.getConf) + .getOrCreate() + + val avroSchema = rdd.take(1)(0).getSchema + val dataFrameSchema = toSqlType(avroSchema).dataType.asInstanceOf[StructType] + val converter = createConverterToSQL(avroSchema, dataFrameSchema) + + val rowRdd = rdd.flatMap { + converter(_) match { + case row: Row => Some(row) + case _ => None + } + } + + spark.createDataFrame(rowRdd, dataFrameSchema) + } } /** diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index 11a0f68e..31df196f 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -25,6 +25,7 @@ import java.util.UUID import scala.collection.JavaConversions._ import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException +import com.databricks.spark.avro.SchemaConverters.RddToDataFrame import org.apache.avro.Schema import org.apache.avro.Schema.{Field, Type} import org.apache.avro.file.DataFileWriter @@ -69,7 +70,7 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { AvroInputFormat[GenericRecord] ](testFile).map(_._1.datum) - val df1 = SchemaConverters.rddToDataFrame(rdd, spark) + val df1 = rdd.toDF val df2 = spark.read.avro(testFile) assert(df1.schema.simpleString === df2.schema.simpleString) From cc949dafddb21235874dacbba3fc05208d4840f4 Mon Sep 17 00:00:00 2001 From: cbynum202 Date: Fri, 10 Feb 2017 09:07:21 -0700 Subject: [PATCH 5/7] Handle conversion failures --- .../com/databricks/spark/avro/SchemaConverters.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index ec90388d..25c4f24b 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -18,6 +18,7 @@ package com.databricks.spark.avro import java.nio.ByteBuffer import scala.collection.JavaConverters._ +import scala.util.Try import org.apache.avro.generic.GenericData.Fixed import org.apache.avro.generic.{GenericData, GenericRecord} @@ -61,11 +62,8 @@ object SchemaConverters { val dataFrameSchema = toSqlType(avroSchema).dataType.asInstanceOf[StructType] val converter = createConverterToSQL(avroSchema, dataFrameSchema) - val rowRdd = rdd.flatMap { - converter(_) match { - case row: Row => Some(row) - case _ => None - } + val rowRdd = rdd.flatMap { record => + Try(converter(record).asInstanceOf[Row]).toOption } spark.createDataFrame(rowRdd, dataFrameSchema) From eaabe34ac6d8ce670a0216d597f6b2a9d4051aa5 Mon Sep 17 00:00:00 2001 From: cbynum202 Date: Fri, 10 Feb 2017 13:42:54 -0700 Subject: [PATCH 6/7] Move RddToDataFrame to separate object --- .../com/databricks/spark/avro/RddUtils.scala | 57 ++++++++++ .../spark/avro/SchemaConverters.scala | 102 ++++++------------ .../com/databricks/spark/avro/AvroSuite.scala | 3 +- 3 files changed, 94 insertions(+), 68 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/avro/RddUtils.scala diff --git a/src/main/scala/com/databricks/spark/avro/RddUtils.scala b/src/main/scala/com/databricks/spark/avro/RddUtils.scala new file mode 100644 index 00000000..12495bc1 --- /dev/null +++ b/src/main/scala/com/databricks/spark/avro/RddUtils.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2014 Databricks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.databricks.spark.avro + +import SchemaConverters._ +import scala.util.Try +import org.apache.avro.generic.GenericRecord +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.rdd.RDD + +/** + * [[RDD]] implicits. + */ +object RddUtils { + /** + * Extensions to [[RDD]]s of [[GenericRecord]]s. + * + * @param rdd the [[RDD]] to decorate with additional functionality. + */ + implicit class RddToDataFrame(val rdd: RDD[GenericRecord]) { + /** + * Convert a [[RDD]] of [[GenericRecord]]s to a [[DataFrame]] + * + * @return the [[DataFrame]] + */ + def toDF(): DataFrame = { + val spark = SparkSession + .builder + .config(rdd.sparkContext.getConf) + .getOrCreate() + + val avroSchema = rdd.take(1)(0).getSchema + val dataFrameSchema = toSqlType(avroSchema).dataType.asInstanceOf[StructType] + val converter = createConverterToSQL(avroSchema, dataFrameSchema) + + val rowRdd = rdd.flatMap { record => + Try(converter(record).asInstanceOf[Row]).toOption + } + + spark.createDataFrame(rowRdd, dataFrameSchema) + } + } +} diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 25c4f24b..db3798bd 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -18,7 +18,6 @@ package com.databricks.spark.avro import java.nio.ByteBuffer import scala.collection.JavaConverters._ -import scala.util.Try import org.apache.avro.generic.GenericData.Fixed import org.apache.avro.generic.{GenericData, GenericRecord} @@ -28,13 +27,11 @@ import org.apache.avro.Schema.Type._ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.rdd.RDD /** - * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice - * versa. - */ + * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice + * versa. + */ object SchemaConverters { class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex) @@ -42,37 +39,8 @@ object SchemaConverters { case class SchemaType(dataType: DataType, nullable: Boolean) /** - * Extensions to [[RDD]]s of [[GenericRecord]]s. - * - * @param rdd the [[RDD]] to decorate with additional functionality. + * This function takes an avro schema and returns a sql schema. */ - implicit class RddToDataFrame(val rdd: RDD[GenericRecord]) { - /** - * Convert a [[RDD]] of [[GenericRecord]]s to a [[DataFrame]] - * - * @return the [[DataFrame]] - */ - def toDF(): DataFrame = { - val spark = SparkSession - .builder - .config(rdd.sparkContext.getConf) - .getOrCreate() - - val avroSchema = rdd.take(1)(0).getSchema - val dataFrameSchema = toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val converter = createConverterToSQL(avroSchema, dataFrameSchema) - - val rowRdd = rdd.flatMap { record => - Try(converter(record).asInstanceOf[Row]).toOption - } - - spark.createDataFrame(rowRdd, dataFrameSchema) - } - } - - /** - * This function takes an avro schema and returns a sql schema. - */ def toSqlType(avroSchema: Schema): SchemaType = { avroSchema.getType match { case INT => SchemaType(IntegerType, nullable = false) @@ -139,13 +107,13 @@ object SchemaConverters { } /** - * This function converts sparkSQL StructType into avro schema. This method uses two other - * converter methods in order to do the conversion. - */ + * This function converts sparkSQL StructType into avro schema. This method uses two other + * converter methods in order to do the conversion. + */ def convertStructToAvro[T]( - structType: StructType, - schemaBuilder: RecordBuilder[T], - recordNamespace: String): T = { + structType: StructType, + schemaBuilder: RecordBuilder[T], + recordNamespace: String): T = { val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields() structType.fields.foreach { field => val newField = fieldsAssembler.name(field.name).`type`() @@ -162,19 +130,19 @@ object SchemaConverters { } /** - * Returns a converter function to convert row in avro format to GenericRow of catalyst. - * - * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in - * by user. - * @param targetSqlType Target catalyst sql type after the conversion. - * @return returns a converter function to convert row in avro format to GenericRow of catalyst. - */ + * Returns a converter function to convert row in avro format to GenericRow of catalyst. + * + * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in + * by user. + * @param targetSqlType Target catalyst sql type after the conversion. + * @return returns a converter function to convert row in avro format to GenericRow of catalyst. + */ private[avro] def createConverterToSQL( - sourceAvroSchema: Schema, - targetSqlType: DataType): AnyRef => AnyRef = { + sourceAvroSchema: Schema, + targetSqlType: DataType): AnyRef => AnyRef = { def createConverter(avroSchema: Schema, - sqlType: DataType, path: List[String]): AnyRef => AnyRef = { + sqlType: DataType, path: List[String]): AnyRef => AnyRef = { val avroType = avroSchema.getType (sqlType, avroType) match { // Avro strings are in Utf8, so we have to call toString on them @@ -340,14 +308,14 @@ object SchemaConverters { } /** - * This function is used to convert some sparkSQL type to avro type. Note that this function won't - * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). - */ + * This function is used to convert some sparkSQL type to avro type. Note that this function won't + * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). + */ private def convertTypeToAvro[T]( - dataType: DataType, - schemaBuilder: BaseTypeBuilder[T], - structName: String, - recordNamespace: String): T = { + dataType: DataType, + schemaBuilder: BaseTypeBuilder[T], + structName: String, + recordNamespace: String): T = { dataType match { case ByteType => schemaBuilder.intType() case ShortType => schemaBuilder.intType() @@ -382,15 +350,15 @@ object SchemaConverters { } /** - * This function is used to construct fields of the avro record, where schema of the field is - * specified by avro representation of dataType. Since builders for record fields are different - * from those for everything else, we have to use a separate method. - */ + * This function is used to construct fields of the avro record, where schema of the field is + * specified by avro representation of dataType. Since builders for record fields are different + * from those for everything else, we have to use a separate method. + */ private def convertFieldTypeToAvro[T]( - dataType: DataType, - newFieldBuilder: BaseFieldTypeBuilder[T], - structName: String, - recordNamespace: String): FieldDefault[T, _] = { + dataType: DataType, + newFieldBuilder: BaseFieldTypeBuilder[T], + structName: String, + recordNamespace: String): FieldDefault[T, _] = { dataType match { case ByteType => newFieldBuilder.intType() case ShortType => newFieldBuilder.intType() diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index 31df196f..b3a0c97f 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -25,7 +25,6 @@ import java.util.UUID import scala.collection.JavaConversions._ import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException -import com.databricks.spark.avro.SchemaConverters.RddToDataFrame import org.apache.avro.Schema import org.apache.avro.Schema.{Field, Type} import org.apache.avro.file.DataFileWriter @@ -70,6 +69,8 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { AvroInputFormat[GenericRecord] ](testFile).map(_._1.datum) + import RddUtils.RddToDataFrame + val df1 = rdd.toDF val df2 = spark.read.avro(testFile) From 96caad206a3993556621fffb5b96c78121da7533 Mon Sep 17 00:00:00 2001 From: cbynum202 Date: Fri, 10 Feb 2017 13:45:48 -0700 Subject: [PATCH 7/7] Revert to unchanged --- .../spark/avro/SchemaConverters.scala | 72 +++++++++---------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index db3798bd..aa634d4c 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ /** - * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice - * versa. - */ + * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice + * versa. + */ object SchemaConverters { class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex) @@ -39,8 +39,8 @@ object SchemaConverters { case class SchemaType(dataType: DataType, nullable: Boolean) /** - * This function takes an avro schema and returns a sql schema. - */ + * This function takes an avro schema and returns a sql schema. + */ def toSqlType(avroSchema: Schema): SchemaType = { avroSchema.getType match { case INT => SchemaType(IntegerType, nullable = false) @@ -107,13 +107,13 @@ object SchemaConverters { } /** - * This function converts sparkSQL StructType into avro schema. This method uses two other - * converter methods in order to do the conversion. - */ + * This function converts sparkSQL StructType into avro schema. This method uses two other + * converter methods in order to do the conversion. + */ def convertStructToAvro[T]( - structType: StructType, - schemaBuilder: RecordBuilder[T], - recordNamespace: String): T = { + structType: StructType, + schemaBuilder: RecordBuilder[T], + recordNamespace: String): T = { val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields() structType.fields.foreach { field => val newField = fieldsAssembler.name(field.name).`type`() @@ -130,19 +130,19 @@ object SchemaConverters { } /** - * Returns a converter function to convert row in avro format to GenericRow of catalyst. - * - * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in - * by user. - * @param targetSqlType Target catalyst sql type after the conversion. - * @return returns a converter function to convert row in avro format to GenericRow of catalyst. - */ + * Returns a converter function to convert row in avro format to GenericRow of catalyst. + * + * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in + * by user. + * @param targetSqlType Target catalyst sql type after the conversion. + * @return returns a converter function to convert row in avro format to GenericRow of catalyst. + */ private[avro] def createConverterToSQL( - sourceAvroSchema: Schema, - targetSqlType: DataType): AnyRef => AnyRef = { + sourceAvroSchema: Schema, + targetSqlType: DataType): AnyRef => AnyRef = { def createConverter(avroSchema: Schema, - sqlType: DataType, path: List[String]): AnyRef => AnyRef = { + sqlType: DataType, path: List[String]): AnyRef => AnyRef = { val avroType = avroSchema.getType (sqlType, avroType) match { // Avro strings are in Utf8, so we have to call toString on them @@ -308,14 +308,14 @@ object SchemaConverters { } /** - * This function is used to convert some sparkSQL type to avro type. Note that this function won't - * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). - */ + * This function is used to convert some sparkSQL type to avro type. Note that this function won't + * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). + */ private def convertTypeToAvro[T]( - dataType: DataType, - schemaBuilder: BaseTypeBuilder[T], - structName: String, - recordNamespace: String): T = { + dataType: DataType, + schemaBuilder: BaseTypeBuilder[T], + structName: String, + recordNamespace: String): T = { dataType match { case ByteType => schemaBuilder.intType() case ShortType => schemaBuilder.intType() @@ -350,15 +350,15 @@ object SchemaConverters { } /** - * This function is used to construct fields of the avro record, where schema of the field is - * specified by avro representation of dataType. Since builders for record fields are different - * from those for everything else, we have to use a separate method. - */ + * This function is used to construct fields of the avro record, where schema of the field is + * specified by avro representation of dataType. Since builders for record fields are different + * from those for everything else, we have to use a separate method. + */ private def convertFieldTypeToAvro[T]( - dataType: DataType, - newFieldBuilder: BaseFieldTypeBuilder[T], - structName: String, - recordNamespace: String): FieldDefault[T, _] = { + dataType: DataType, + newFieldBuilder: BaseFieldTypeBuilder[T], + structName: String, + recordNamespace: String): FieldDefault[T, _] = { dataType match { case ByteType => newFieldBuilder.intType() case ShortType => newFieldBuilder.intType()