This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Getting user defined types working #239

Open
wants to merge 3 commits into master
@@ -17,7 +17,7 @@
package com.databricks.spark.avro

import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.avro.AvroOutputWriter
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

3 changes: 1 addition & 2 deletions src/main/scala/com/databricks/spark/avro/DefaultSource.scala
@@ -21,7 +21,6 @@ import java.net.URI
import java.util.zip.Deflater

import scala.util.control.NonFatal

import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration}
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}
@@ -34,8 +33,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
import org.slf4j.LoggerFactory

import org.apache.spark.TaskContext
import org.apache.spark.avro.SchemaConverters
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -14,31 +14,29 @@
* limitations under the License.
*/

package com.databricks.spark.avro
package org.apache.spark.avro
Author

In order to reference UserDefinedType we have to change the package name: UserDefinedType is package-private to org.apache.spark in Spark 2.x, so it is only visible to code that lives under that package.


import java.io.{IOException, OutputStream}
import java.nio.ByteBuffer
import java.sql.Timestamp
import java.sql.Date
import java.sql.{Date, Timestamp}
import java.util.HashMap

import org.apache.hadoop.fs.Path
import scala.collection.immutable.Map

import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyOutputFormat
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, TaskAttemptID}

import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._

import scala.collection.immutable.Map

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[avro] class AvroOutputWriter(
class AvroOutputWriter(
path: String,
context: TaskAttemptContext,
schema: StructType,
@@ -87,8 +85,8 @@ private[avro] class AvroOutputWriter(
case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
}
case ByteType | ShortType | IntegerType | LongType |
FloatType | DoubleType | StringType | BooleanType => identity
case _: DecimalType => (item: Any) => if (item == null) null else item.toString
FloatType | DoubleType | BooleanType => identity
case _: DecimalType | StringType => (item: Any) => if (item == null) null else item.toString
Author

My use case for this is serializing Enum[_] types, which I store as strings on the backend. However, in my tests the native enum type was still being presented to the native Avro reader. By moving StringType out of the identity case and explicitly calling .toString on the value, I can work around this, assuming the UserDefinedType for my enums uses StringType as its sqlType. (A minimal sketch of such a UDT follows this file's diff.)

case TimestampType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Timestamp].getTime
case DateType => (item: Any) =>
@@ -145,6 +143,7 @@ private[avro] class AvroOutputWriter(
record
}
}
case t: UserDefinedType[_] => createConverterToAvro(t.sqlType, structName, recordNamespace)
}
}
}
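
For context, here is a minimal sketch of the kind of string-backed UDT the new UserDefinedType case is meant to handle. Nothing below is part of this diff: the Color class, the ColorUDT name, and the package are hypothetical, and the sketch assumes the Spark 2.x UserDefinedType members (sqlType, serialize, deserialize, userClass). Because createConverterToAvro now unwraps t.sqlType, a column of this type is handled by the StringType branch above and written to Avro via .toString.

// Hypothetical sketch; it must sit under org.apache.spark to see the package-private UserDefinedType.
package org.apache.spark.avro.example

import org.apache.spark.sql.types.{DataType, StringType, UserDefinedType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical user type persisted as a plain string (a stand-in for an enum).
case class Color(name: String) { override def toString: String = name }

// A UDT whose sqlType is StringType, matching the assumption in the comment above.
class ColorUDT extends UserDefinedType[Color] {
  override def sqlType: DataType = StringType
  // Catalyst's internal representation of a StringType value is UTF8String.
  override def serialize(obj: Color): Any = UTF8String.fromString(obj.name)
  override def deserialize(datum: Any): Color = Color(datum.toString)
  override def userClass: Class[Color] = classOf[Color]
}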
@@ -13,21 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.avro
package org.apache.spark.avro

import java.nio.ByteBuffer

import scala.collection.JavaConverters._

import org.apache.avro.Schema.Type._
import org.apache.avro.SchemaBuilder._
import org.apache.avro.generic.GenericData.Fixed
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.SchemaBuilder._
import org.apache.avro.Schema.Type._

import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._

/**
* This object contains method that are used to convert sparkSQL schemas to avro schemas and vice
* versa.
@@ -137,7 +136,7 @@ object SchemaConverters {
* @param targetSqlType Target catalyst sql type after the conversion.
* @return returns a converter function to convert row in avro format to GenericRow of catalyst.
*/
private[avro] def createConverterToSQL(
def createConverterToSQL(
sourceAvroSchema: Schema,
targetSqlType: DataType): AnyRef => AnyRef = {

@@ -346,6 +345,13 @@ object SchemaConverters {
schemaBuilder.record(structName).namespace(recordNamespace),
recordNamespace)

case t: UserDefinedType[_] => convertTypeToAvro(
t.sqlType,
schemaBuilder,
structName,
recordNamespace
)

case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
}
}
@@ -390,6 +396,13 @@
newFieldBuilder.record(structName).namespace(recordNamespace),
recordNamespace)

case t: UserDefinedType[_] => convertFieldTypeToAvro(
t.sqlType,
newFieldBuilder,
structName,
recordNamespace
)

case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
}
}
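
To see what the two new cases change, here is a spark-shell style sketch (not from the PR): converting a struct that contains a UDT field now produces the Avro schema of the field's underlying sqlType instead of falling through to IncompatibleSchemaException. It reuses the hypothetical ColorUDT sketched earlier and assumes convertStructToAvro keeps the public signature spark-avro already exposes.

import org.apache.avro.SchemaBuilder
import org.apache.spark.avro.SchemaConverters
import org.apache.spark.avro.example.ColorUDT // hypothetical UDT from the earlier sketch
import org.apache.spark.sql.types.{StructField, StructType}

val catalystSchema = StructType(Seq(StructField("color", new ColorUDT, nullable = true)))

// The "color" field is emitted with the Avro mapping of its sqlType:
// a string, wrapped in a union with null because the field is nullable.
val avroSchema = SchemaConverters.convertStructToAvro(
  catalystSchema,
  SchemaBuilder.record("topLevelRecord").namespace("com.example"),
  "com.example")

println(avroSchema.toString(true))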
2 changes: 1 addition & 1 deletion src/test/scala/com/databricks/spark/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.avro.SchemaConverters.IncompatibleSchemaException

class AvroSuite extends FunSuite with BeforeAndAfterAll {
val episodesFile = "src/test/resources/episodes.avro"
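
The suite itself only needed the import updated, but the new path could be exercised with a round-trip along these lines. This is a sketch, not part of the PR: it reuses the hypothetical Color/ColorUDT from the earlier sketch, assumes the suite's SparkSession is available as spark, and assumes the data source is still registered as com.databricks.spark.avro.

test("UserDefinedType columns are written as their underlying sqlType") {
  // Write to a fresh directory so save() does not complain about an existing path.
  val dir = java.nio.file.Files.createTempDirectory("avro-udt").toFile
  val path = new java.io.File(dir, "output").getAbsolutePath

  val schema = StructType(Seq(StructField("color", new ColorUDT, nullable = true)))
  val rows = spark.sparkContext.parallelize(Seq(Row(Color("red")), Row(Color("blue"))))
  val df = spark.createDataFrame(rows, schema)

  df.write.format("com.databricks.spark.avro").save(path)

  // Reading back, the UDT column surfaces as its sqlType: a plain string column.
  val readBack = spark.read.format("com.databricks.spark.avro").load(path)
  assert(readBack.schema.head.dataType == StringType)
  assert(readBack.collect().map(_.getString(0)).sorted.toSeq == Seq("blue", "red"))
}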