Close #169 and add support for DataSet of Avro records #217
Conversation
To avoid confusion, we should remove the documentation / linking instructions for the 2.x line of releases since the current README describes features which don't apply there. Instead, we should link to the older docs. Author: Josh Rosen <[email protected]> Closes databricks#199 from JoshRosen/readme-fixes. (cherry picked from commit b01a034) Signed-off-by: Josh Rosen <[email protected]>
There is still an issue with ExternalMapToCatalyst to be resolved
…t will fail under packaging
This patch builds on databricks#206 in order to restore support for Spark 2.0.x. This means that a single binary artifact can be used with both Spark 2.0.x and 2.1.x, simplifying the builds of downstream projects which are compatible with both Spark versions. Author: Josh Rosen <[email protected]> Closes databricks#212 from JoshRosen/add-spark-2.1.
Author: Josh Rosen <[email protected]> Closes databricks#213 from JoshRosen/prepare-for-3.2-release.
Codecov Report
```diff
@@            Coverage Diff             @@
##           master     #217      +/-   ##
==========================================
+ Coverage   90.71%   93.16%   +2.45%
==========================================
  Files           5        7       +2
  Lines         334      688     +354
  Branches       50       73      +23
==========================================
+ Hits          303      641     +338
- Misses         31       47      +16
```
@themodernlife thanks for putting this pull-request together. I certainly didn't have the SBT expertise necessary to handle multiple builds of Spark, so this additional work is just what the encoder needed. I had a look at the inlining of additional object types that aren't common to both 2.0.0 and 2.1.x, as well as the serializing of schemas (when used in parsing complex unions); those changes look fine to me. I noticed just a few stray commented-out lines from what may have been some refactoring experiments, and a few style points in doc comments. Otherwise this looks great, thanks for the additional legwork on this issue.
I'm trying to add the following test based on this PR, but I get an error. I'm not sure whether this use case wasn't meant to be supported in this PR or whether I'm doing something wrong in the test code.
Hmm... I ran into this as well. I'm looking into a way to solve it. I have a fix for your unit test, but looking at the code I'm worried there could be other runtime errors scattered about.
@gcpagano I pushed a change for your unit test and added coverage for the fixed and enum cases. Let me know if you have any feedback.
Hi, based on this issue databricks#67 I created this pull request Author: Nihed MBAREK <[email protected]> Author: vlyubin <[email protected]> Author: nihed <[email protected]> Closes databricks#124 from nihed/master. (cherry picked from commit c19f01a) Signed-off-by: vlyubin <[email protected]>
LGTM
Been watching this PR for a while. Any reason to hold back on this issue?
@themodernlife Could you check and resolve this last merge conflict? It appears to be pretty minor.
@bdrillard Do you have time to create a clean PR based on latest master? We will keep reviewing the changes. Thanks!
@themodernlife I'm afraid you need to send a new PR to set the base branch to master...
@cloud-fan I was able to update the base branch to master. Looking into why Travis is complaining.
Hi @themodernlife, your changes contain commits from @JoshRosen and @liancheng, which should not be related to this PR.
I took a look at the implementation and it depends on a lot of the internals of Spark, with a lot of expressions that even do code generation. Basically this would break with every new release of Spark and isn't really maintainable. We should really design and implement a public API in Spark for expressing encoders, and then implement that here.
I discussed this with @bdrillard, who did the bulk of the Encoder implementation, and we would welcome a stable public API in Spark for custom encoders like this. We will transition this logic to that when it's available. Should we log a JIRA to the Spark project to track that? In the meantime, we realize this taps into some Spark classes that are subject to change, and are willing to update our internal usage of it as they evolve. We realize this isn't ideal, but it provides quite a bit of value to our existing workloads for reasons described earlier in this chain. Hopefully this state of affairs is temporary and we'll be able to migrate to the public API when available.
Certainly you can use this if you are willing to pay the cost to upgrade it with each version of Spark. But if that's the case, perhaps just do it in your fork for now?
Sure, we're happy to continue with our internal fork for the time being. But of course we're only one of several interested parties on this thread, so I'm sure a public API to support this would be appreciated in general.
Yup, designing a public API for encoders/decoders makes a lot of sense in Spark proper. We should do it!
@rbrush @bdrillard Aside from that, is there a way to update the patch to not rely on custom code generation? We might need to add a couple of small expressions to Spark to do that. If that works, perhaps we can create a release based on that first, and then use that as the basis to create the encoder API.
@rxin, to check my understanding of your question, it depends on how broadly you mean "to not rely on custom code generation". The serializer/deserializer functions of the encoder rely mostly on functions that are currently public in the existing Spark expressions API. We do, however, have a few functions that are custom extensions of those expression classes.

Let me know your thoughts.
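To make that concrete, here is a minimal sketch of the kind of Catalyst object expressions such a serializer composes. It is illustrative only, not code from this PR: it assumes Spark 2.1.x internals (these constructors have changed signatures across releases), and the helper name serializeAvroString is made up for the example.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{ObjectType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Illustrative only: convert an expression holding an external Avro string value
// (org.apache.avro.util.Utf8 or java.lang.String) into Spark's internal UTF8String,
// the same pattern Spark's built-in encoders use for java.lang.String fields.
def serializeAvroString(inputObject: Expression): Expression =
  StaticInvoke(
    classOf[UTF8String],
    StringType,
    "fromString",
    Invoke(inputObject, "toString", ObjectType(classOf[java.lang.String])) :: Nil)
```

The custom extensions mentioned above are expressions written in this same style for object shapes that Spark does not handle out of the box.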
@bdrillard I think it is the custom extensions of those expression classes that are the concern here. Regarding the specific expressions involved, it seems reasonable to propose adding them to Spark itself.
Spark doesn't have these expressions yet, so they would need to be added.
@marmbrus, @cloud-fan, I'd be happy to open a PR in Spark proper with those changes, so we can then refactor this PR to make use of those public functions.
Why not just implement it here? Otherwise you can only support Spark 2.3 or later.
@cloud-fan the fear is that having an external library depend on the specifics of code-gen is too brittle. I think it's okay if this requires a new version of Spark if that reduces the tight coupling with the specifics of execution. Until that is released we can update this PR to depend on a nightly snapshot, and then once 2.3 is out we can make a new spark-avro release. @bdrillard, that sounds great. Feel free to message me on the Spark JIRA / PR and I can help shepherd them in. Thanks for working on this!
@themodernlife

Here's the test I added:

```scala
test("create Dataset from Fixed Generic Record") {
  val sparkSession = spark
  import sparkSession.implicits._

  val schema: Schema =
    SchemaBuilder
      .record("GenericRecordTest2")
      .namespace("com.databricks.spark.avro")
      .fields()
      .name("fixedUnionVal").`type`()
      .unionOf()
      .fixed("IPv4").size(4).and()
      .intType()
      .endUnion()
      .noDefault()
      .endRecord()

  implicit val enc = AvroEncoder.of[GenericData.Record](schema)

  val genericRecords = (1 to 10) map { i =>
    new GenericRecordBuilder(schema)
      .set("fixedUnionVal", i)
      .build()
  }

  val rdd: RDD[GenericData.Record] = sparkSession.sparkContext
    .parallelize(genericRecords)

  val ds = rdd.toDS()
  assert(ds.count() == genericRecords.size)
}
```

Gist to the stacktrace
@marmbrus I've logged SPARK-22739 to track the additions to the Spark expressions API.

@kalvinnchau Thanks for the additional test case! We'll be circling back to this once the Spark ticket I link just above is resolved.
@bdrillard @themodernlife I've attempted to use this but ran into an exception converting between java.lang.String and org.apache.avro.util.Utf8. The stack trace is:

So this seems to take place within serialization of the Avro POJO to Row. I've tracked the particular field of the Avro schema down to a property called attributes.

That schema field is defined like so:

```json
{
  "name": "attributes",
  "type": [
    "null",
    {
      "type": "map",
      "avro.java.string": "String",
      "values": {
        "type": "string",
        "avro.java.string": "String"
      }
    }
  ],
  "doc": "A mapping of arbitrary event attributes, as key-value pairs.",
  "default": null
}
```

Interestingly enough, this error does not occur for fields that are not maps, for example:

```json
{
  "name": "segment",
  "type": [
    "null",
    {
      "type": "string",
      "avro.java.string": "String"
    }
  ],
  "default": null
}
```
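For reference, a minimal sketch of how one might reproduce this with the AvroEncoder from this PR (hypothetical test code; the Event record and attributes field below are stand-ins for the schema above). Building the record with a plain java.util.HashMap means the map keys and values are java.lang.String rather than org.apache.avro.util.Utf8, which is the same situation the avro.java.string=String property produces:

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecordBuilder}
import com.databricks.spark.avro.AvroEncoder

// Hypothetical reproduction: a record with a nullable map<string,string> field.
val schema: Schema = SchemaBuilder
  .record("Event").namespace("com.example")
  .fields()
    .name("attributes").`type`().unionOf()
      .nullType().and()
      .map().values().stringType()
    .endUnion().noDefault()
  .endRecord()

implicit val enc = AvroEncoder.of[GenericData.Record](schema)

// The map is populated with plain java.lang.String keys and values.
val attrs = new java.util.HashMap[String, String]()
attrs.put("source", "unit-test")

val record = new GenericRecordBuilder(schema)
  .set("attributes", attrs)
  .build()

// Serializing the record into a Dataset row is where the map key/value
// conversion (and the reported cast failure) would occur:
// val ds = spark.createDataset(Seq(record))(enc)
```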
Well, I think I resolved the issue: I just cast it to java.lang.CharSequence.

```diff
@@ -604,7 +604,7 @@ private object AvroTypeInference {
     ExternalMapToCatalyst(
       inputObject,
-      ObjectType(classOf[org.apache.avro.util.Utf8]),
+      ObjectType(classOf[java.lang.CharSequence]),
       serializerFor(_, Schema.create(STRING)),
       valueType,
       serializerFor(_, valueSchema),
```

There was also a closely related Hive issue that I kind of stole the idea for the fix from: https://issues.apache.org/jira/browse/HIVE-5865
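As a side note on why the CharSequence change works (my reading of it, not part of the patch itself): both runtime representations Avro may use for strings implement java.lang.CharSequence, so declaring the external key type as CharSequence covers either case:

```scala
// Both possible Avro string representations satisfy the same interface:
val asUtf8: java.lang.CharSequence = new org.apache.avro.util.Utf8("key") // Avro default
val asString: java.lang.CharSequence = "key"                              // with avro.java.string=String
```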
@bdrillard you did an amazing job getting this working, and I hope you'll have a look at the slight refactoring I've done to get this over the finish line.
The main challenge was getting past Spark version incompatibilities. I've done that through the addition of SBT modules, liberal use of reflection and some straight up copy/paste. I've tried to put proper fences around the hacks.
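To make the "liberal use of reflection" concrete, here is a minimal sketch of the idea; the ShimLoader object and the class names in the usage comment are purely illustrative, not the PR's actual code. The point is to resolve whichever variant of a class exists in the Spark version on the classpath, so one artifact can run against both 2.0.x and 2.1.x:

```scala
// Illustrative shim: try candidate fully-qualified class names in order and
// return the first one present on the current Spark classpath.
object ShimLoader {
  def loadFirst(candidates: String*): Class[_] =
    candidates
      .flatMap { name =>
        try Some(Class.forName(name))
        catch { case _: ClassNotFoundException => None }
      }
      .headOption
      .getOrElse(throw new ClassNotFoundException(candidates.mkString(", ")))
}

// Example usage (class names here are hypothetical placeholders for something
// that moved packages between Spark releases):
// val cls = ShimLoader.loadFirst(
//   "org.apache.spark.sql.catalyst.expressions.objects.SomeExpression",
//   "org.apache.spark.sql.catalyst.expressions.SomeExpression")
```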
Let me know what you guys think! Again, all credit to @bdrillard since this is 90% his work.
Cheers!