
[SPARK-27388][SQL] encoder for objects defined by properties (ie. Avro) #24299

Closed
wants to merge 10 commits into from

Conversation

@mazeboard commented Apr 4, 2019

What changes were proposed in this pull request?

This PR adds expression encoders for beans, java.util.List, java.util.Map and Java enums. This addition makes it possible to encode Avro objects.

Beans are objects defined by properties. A property is defined by a setter and a getter function, where the getter's return type equals the type of the setter's unique parameter and the getter and setter share the same name; if the getter name is prefixed with "get", then the setter name must be prefixed with "set". See the tests for bean examples.
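For illustration, a minimal sketch of this convention (the class bodies are hypothetical, written only to show the two naming styles; they echo names from the test but are not taken from the PR):

    // Property with the "get"/"set" prefix convention:
    class Money {
      private var amount: java.lang.Float = _
      def getAmount(): java.lang.Float = amount
      def setAmount(value: java.lang.Float): Unit = { amount = value }
    }

    // Property without a prefix: getter and setter share the same name,
    // as with the single `bytes` property of an Avro fixed type:
    class Magic {
      private var value: Array[Byte] = _
      def bytes(): Array[Byte] = value
      def bytes(v: Array[Byte]): Unit = { value = v }
    }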

Avro objects are beans, and thus we can create an expression encoder for an Avro object as follows:

implicit val exprEncoder = ExpressionEncoder[Foo]()

All Avro types, including fixed types and excluding complex union types, are supported by this addition.

The Avro fixed types are beans with exactly one property: bytes.

Currently complex Avro unions are not supported, because a complex union is declared as Object and there is no expression encoder for the Object type (one would need a custom serializer such as Kryo).

How was this patch tested?

Currently only one encodeDecodeTest was added to ExpressionEncoderSuite; the test uses an Avro object with all accepted types (i.e. array, map, fixed, bytes, enum, ...).

    // requires: import scala.collection.JavaConverters._ (for asJava)
    encodeDecodeTest(
      AvroExample1.newBuilder()
        .setMyarray(List("Foo", "Bar").asJava)
        .setMyboolean(true)
        .setMybytes(java.nio.ByteBuffer.wrap("MyBytes".getBytes()))
        .setMydouble(2.5)
        .setMyfixed(new Magic("magic".getBytes))
        .setMyfloat(25.0F)
        .setMyint(100)
        .setMylong(10L)
        .setMystring("hello")
        .setMymap(Map(
          "foo" -> new java.lang.Integer(1),
          "bar" -> new java.lang.Integer(2)).asJava)
        .setMymoney(Money.newBuilder().setAmount(100.0F).setCurrency(Currency.EUR).build())
        .build(),
      "Avro encoder with map, array and fixed types")(
      ExpressionEncoder[AvroExample1])

With this addition Avro fixed types are encoded correctly.

Currently complex Avro unions are not supported.
@mazeboard changed the title from "[SPARK-27388][SQL] expression encoder for avro" to "[SPARK-27388][SQL] expression encoder for objects defined by properties" on Apr 5, 2019
@srowen (Member) commented Apr 8, 2019

Overall: isn't this what the Encoder for Java Beans is for and already supports? It's not obvious to me that this should go in ScalaReflection.
Separately, I don't think it's worth further supporting JavaBean properties. That's pretty old and rare, I think.

@mazeboard (Author) commented Apr 8, 2019 via email

@mazeboard (Author)

@srowen This PR is mainly about adding support for Avro objects and making our code simpler.

@gatorsmile (Member)

cc @gengliangwang @cloud-fan

@bdrillard commented Apr 10, 2019

Many thanks to @mazeboard for bringing this PR to my attention and for taking a crack at the problem of typed Avro Datasets in Spark!

I feel it's a matter of due diligence for me to point to another PR supporting Avro-typed Datasets in Spark, namely #22878, which (full transparency) is the work of @xuanyuanking and myself. The approaches taken here and there are different, and, it would seem, so are their coverages of the Avro spec. I'd like to take the time to compare and contrast.

I am more qualified to speak to the approach and capabilities introduced in #22878 (which has a history going back to Spark-Avro), so if I misread this PR in the process, @mazeboard, please do correct my understanding.

#24299

I'll summarize my reading of this PR's approach: to extend the existing Java Bean Encoder functionality to more broadly support tricky types generated by Avro SpecificRecord classes, especially Bytes, which Avro doesn't allow access to via typical getters/setters, and Enums, which have to be encoded as Strings and therefore have to be “discovered” via their class.

One stated limitation is complex Union types (unions of two types where Null is not one of the types, or unions having more than two types), which Avro will represent as a nested Object. It's stated that there isn't an Encoder for the Object type (I think a serializer/deserializer could perhaps be made using the Object DataType and NewInstance Expression, but I can't say how it would work in this Reflection-driven approach). It's said things could get tough with how Avro serializes those objects when something like Kryo is used. I can see that as a limitation of this Bean-based approach even if the DataTypes and Expressions were sorted out.

Correct my understanding if this is wrong, but because this approach is based on Reflection over the types of the generated SpecificRecord class (viewing the record as a Bean), it would not be (in a first-class sense) “aware” of the AvroSchema which generates the Class. I think this distinction may matter, and I’ll discuss it more below.

#22878

To summarize the approach of #22878: it creates an AvroEncoder which generates a pair of serializer/deserializer Expressions based on the AvroSchema (whether gleaned from a SpecificRecord class or passed directly as a JSON string).

Its work builds on the Avro efforts that were recently folded into Spark proper from the (now deprecated) Databricks/Spark-Avro project. However, to date, Spark-Avro does not provide a first-class Encoder, along with the efficiency and strong/static typing that entails. Being based on Spark-Avro, however, #22878 does gain the benefits of an AvroSchema-driven approach. To avoid confusion, I'm going to use "Spark-Avro" to refer to this recently folded-in functionality within the current Spark project, rather than to the former deprecated project.

AvroSchema

Because #22878 generates its Encoder through the AvroSchema, we gain a couple of things:

  1. An isomorphism between the Dataset Schema and the source of truth for the structure of any Avro data, namely its AvroSchema. The important thing here is that we can generate an Encoder both from the Class and from the AvroSchema in its different representations, like JSON, which opens support for Schema evolution and the use of Schema stores.
  2. Direct support for Encoders of any GenericRecord, where an AvroSchema is known but no SpecificRecord class can be generated.

With #22878 you can create an Encoder both ways:

    val encoder: Encoder[MyClass] = AvroEncoder.of(classOf[MyClass])
    // or
    val encoder: Encoder[MyClass] = AvroEncoder.of(myClassAvroSchema) // a JSON string for a SpecificRecord
    val genericEncoder: Encoder[GenericRecord] = AvroEncoder.of(genericRecordAvroSchema) // a JSON string for a GenericRecord
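A usage sketch, assuming the #22878 API, an active SparkSession named spark, and a hypothetical records: Seq[MyClass]:

    // The encoder is passed explicitly, so no implicits are involved:
    val encoder: Encoder[MyClass] = AvroEncoder.of(classOf[MyClass])
    val ds: Dataset[MyClass] = spark.createDataset(records)(encoder)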

Two implications:

  1. This approach does not rely on implicits or reflection, as the AvroEncoder determines all its typing statically, at compile time, from the AvroSchema.
  2. The API does not rely on any Scala-exclusive typing, so the Java API is intuitively static.

Coverage

Spark-Avro's ability to move between an AvroSchema and a Spark Dataset Schema also gives us the ability to traverse the DataType to create our Encoder's Ser/De Expressions rather than using Reflection. This gives us three immediate benefits:

  1. The qualities of Bytes and Enums are more transparently represented in the AvroSchema, and so the SerDe expressions can be more directly generated.
  2. Nested Records and Complex Unions (as well as Avro Lists and Maps) are a solved problem in Spark-Avro, so we can generate a Dataset Schema of arbitrary complexity and nesting (using the Object DataType and NewInstance Expression I mentioned previously).
  3. Naturally, we don’t have to extend the Bean Encoder for properties.

These items mean the AvroEncoder in #22878 can generate an Encoder having full coverage of Avro types, and this coverage of the various combinations of types that can appear when nesting Records and Unions is well tested in the PR.
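For reference, a sketch of the schema-driven direction described above, assuming the SchemaConverters developer API of Spark's avro module and a hypothetical schema JSON string schemaJson:

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters

    // Derive the Spark DataType from the AvroSchema; the Encoder's Ser/De
    // Expressions can then be generated by walking this DataType instead of
    // reflecting over a generated class.
    val avroSchema = new Schema.Parser().parse(schemaJson)
    val sqlType = SchemaConverters.toSqlType(avroSchema).dataType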

Last thoughts

The PR goes a long way in support of Avro while still being very concise, which is definitely advantageous from a maintainability perspective. My concerns with a Reflection-based approach are:

  1. It would need Object extension for handling Complex Unions. We do sometimes see Avro Union types having two or more Nested Records, or a Record and a String, so I personally feel this is necessary functionality.
  2. A Reflection-based approach also forgoes (at least on my reading) Datasets of GenericRecord objects.

Parting words for #22878:

  1. While its length (in terms of lines of code) as a feature has been discussed, I'd say it's ultimately very well tested, considering we're testing a closed set of type combinations described by AvroSchema, and nested complex types are tested via inductive unit tests. Anecdotally, I'll say we've been using it in a fork of Spark-Avro with great success over exceptionally complicated Schemas.
  2. It fits quite natively in the new Spark-Avro “external” sub-project, and accomplishes the goal: providing an Encoder for Datasets typed by arbitrary Avro (Generic and Specific), with native support for Schema evolution.

Again, I'm very happy to see where this discussion goes as it evolves (:

@mazeboard (Author) commented Apr 10, 2019

This PR is not an extension of the existing Java Bean Encoder: it adds support for bean objects, java.util.List, java.util.Map, and Java enums to ScalaReflection. Unlike the existing JavaBean Encoder, properties can be named without the set/get prefix (this is one of the key points that allow the encoding of Avro Fixed types; the other key point, I believe, is that the addition must be in ScalaReflection).

Reminder of Avro types:
primitive types: null, boolean, int, long, float, double, bytes, string
complex types: Records, Enums, Arrays, Maps, Unions, Fixed

All Avro types are supported by this PR (maybe a better test including all Avro types is needed), including simple union types and excluding complex union types. Simple Avro unions are [null, type1]; all other unions are complex.

This PR supports simple unions but not complex unions, for the simple reason that the Avro compiler generates Java code with type Object for all complex unions, while fields with simple unions are typed as the non-null type of the union.
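For concreteness, a sketch of the two union shapes as schema JSON, parsed here with Avro's Schema API (the field shapes are illustrative; the comments state the type the Avro compiler generates):

    import org.apache.avro.Schema

    // Simple union: exactly [null, T]; the generated field is typed as the
    // Java type of the non-null branch:
    val simpleUnion = new Schema.Parser().parse("""["null", "string"]""")

    // Complex union: no null branch, or more than two branches; the
    // generated field is typed as java.lang.Object:
    val complexUnion = new Schema.Parser().parse("""["int", "string"]""")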

Currently, ScalaReflection does not have an encoder for the Object type, but we could modify it to use a default encoder (e.g. Kryo) for Object (I do not know whether this is advisable, but why not? Instead of throwing an error, we could fall back to a default encoder for objects for which no encoder is found).
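For reference, the explicit fallback a user can reach for today is a Kryo encoder; a minimal sketch using the existing Encoders.kryo API:

    import org.apache.spark.sql.{Encoder, Encoders}

    // Values typed as Object can currently only be handled by an explicit
    // binary fallback encoder such as Kryo:
    implicit val objectEncoder: Encoder[AnyRef] = Encoders.kryo[AnyRef]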

I do not understand the issue with a Reflection-driven approach: all common Scala objects are encoded using reflection.

I may be wrong, but as I tried to explain in this PR, we need to add these types to ScalaReflection to be able to transform Datasets into other Datasets of embedded Avro types.

As an example, the following map function transforms a Dataset[A] into a Dataset[(B, C)]:

val r: Dataset[A] = List(makeA).toDS()
val ds: Dataset[(B, C)] = r.map(e => (e.getB, e.getC))

The map function will recursively use ScalaReflection to find encoders for the B and C types. (Do you know whether this works with the #22878 solution, or does it complain at runtime with "no encoder found"?)

Finally, I did not understand the benefits of the AvroSchema-driven approach; to me an Avro object is completely defined by its properties (which are derived from the Avro schema), and the Avro compiler generates Java code with all the properties in the Avro schema.

@bdrillard commented Apr 10, 2019

Thanks @mazeboard for the reply and for clarifying the support here for Unions. I've edited my post to a more targeted discussion of Complex Union types for clarity:

unions of two types where Null is not one of the types, or [Unions] having more than two types

I also now better understand that this PR expands Spark's ScalaReflection capabilities (rather than the Bean Encoder) to support more types that would be present in an Avro SpecificRecord.

I'm inclined to wait a bit for some more direction or questions from the Spark committers.

@@ -308,6 +309,19 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTest
encodeDecodeTest(Option("abc"), "option of string")
encodeDecodeTest(Option.empty[String], "empty option of string")

encodeDecodeTest(
A Member commented:

Thanks @bdrillard for the summary of previous work.

Seems #22878 covered this scenario? Please correct me if I'm missing something. https://github.com/apache/spark/pull/22878/files#diff-24ca7610c9c163779104e6c797713431R327

Currently all Avro types are supported, except complex unions.

For testing, add an Avro object having all possible types:
  primitive types: null, boolean, int, long, float, double, bytes, string
  complex types: Records, Enums, Arrays, Maps, Unions, Fixed
@mazeboard (Author)

Just a note about the number of changed lines: the Java code for the Avro objects used in the test has 1252 lines; if we ignore those lines, the overall change is +288 −17.

@mazeboard (Author) commented Apr 11, 2019

I would like to give an example to explain why the PR addition must be in ScalaReflection.

This test was done with spark-sql 2.4.0.

The first transformation (map), from Dataset[Foo] to Dataset[Bar], passes without error, but the second transformation, from Dataset[Foo] to Dataset[(Foo, Bar)], raises a "No Encoder found" exception; this is explained by the fact that the tuple encoder searches recursively in ScalaReflection and fails to find encoders for Foo and Bar.

import spark.implicits._

implicit val encoderFoo = Encoders.bean[Foo](classOf[Foo])
implicit val encoderBar = Encoders.bean[Bar](classOf[Bar])

val bar = new Bar
bar.setBar("ok")
val a = new Foo
a.setA(55)
a.setB(bar)
val ds = List(a).toDS()
val x = ds.map(_.getB)             // OK
val y = ds.map(x => (x, x.getB))   // KO - No Encoder found for Foo and Bar

class Bar {
  private var bar$: java.lang.String = _
  def setBar(value: java.lang.String): Unit = {
    bar$ = value
  }
  def getBar(): java.lang.String = bar$
}

class Foo extends Bar {
  var a: java.lang.Integer = _
  var b: Bar = _
  def getA(): java.lang.Integer = a
  def setA(x: java.lang.Integer): Unit = { a = x }
  def getB(): Bar = b
  def setB(x: Bar): Unit = { b = x }
}

With the PR addition the following code runs without errors:

import spark.implicits._

implicit val encoderFoo = ExpressionEncoder[Foo]()

val bar = new Bar
bar.setBar("ok")
val a = new Foo
a.setA(55)
a.setB(bar)
val ds = List(a).toDS()
val x = ds.map(_.getB)             // OK
val y = ds.map(x => (x, x.getB))   // OK

It is possible to make the first program work by adding the following implicit:

implicit val tupleEncoder: Encoder[(Foo, Bar)] = Encoders.tuple(encoderFoo, encoderBar)

But this rapidly becomes a burden if we need to expose many embedded objects within the Avro object.

@mazeboard (Author) commented Apr 13, 2019

Even though we prefer the solution in this PR, which modifies ScalaReflection, we also modified JavaTypeInference (in PR #24367) to be able to create encoders for Avro objects using the bean encoder.

We currently have two solutions: the one in this PR and another one using Encoders.bean.

The issue here is how the input and convertedBack objects are compared.

If we replace the check at ExpressionEncoderSuite.scala:442,

left.asInstanceOf[Comparable[Any]].compareTo(right) == 0

by

left.asInstanceOf[Comparable[Any]].equals(right)

the test for the Avro encoder passes, but unfortunately other tests fail.

Equality of objects is tricky.

The GenericData compare function

  /** Comparison implementation.  When equals is true, only checks for equality,
   * not for order. */
  @SuppressWarnings(value="unchecked")
  protected int compare(Object o1, Object o2, Schema s, boolean equals) {
fails to compare Maps when the parameter equals is false.
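A minimal sketch reproducing this failure with GenericData directly (the schema and field names are illustrative only):

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecordBuilder}
    import scala.collection.JavaConverters._
    import scala.util.Try

    val schema = new Schema.Parser().parse(
      """{"type": "record", "name": "R", "fields":
        |  [{"name": "m", "type": {"type": "map", "values": "int"}}]}""".stripMargin)

    def record() = new GenericRecordBuilder(schema).set("m", Map("a" -> 1).asJava).build()

    println(record() == record())  // true: equals-mode comparison handles map fields
    println(Try(GenericData.get().compare(record(), record(), schema)))
    // Failure(org.apache.avro.AvroRuntimeException: Can't compare maps!)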

I propose the following: replace lines 434-444 of ExpressionEncoderSuite,

      val isCorrect = (input, convertedBack) match {
        case (b1: Array[Byte], b2: Array[Byte]) => Arrays.equals(b1, b2)
        case (b1: Array[Int], b2: Array[Int]) => Arrays.equals(b1, b2)
        case (b1: Array[Array[_]], b2: Array[Array[_]]) =>
          Arrays.deepEquals(b1.asInstanceOf[Array[AnyRef]], b2.asInstanceOf[Array[AnyRef]])
        case (b1: Array[_], b2: Array[_]) =>
          Arrays.equals(b1.asInstanceOf[Array[AnyRef]], b2.asInstanceOf[Array[AnyRef]])
        case (left: Comparable[_], right: Comparable[_]) =>
          left.asInstanceOf[Comparable[Any]].compareTo(right) == 0
        case _ => input == convertedBack
      }
by

      val convertedBackRow = encoder.toRow(convertedBack)
      val isCorrect = row == convertedBackRow
With the proposed modification all the tests in ExpressionEncoderSuite pass.
@mazeboard changed the title from "[SPARK-27388][SQL] expression encoder for objects defined by properties" to "[SPARK-27388][SQL] encoder for objects defined by properties (ie. Avro)" on May 16, 2019
@mazeboard (Author) commented May 25, 2019

Overall: isn't this what the Encoder for Java Beans is for and already supports? It's not obvious to me that this should go in ScalaReflection.
Separately, I don't think it's worth further supporting JavaBean properties. That's pretty old and rare, I think.

@srowen Hi, I would like to hear from you about the possibility of merging this PR. Many developer teams need to be able to work with Datasets of SpecificRecords (i.e. Avro). I implemented a version that modifies the bean encoder (see #24367), but I believe that adding encoders to ScalaReflection, as done by this PR, is the way to go, for the reasons I explained in this PR. The solution is simple, with small changes and additions. Please let me know whether this can be considered a viable solution and whether someone can take this PR forward; as you said, JavaBean is pretty old and rarely used.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions bot added the Stale label on Dec 31, 2019
@github-actions bot closed this on Jan 1, 2020