
Commit d1814a3

Merge remote-tracking branch 'upstream/master' into remove-history-master-SPARK-12299

Conflicts:
    core/src/main/scala/org/apache/spark/deploy/master/Master.scala

2 parents: f9c09a1 + 5cb2e33

32 files changed: +233 -124 lines changed

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 5 additions & 2 deletions
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.JobWaiter
+import org.apache.spark.util.ThreadUtils


 /**
@@ -45,6 +46,7 @@ trait FutureAction[T] extends Future[T] {

   /**
    * Blocks until this action completes.
+   *
    * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
    *               for unbounded waiting, or a finite positive duration
    * @return this FutureAction
@@ -53,6 +55,7 @@ trait FutureAction[T] extends Future[T] {

   /**
    * Awaits and returns the result (of type T) of this action.
+   *
    * @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
    *               for unbounded waiting, or a finite positive duration
    * @throws Exception exception during action execution
@@ -89,8 +92,8 @@ trait FutureAction[T] extends Future[T] {
   /**
    * Blocks and returns the result of this job.
    */
-  @throws(classOf[Exception])
-  def get(): T = Await.result(this, Duration.Inf)
+  @throws(classOf[SparkException])
+  def get(): T = ThreadUtils.awaitResult(this, Duration.Inf)

   /**
    * Returns the job IDs run by the underlying async operation.
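
The get() change above tightens the declared contract: failures now surface on the calling thread as SparkException. A minimal caller-side sketch of what that looks like (the helper name countOrZero and the in-scope SparkContext are assumptions for illustration, not part of this commit):

import org.apache.spark.{SparkContext, SparkException}

// Hypothetical caller; assumes `sc` is an already-running SparkContext.
def countOrZero(sc: SparkContext): Long = {
  val action = sc.parallelize(1 to 100, 4).countAsync()
  try {
    action.get() // now declared @throws(classOf[SparkException])
  } catch {
    case e: SparkException =>
      // e.getCause carries the original failure; the SparkException itself
      // records this waiting thread's stack trace, which is the point of the change.
      0L
  }
}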

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 5 additions & 5 deletions
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeoutException

 import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -35,7 +35,7 @@ import org.json4s.jackson.JsonMethods
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.master.RecoveryState
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -265,7 +265,7 @@ private object FaultToleranceTest extends App with Logging {
     }

     // Avoid waiting indefinitely (e.g., we could register but get no executors).
-    assertTrue(Await.result(f, 120 seconds))
+    assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
   }

   /**
@@ -318,7 +318,7 @@ private object FaultToleranceTest extends App with Logging {
     }

     try {
-      assertTrue(Await.result(f, 120 seconds))
+      assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
     } catch {
       case e: TimeoutException =>
         logError("Master states: " + masters.map(_.state))
@@ -422,7 +422,7 @@ private object SparkDocker {
     }

     dockerCmd.run(ProcessLogger(findIpAndLog _))
-    val ip = Await.result(ipPromise.future, 30 seconds)
+    val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
     val dockerId = Docker.getLastProcessId
     (ip, dockerId, outFile)
   }

core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala

Lines changed: 6 additions & 1 deletion
@@ -27,10 +27,11 @@ import scala.collection.mutable
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.io.Source
+import scala.util.control.NonFatal

 import com.fasterxml.jackson.core.JsonProcessingException

-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils

@@ -258,13 +259,17 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
       }
     }

+    // scalastyle:off awaitresult
     try { Await.result(responseFuture, 10.seconds) } catch {
+      // scalastyle:on awaitresult
       case unreachable @ (_: FileNotFoundException | _: SocketException) =>
         throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
       case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
         throw new SubmitRestProtocolException("Malformed response received from server", malformed)
       case timeout: TimeoutException =>
         throw new SubmitRestConnectionException("No response from server", timeout)
+      case NonFatal(t) =>
+        throw new SparkException("Exception while waiting for response", t)
     }
   }

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 4 additions & 4 deletions
@@ -20,14 +20,15 @@ package org.apache.spark.network
 import java.io.Closeable
 import java.nio.ByteBuffer

-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Future, Promise}
 import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag

 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
 import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.util.ThreadUtils

 private[spark]
 abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
@@ -100,8 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
         result.success(new NioManagedBuffer(ret))
       }
     })
-
-    Await.result(result.future, Duration.Inf)
+    ThreadUtils.awaitResult(result.future, Duration.Inf)
   }

   /**
@@ -119,6 +119,6 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
       level: StorageLevel,
       classTag: ClassTag[_]): Unit = {
     val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
-    Await.result(future, Duration.Inf)
+    ThreadUtils.awaitResult(future, Duration.Inf)
   }
 }
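
Independent of the substitution itself, fetchBlockSync's shape is a callback-to-Future bridge: a listener completes a Promise and the caller blocks on its future. A self-contained sketch of that pattern (FakeListener and fetchSync are invented names; the real BlockFetchingListener has a richer interface):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object PromiseBridgeSketch {
  // Invented, simplified stand-in for BlockFetchingListener.
  trait FakeListener {
    def onSuccess(v: Int): Unit
    def onFailure(e: Throwable): Unit
  }

  // Start an async operation via `start`, then block until the listener fires.
  def fetchSync(start: FakeListener => Unit): Int = {
    val result = Promise[Int]()
    start(new FakeListener {
      def onSuccess(v: Int): Unit = result.success(v)
      def onFailure(e: Throwable): Unit = result.failure(e)
    })
    // The commit replaces this call with ThreadUtils.awaitResult(result.future,
    // Duration.Inf) so a failure is rethrown as SparkException with the
    // caller's stack trace attached.
    Await.result(result.future, Duration.Inf)
  }
}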

core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala

Lines changed: 18 additions & 7 deletions
@@ -19,10 +19,11 @@ package org.apache.spark.rpc

 import java.util.concurrent.TimeoutException

-import scala.concurrent.{Await, Awaitable}
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.util.control.NonFatal

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.util.Utils

 /**
@@ -65,14 +66,21 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
   /**
    * Wait for the completed result and return it. If the result is not available within this
    * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
-   * @param awaitable the `Awaitable` to be awaited
-   * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+   *
+   * @param future the `Future` to be awaited
+   * @throws RpcTimeoutException if after waiting for the specified time `future`
    *         is still not ready
    */
-  def awaitResult[T](awaitable: Awaitable[T]): T = {
+  def awaitResult[T](future: Future[T]): T = {
+    val wrapAndRethrow: PartialFunction[Throwable, T] = {
+      case NonFatal(t) =>
+        throw new SparkException("Exception thrown in awaitResult", t)
+    }
     try {
-      Await.result(awaitable, duration)
-    } catch addMessageIfTimeout
+      // scalastyle:off awaitresult
+      Await.result(future, duration)
+      // scalastyle:on awaitresult
+    } catch addMessageIfTimeout.orElse(wrapAndRethrow)
   }
 }

@@ -82,6 +90,7 @@ private[spark] object RpcTimeout {
   /**
    * Lookup the timeout property in the configuration and create
    * a RpcTimeout with the property key in the description.
+   *
    * @param conf configuration properties containing the timeout
    * @param timeoutProp property key for the timeout in seconds
    * @throws NoSuchElementException if property is not set
@@ -95,6 +104,7 @@ private[spark] object RpcTimeout {
   * Lookup the timeout property in the configuration and create
   * a RpcTimeout with the property key in the description.
   * Uses the given default value if property is not set
+   *
   * @param conf configuration properties containing the timeout
   * @param timeoutProp property key for the timeout in seconds
   * @param defaultValue default timeout value in seconds if property not found
@@ -109,6 +119,7 @@ private[spark] object RpcTimeout {
   * and create a RpcTimeout with the first set property key in the
   * description.
   * Uses the given default value if property is not set
+   *
   * @param conf configuration properties containing the timeout
   * @param timeoutPropList prioritized list of property keys for the timeout in seconds
   * @param defaultValue default timeout value in seconds if no properties found
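
The catch handler in the new awaitResult is built by chaining two PartialFunctions with orElse: addMessageIfTimeout claims RpcTimeoutExceptions first, and the NonFatal wrapper takes everything else. A standalone sketch of that composition (all names here are illustrative, not Spark APIs):

import java.util.concurrent.TimeoutException
import scala.util.control.NonFatal

object OrElseSketch {
  // Specific handler tried first, analogous to addMessageIfTimeout.
  val onTimeout: PartialFunction[Throwable, Int] = {
    case _: TimeoutException => -1
  }
  // Catch-all for the rest, analogous to wrapAndRethrow.
  val wrapOthers: PartialFunction[Throwable, Int] = {
    case NonFatal(t) => throw new RuntimeException("Exception thrown in awaitResult", t)
  }

  // Scala's `catch` accepts any PartialFunction[Throwable, T] expression,
  // so the composed handler can be passed directly, as the commit does.
  def guarded(body: => Int): Int =
    try body catch onTimeout.orElse(wrapOthers)
}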

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 12 additions & 2 deletions
@@ -260,7 +260,12 @@ private[spark] class BlockManager(
   def waitForAsyncReregister(): Unit = {
     val task = asyncReregisterTask
     if (task != null) {
-      Await.ready(task, Duration.Inf)
+      try {
+        Await.ready(task, Duration.Inf)
+      } catch {
+        case NonFatal(t) =>
+          throw new Exception("Error occurred while waiting for async. reregistration", t)
+      }
     }
   }

@@ -802,7 +807,12 @@ private[spark] class BlockManager(
     logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     if (level.replication > 1) {
       // Wait for asynchronous replication to finish
-      Await.ready(replicationFuture, Duration.Inf)
+      try {
+        Await.ready(replicationFuture, Duration.Inf)
+      } catch {
+        case NonFatal(t) =>
+          throw new Exception("Error occurred while waiting for replication to finish", t)
+      }
     }
     if (blockWasSuccessfullyStored) {
       None
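
These two call sites use Await.ready rather than Await.result: ready waits for completion but does not rethrow the future's own failure, so the try/catch guards only the wait itself. A small self-contained sketch of the same wrapping pattern (names are illustrative, not part of this commit):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

object AwaitReadySketch {
  // Block until `f` completes (successfully or not). Rewrapping a non-fatal
  // wait-time error records this waiting thread's stack trace.
  def waitFor[T](f: Future[T]): Unit =
    try {
      Await.ready(f, Duration.Inf)
    } catch {
      case NonFatal(t) =>
        throw new Exception("Error occurred while waiting", t)
    }

  def main(args: Array[String]): Unit = {
    val f = Future { Thread.sleep(100); 1 }
    waitFor(f)        // returns once f is complete
    println(f.value)  // Some(Success(1))
  }
}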

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 21 additions & 1 deletion
@@ -19,12 +19,15 @@ package org.apache.spark.util

 import java.util.concurrent._

-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.duration.Duration
 import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.util.control.NonFatal

 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}

+import org.apache.spark.SparkException
+
 private[spark] object ThreadUtils {

   private val sameThreadExecutionContext =
@@ -174,4 +177,21 @@ private[spark] object ThreadUtils {
       false // asyncMode
     )
   }
+
+  // scalastyle:off awaitresult
+  /**
+   * Preferred alternative to [[Await.result()]]. This method wraps and re-throws any exceptions
+   * thrown by the underlying [[Await]] call, ensuring that this thread's stack trace appears in
+   * logs.
+   */
+  @throws(classOf[SparkException])
+  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+    try {
+      Await.result(awaitable, atMost)
+      // scalastyle:on awaitresult
+    } catch {
+      case NonFatal(t) =>
+        throw new SparkException("Exception thrown in awaitResult: ", t)
+    }
+  }
 }
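
A usage sketch for the new helper (ThreadUtils is private[spark], so this snippet assumes it is compiled inside Spark's own source tree; the object name is invented):

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.SparkException
import org.apache.spark.util.ThreadUtils

object AwaitResultSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def main(args: Array[String]): Unit = {
    val ok: Future[Int] = Future { 21 * 2 }
    assert(ThreadUtils.awaitResult(ok, 10.seconds) == 42)

    val boom: Future[Int] = Future { throw new IllegalStateException("boom") }
    try {
      ThreadUtils.awaitResult(boom, 10.seconds)
    } catch {
      case e: SparkException =>
        // The cause is the original IllegalStateException; the SparkException's
        // own stack trace belongs to this waiting thread, which aids debugging.
        assert(e.getCause.isInstanceOf[IllegalStateException])
    }
  }
}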

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 2 additions & 0 deletions
@@ -1598,6 +1598,7 @@ private[spark] object Utils extends Logging {

   /**
    * Timing method based on iterations that permit JVM JIT optimization.
+   *
    * @param numIters number of iterations
    * @param f function to be executed. If prepare is not None, the running time of each call to f
    *          must be an order of magnitude longer than one millisecond for accurate timing.
@@ -1639,6 +1640,7 @@ private[spark] object Utils extends Logging {

   /**
    * Creates a symlink.
+   *
    * @param src absolute path to the source
    * @param dst relative path for the destination
    */

core/src/test/scala/org/apache/spark/FutureActionSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -17,11 +17,12 @@

 package org.apache.spark

-import scala.concurrent.Await
 import scala.concurrent.duration.Duration

 import org.scalatest.{BeforeAndAfter, Matchers}

+import org.apache.spark.util.ThreadUtils
+

 class FutureActionSuite
   extends SparkFunSuite
@@ -36,15 +37,15 @@ class FutureActionSuite
   test("simple async action") {
     val rdd = sc.parallelize(1 to 10, 2)
     val job = rdd.countAsync()
-    val res = Await.result(job, Duration.Inf)
+    val res = ThreadUtils.awaitResult(job, Duration.Inf)
     res should be (10)
     job.jobIds.size should be (1)
   }

   test("complex async action") {
     val rdd = sc.parallelize(1 to 15, 3)
     val job = rdd.takeAsync(10)
-    val res = Await.result(job, Duration.Inf)
+    val res = ThreadUtils.awaitResult(job, Duration.Inf)
     res should be (1 to 10)
     job.jobIds.size should be (2)
   }

core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

Lines changed: 3 additions & 4 deletions
@@ -21,7 +21,6 @@ import java.util.concurrent.{ExecutorService, TimeUnit}

 import scala.collection.Map
 import scala.collection.mutable
-import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps

@@ -36,7 +35,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, ThreadUtils}

 /**
  * A test suite for the heartbeating behavior between the driver and the executors.
@@ -231,14 +230,14 @@
   private def addExecutorAndVerify(executorId: String): Unit = {
     assert(
       heartbeatReceiver.addExecutor(executorId).map { f =>
-        Await.result(f, 10.seconds)
+        ThreadUtils.awaitResult(f, 10.seconds)
       } === Some(true))
   }

   private def removeExecutorAndVerify(executorId: String): Unit = {
     assert(
       heartbeatReceiver.removeExecutor(executorId).map { f =>
-        Await.result(f, 10.seconds)
+        ThreadUtils.awaitResult(f, 10.seconds)
       } === Some(true))
   }
