This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Pick last file sorted by path for schema #269

Open
wants to merge 6 commits into master
Conversation

koertkuipers
Contributor

Picking the same file consistently for the schema avoids weird bugs where the schema of an Avro data source changes randomly or unexpectedly.

@codecov-io

codecov-io commented Feb 12, 2018

Codecov Report

Merging #269 into master will increase coverage by 0.4%.
The diff coverage is 87.5%.

@@            Coverage Diff            @@
##           master     #269     +/-   ##
=========================================
+ Coverage   92.21%   92.61%   +0.4%     
=========================================
  Files           5        5             
  Lines         321      325      +4     
  Branches       43       41      -2     
=========================================
+ Hits          296      301      +5     
+ Misses         25       24      -1

)
}
def sampleFilePath = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) {
files.iterator.map(_.getPath).filter(_.getName.endsWith(".avro"))
Contributor

@gengliangwang gengliangwang Jun 1, 2018


files.map(_.getPath).sortBy(_.getName)....

Contributor Author


It has the same result, right?

files can be a very large sequence. The iterator approach avoids creating two copies of that sequence, and it is not necessary to do a full sort just to get the first sorted element.

Are you saying it's not worth the optimization?
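To make the trade-off concrete, here is a small sketch (plain strings standing in for Hadoop `FileStatus`/`Path` objects; the file names are made up) contrasting the full-sort approach with the single-pass iterator approach:

```scala
// Hypothetical file listing; strings stand in for Hadoop Path objects.
val files = Seq("/data/x=2/part-0000.avro", "/data/x=1/part-0000.avro")

// Full sort: builds a second, sorted copy of the sequence, O(n log n).
val viaSort = files.sorted.head

// Iterator + min: single pass, no intermediate collection, O(n).
val viaMin = files.iterator.min

assert(viaSort == viaMin) // same result either way
```

Both pick the same file; the iterator version just avoids materializing a sorted copy of a potentially very large listing.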

Contributor

@gengliangwang gengliangwang Jun 1, 2018


You are right about not sorting all the file names.
But I don't think we need to convert it to an iterator.
Maybe we can make it shorter, like files.map(_.getPath).minBy(_.getName)?
We could create a function that accepts a Seq[Path] parameter, then check whether it is empty before getting the minimal one.

Contributor Author


iterator is lightweight and avoids materialization.

minBy(_.getName) wouldn't work because we want to sort by the full path, not just the filename (e.g. /some/path/x=1/part-0000.avro comes before /some/path/x=2/part-0000.avro).

minBy(_.toString) might work, but I don't feel too certain about it; I'd rather use Comparable to do the right thing. Unfortunately Path is just Comparable, not Comparable[Path], so Scala doesn't understand how to use it, which is why I resorted to using compareTo directly.
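The compareTo approach described above can be sketched like this (strings stand in for Hadoop Path here, since String's compareTo illustrates the same pattern; the paths are hypothetical):

```scala
// Pick the first element by full-path order without sorting, calling
// compareTo directly (Hadoop's Path implements raw Comparable rather
// than Comparable[Path], so Scala's Ordering can't use it implicitly).
val paths = Iterator(
  "/some/path/x=2/part-0000.avro",
  "/some/path/x=1/part-0000.avro"
)

// reduceLeft keeps whichever element compares smaller: one pass, no sort.
val first = paths.reduceLeft((a, b) => if (a.compareTo(b) <= 0) a else b)

assert(first == "/some/path/x=1/part-0000.avro")
```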

files.headOption.getOrElse {
throw new FileNotFoundException("No Avro files found.")
}
files.iterator.map(_.getPath)
Contributor


ditto

df1.write.avro(s"$tempDir/different_schemas/z=1")
val df2 = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b")))
df2.write.avro(s"$tempDir/different_schemas/z=2")
val df3 = spark.read.avro(s"$tempDir/different_schemas")
Contributor

@gengliangwang gengliangwang Jun 1, 2018


Maybe add a loop for the reading? I am not sure whether the order will be the same every time.
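The property such a loop would exercise can be shown without Spark: however the filesystem happens to order its listing between runs, picking the minimum path is deterministic. A small sketch with made-up paths:

```scala
import scala.util.Random

// Hypothetical partition files; the listing order may vary between runs.
val paths = Seq("/d/z=2/part-0.avro", "/d/z=1/part-0.avro", "/d/z=3/part-0.avro")

// Shuffle to simulate a different listing order on each read, then pick
// the minimum path each time; the pick should never change.
val picks = (1 to 10).map(_ => Random.shuffle(paths).min)

assert(picks.distinct == Seq("/d/z=1/part-0.avro"))
```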

@cwlaird3

cwlaird3 commented Jun 4, 2018

Have you considered using the schema from the newest data file to get the most up-to-date version of the schema? Or perhaps a configuration option to do that? It seems like most users would update their schemas in a backwards-compatible way, and using the most recent schema would expose newer fields.

@koertkuipers
Contributor Author

koertkuipers commented Jun 4, 2018 via email

@gengliangwang
Contributor

@cwlaird3 good idea.
@koertkuipers how about using the latest Avro file's schema by default?

@koertkuipers koertkuipers changed the title Pick first file sorted by path for schema Pick last file sorted by path for schema Jun 7, 2018
@gengliangwang
Contributor

@koertkuipers @cwlaird3 I checked with @liancheng, who is a PMC member and one of the original authors of the data source project.
He doesn't think we should make such an assumption. If the schema differs among files, users are supposed to specify the schema:
https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

This PR changes the behavior and could cause regressions for other users.

@koertkuipers
Contributor Author

koertkuipers commented Jun 8, 2018

Currently it uses a random file to pick the schema. What would be an example of a user for whom things break by going from a random file to the last file?

@cwlaird3

cwlaird3 commented Jun 8, 2018

I agree with @koertkuipers, but if there's still a concern, adding a configuration option to change the behavior could address that.

@koertkuipers
Contributor Author

spark-avro already provides a mechanism for the user to provide a schema, via the avroSchema key in options.

The thing that is currently missing is merging of schemas across all files.
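For reference, a usage sketch of that existing mechanism (not runnable standalone: it assumes a SparkSession named spark, the spark-avro package on the classpath, and a made-up schema and path):

```scala
// Hypothetical Avro reader schema as a JSON string.
val avroSchemaJson =
  """{"type":"record","name":"test","fields":[{"name":"a","type":["string","null"]}]}"""

// Passing an explicit schema via the avroSchema option pins the reader
// schema regardless of which file the source samples for inference.
val df = spark.read
  .format("com.databricks.spark.avro")
  .option("avroSchema", avroSchemaJson)
  .load(s"$tempDir/different_schemas")
```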

@cwlaird3

cwlaird3 commented Jun 8, 2018

By configuration I meant a flag to enable the behavior you've implemented here - not to provide a schema.

@koertkuipers
Contributor Author

koertkuipers commented Jun 8, 2018 via email
