-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle empty partition iterators #367
Conversation
Empty edge partitions sometimes appear in the output of zipPartitions for unknown reasons, causing calls to Iterator#next to fail. This commit checks these cases, handles them by returning an empty iterator, and logs an error if this would cause GraphX to drop a corresponding non-empty partition. Resolves amplab/graphx#52.
I know you said "unknown", but any guesses on why they appear? Seems like they shouldn't. |
Merged build triggered. |
Merged build started. |
I don't have any ideas. cc @jegonzal @dcrankshaw |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13934/ |
I've looked into briefly but I'm not sure either. |
Jenkins, retest this please. |
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14191/ |
@ankurdave do we still need this if we merge #497? |
No, #497 subsumes this. Closing. |
Alright, great. I took a quick look through #497 but I also want to test it locally. I think the Jenkins failure may have been due to some methods with unspecified return types, breaking MIMA or scalastyle. But we'll find out when we rerun it. |
GraphX: Unifying Graphs and Tables GraphX extends Spark's distributed fault-tolerant collections API and interactive console with a new graph API which leverages recent advances in graph systems (e.g., [GraphLab](http://graphlab.org)) to enable users to easily and interactively build, transform, and reason about graph structured data at scale. See http://amplab.github.io/graphx/. Thanks to @jegonzal, @rxin, @ankurdave, @dcrankshaw, @jianpingjwang, @amatsukawa, @kellrott, and @adamnovak. Tasks left: - [x] Graph-level uncache - [x] Uncache previous iterations in Pregel - [x] ~~Uncache previous iterations in GraphLab~~ (postponed to post-release) - [x] - Describe GC issue with GraphLab - [ ] Write `docs/graphx-programming-guide.md` - [x] - Mention future Bagel support in docs - [ ] - Section on caching/uncaching in docs: As with Spark, cache something that is used more than once. In an iterative algorithm, try to cache and force (i.e., materialize) something every iteration, then uncache the cached things that depended on the newly materialized RDD but that won't be referenced again. - [x] Undo modifications to core collections and instead copy them to org.apache.spark.graphx - [x] Make Graph serializable to work around capture in Spark shell - [x] Rename graph -> graphx in package name and subproject - [x] Remove standalone PageRank - [x] ~~Fix amplab/graphx#52 by checking `iter.hasNext`~~
* set RestartPolicy=Never for executor As for current implementation the RestartPolicy of executor pod is not set, so the default value "OnFailure" is in effect. But this causes problem. If an executor is terminated unexpectedly, for example, exit by java.lang.OutOfMemoryError, it'll be restarted by k8s with the same executor ID. When the new executor tries to fetch a block hold by the last executor, ShuffleBlockFetcherIterator.splitLocalRemoteBlocks() think it's a **local** block and tries to read it from it's local dir. But the executor's local dir is changed because random generated ID is part of local dir. FetchFailedException will raise and the stage will fail. The rolling Error message: 17/06/29 01:54:56 WARN KubernetesTaskSetManager: Lost task 0.1 in stage 2.0 (TID 7, 172.16.75.92, executor 1): FetchFailed(BlockManagerId(1, 172.16.75.92, 40539, None), shuffleId=2, mapId=0, reduceId=0, message= org.apache.spark.shuffle.FetchFailedException: /data2/spark/blockmgr-0e228d3c-8727-422e-aa97-2841a877c42a/32/shuffle_2_0_0.index (No such file or directory) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) * Update KubernetesClusterSchedulerBackend.scala
* set RestartPolicy=Never for executor As for current implementation the RestartPolicy of executor pod is not set, so the default value "OnFailure" is in effect. But this causes problem. If an executor is terminated unexpectedly, for example, exit by java.lang.OutOfMemoryError, it'll be restarted by k8s with the same executor ID. When the new executor tries to fetch a block hold by the last executor, ShuffleBlockFetcherIterator.splitLocalRemoteBlocks() think it's a **local** block and tries to read it from it's local dir. But the executor's local dir is changed because random generated ID is part of local dir. FetchFailedException will raise and the stage will fail. The rolling Error message: 17/06/29 01:54:56 WARN KubernetesTaskSetManager: Lost task 0.1 in stage 2.0 (TID 7, 172.16.75.92, executor 1): FetchFailed(BlockManagerId(1, 172.16.75.92, 40539, None), shuffleId=2, mapId=0, reduceId=0, message= org.apache.spark.shuffle.FetchFailedException: /data2/spark/blockmgr-0e228d3c-8727-422e-aa97-2841a877c42a/32/shuffle_2_0_0.index (No such file or directory) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) * Update KubernetesClusterSchedulerBackend.scala
This reverts commit a3c2539.
Disable Telefonica Cloud related jobs
Update version to 2.3.2-pie1.0.3
Empty edge partitions sometimes appear in the output of zipPartitions for unknown reasons, causing calls to Iterator#next to fail. This PR checks these cases, handles them by returning an empty iterator, and logs an error if this would cause GraphX to drop a corresponding non-empty partition.
Resolves amplab/graphx#52.