Skip to content

Commit

Permalink
set RestartPolicy=Never for executor (apache#367)
Browse files Browse the repository at this point in the history
* set RestartPolicy=Never for executor

As for current implementation the RestartPolicy of executor pod is
not set, so the default value "OnFailure" is in effect. But this
causes problem.

If an executor is terminated unexpectedly, for example, exit by
java.lang.OutOfMemoryError,  it'll be restarted by k8s with the
same executor ID.  When the new executor tries to fetch a block hold by
the last executor, ShuffleBlockFetcherIterator.splitLocalRemoteBlocks()
think it's a **local** block and tries to read it from it's local dir.
But the executor's local dir is changed because random generated ID is
part of local dir. FetchFailedException will raise and the stage will
fail.

The rolling Error message:

17/06/29 01:54:56 WARN KubernetesTaskSetManager: Lost task 0.1 in stage
2.0 (TID 7, 172.16.75.92, executor 1): FetchFailed(BlockManagerId(1,
172.16.75.92, 40539, None), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException:
/data2/spark/blockmgr-0e228d3c-8727-422e-aa97-2841a877c42a/32/shuffle_2_0_0.index
(No such file or directory)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

* Update KubernetesClusterSchedulerBackend.scala
  • Loading branch information
Hong Zhiguo authored and foxish committed Jul 19, 2017
1 parent 3ec9410 commit e1ff2f0
Showing 1 changed file with 1 addition and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
.endMetadata()
.withNewSpec()
.withHostname(hostname)
.withRestartPolicy("Never")
.withNodeSelector(nodeSelector.asJava)
.endSpec()
.build()
Expand Down

0 comments on commit e1ff2f0

Please sign in to comment.