
Commit 235919b

[SPARK-6980] Resolved conflicts after master merge
1 parent: c07d05c

File tree: 3 files changed (+104, -15)

3 files changed

+104
-15
lines changed

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala (5 additions, 3 deletions)
@@ -28,9 +28,11 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Add
 import akka.event.Logging.Error
 import akka.pattern.{ask => akkaAsk}
 import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent}
+import com.google.common.util.concurrent.MoreExecutors
+
 import org.apache.spark.{SparkException, Logging, SparkConf}
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, ThreadUtils}
 
 /**
  * A RpcEnv implementation based on Akka.
@@ -293,8 +295,8 @@ private[akka] class AkkaRpcEndpointRef(
   }
 
   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
-    import scala.concurrent.ExecutionContext.Implicits.global
     actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
+      // The function will run in the calling thread, so it should be short and never block.
       case msg @ AkkaMessage(message, reply) =>
         if (reply) {
           logError(s"Receive $msg but the sender cannot reply")
@@ -304,7 +306,7 @@ private[akka] class AkkaRpcEndpointRef(
         }
       case AkkaFailure(e) =>
         Future.failed(e)
-    }.mapTo[T]
+    }(ThreadUtils.sameThread).mapTo[T]
   }
 
   override def toString: String = s"${getClass.getSimpleName}($actorRef)"
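This diff swaps the global implicit ExecutionContext for ThreadUtils.sameThread, so the flatMap callback runs inline on whichever thread completes the Akka future rather than hopping to the shared global pool; that is why the new comment warns the function must be short and never block. ThreadUtils itself is not shown in this commit; a minimal sketch of such a same-thread context, assuming it is built on the Guava MoreExecutors import added above, could look like:

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

import com.google.common.util.concurrent.MoreExecutors

object ThreadUtils {
  // Sketch only: an ExecutionContext that runs each task synchronously on
  // the submitting thread, so callbacks execute inline with no thread hop.
  // The real helper lives in org.apache.spark.util and may differ.
  val sameThread: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
}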

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala (19 additions, 12 deletions)
@@ -17,13 +17,14 @@
 
 package org.apache.spark.storage
 
-import scala.concurrent.Future
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.collection.Iterable
+import scala.collection.generic.CanBuildFrom
+import scala.concurrent.{Await, Future}
 
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{ThreadUtils, RpcUtils}
 
 private[spark]
 class BlockManagerMaster(
@@ -102,8 +103,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       timeout.awaitResult(future)
     }
@@ -114,8 +115,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       timeout.awaitResult(future)
     }
@@ -128,8 +129,8 @@ class BlockManagerMaster(
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove broadcast $broadcastId" +
-          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
-    }
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       timeout.awaitResult(future)
     }
@@ -169,11 +170,17 @@ class BlockManagerMaster(
     val response = driverEndpoint.
       askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
-    val result = timeout.awaitResult(Future.sequence(futures))
-    if (result == null) {
+    implicit val sameThread = ThreadUtils.sameThread
+    val cbf =
+      implicitly[
+        CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
+          Option[BlockStatus],
+          Iterable[Option[BlockStatus]]]]
+    val blockStatus = timeout.awaitResult(
+      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
+    if (blockStatus == null) {
       throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
     }
-    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
     blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
       status.map { s => (blockManagerId, s) }
     }.toMap
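The getBlockStatus change removes the asInstanceOf cast by passing Future.sequence its two implicits explicitly: the CanBuildFrom pins the result type to Iterable[Option[BlockStatus]], and ThreadUtils.sameThread replaces the global implicit ExecutionContext. A self-contained sketch of the same pattern, assuming pre-2.13 Scala (CanBuildFrom was removed in 2.13) and using hypothetical Option[Int] payloads in place of BlockStatus:

import java.util.concurrent.Executor

import scala.collection.Iterable
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for ThreadUtils.sameThread: run tasks on the calling thread.
    val sameThread: ExecutionContext = ExecutionContext.fromExecutor(
      new Executor { def execute(r: Runnable): Unit = r.run() })
    val futures: Iterable[Future[Option[Int]]] =
      Seq(Future.successful(Some(1)), Future.successful(None))
    // Summon the builder explicitly so the sequenced result is typed
    // Iterable[Option[Int]] directly, with no asInstanceOf on the result.
    val cbf = implicitly[
      CanBuildFrom[Iterable[Future[Option[Int]]], Option[Int], Iterable[Option[Int]]]]
    val all: Future[Iterable[Option[Int]]] =
      Future.sequence[Option[Int], Iterable](futures)(cbf, sameThread)
    println(Await.result(all, 1.second)) // e.g. List(Some(1), None)
  }
}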

core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala (80 additions, 0 deletions)
@@ -17,9 +17,21 @@
 
 package org.apache.spark.rpc.akka
 
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.util.{Success, Failure}
+import scala.language.postfixOps
+
+import akka.actor.{ActorSystem, Actor, ActorRef, Props, Address}
+import akka.pattern.ask
+
 import org.apache.spark.rpc._
 import org.apache.spark.{SecurityManager, SparkConf}
 
+
 class AkkaRpcEnvSuite extends RpcEnvSuite {
 
   override def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv = {
@@ -47,4 +59,72 @@ class AkkaRpcEnvSuite extends RpcEnvSuite {
     }
   }
 
+  test("Future failure with RpcTimeout") {
+
+    class EchoActor extends Actor {
+      def receive: Receive = {
+        case msg =>
+          Thread.sleep(500)
+          sender() ! msg
+      }
+    }
+
+    val system = ActorSystem("EchoSystem")
+    val echoActor = system.actorOf(Props(new EchoActor), name = "echoA")
+
+    val timeout = new RpcTimeout(50 millis, "spark.rpc.short.timeout")
+
+    val fut = echoActor.ask("hello")(1000 millis).mapTo[String].recover {
+      case te: TimeoutException => throw timeout.amend(te)
+    }
+
+    fut.onFailure {
+      case te: TimeoutException => println("failed with timeout exception")
+    }
+
+    fut.onComplete {
+      case Success(str) => println("future success")
+      case Failure(ex) => println("future failure")
+    }
+
+    println("sleeping")
+    Thread.sleep(50)
+    println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString())
+
+    println("Caught TimeoutException: " +
+      intercept[TimeoutException] {
+        //timeout.awaitResult(fut) // prints RpcTimeout description twice
+        Await.result(fut, 10 millis)
+      }.getMessage()
+    )
+
+    /*
+    val ref = env.setupEndpoint("test_future", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receive = {
+        case _ =>
+      }
+    })
+    val conf = new SparkConf()
+    val newRpcEnv = new AkkaRpcEnvFactory().create(
+      RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf)))
+    try {
+      val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_future")
+      val akkaActorRef = newRef.asInstanceOf[AkkaRpcEndpointRef].actorRef
+
+      val timeout = new RpcTimeout(1 millis, "spark.rpc.short.timeout")
+      val fut = akkaActorRef.ask("hello")(timeout.duration).mapTo[String]
+
+      Thread.sleep(500)
+      println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString())
+
+    } finally {
+      newRpcEnv.shutdown()
+    }
+    */
+
+
+  }
+
 }
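The new test exercises timeout.amend(te), a helper that is not defined anywhere in this diff. Under SPARK-6980's aim of naming the controlling configuration property in timeout errors, a hypothetical cut-down RpcTimeout with such a method might look like the sketch below; the eventual upstream API may use a different name and shape.

import java.util.concurrent.TimeoutException

import scala.concurrent.duration.FiniteDuration

// Sketch only: a minimal RpcTimeout carrying the config property it came
// from, with an amend() that rewraps a TimeoutException so the message
// names that property (e.g. "spark.rpc.short.timeout").
class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String) {
  def amend(te: TimeoutException): TimeoutException = {
    val amended = new TimeoutException(
      s"${te.getMessage}. This timeout is controlled by $timeoutProp")
    amended.initCause(te)
    amended
  }
}

With that shape, the recover block in the test would convert Akka's bare ask timeout into one that tells the user which setting to raise.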
