Skip to content

Commit 715c90a

Browse files
drcrallen authored and Marcelo Vanzin committed
[SPARK-17696][core] Partial backport of SPARK-12330 to branch-1.6.
From the original commit message: This PR also fixes a regression caused by [SPARK-10987] whereby submitting a shutdown causes a race between the local shutdown procedure and the notification of the scheduler driver disconnection. If the scheduler driver disconnection wins the race, the coarse executor incorrectly exits with status 1 (instead of the proper status 0). Author: Charles Allen <[email protected]> (cherry picked from commit 2eaeafe)
1 parent e2ce0ca commit 715c90a

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ package org.apache.spark.executor
1919

2020
import java.net.URL
2121
import java.nio.ByteBuffer
22+
import java.util.concurrent.atomic.AtomicBoolean
2223

2324
import org.apache.hadoop.conf.Configuration
2425

@@ -45,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
4546
env: SparkEnv)
4647
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
4748

49+
private[this] val stopping = new AtomicBoolean(false)
4850
var executor: Executor = null
4951
@volatile var driver: Option[RpcEndpointRef] = None
5052

@@ -106,19 +108,23 @@ private[spark] class CoarseGrainedExecutorBackend(
106108
}
107109

108110
case StopExecutor =>
111+
stopping.set(true)
109112
logInfo("Driver commanded a shutdown")
110113
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
111114
// a message to self to actually do the shutdown.
112115
self.send(Shutdown)
113116

114117
case Shutdown =>
118+
stopping.set(true)
115119
executor.stop()
116120
stop()
117121
rpcEnv.shutdown()
118122
}
119123

120124
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
121-
if (driver.exists(_.address == remoteAddress)) {
125+
if (stopping.get()) {
126+
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
127+
} else if (driver.exists(_.address == remoteAddress)) {
122128
logError(s"Driver $remoteAddress disassociated! Shutting down.")
123129
System.exit(1)
124130
} else {

0 commit comments

Comments
 (0)