
Commit 796977a

SPARK-1244: Throw exception if map output status exceeds frame size
This is a very small change on top of @andrewor14's patch in apache#147.

Author: Patrick Wendell <[email protected]>
Author: Andrew Or <[email protected]>

Closes apache#152 from pwendell/akka-frame and squashes the following commits:

e5fb3ff [Patrick Wendell] Reversing test order
393af4c [Patrick Wendell] Small improvement suggested by Andrew Or
8045103 [Patrick Wendell] Breaking out into two tests
2b4e085 [Patrick Wendell] Consolidate Executor use of akka frame size
c9b6109 [Andrew Or] Simplify test + make access to akka frame size more modular
281d7c9 [Andrew Or] Throw exception on spark.akka.frameSize exceeded + Unit tests
1 parent dc96546 commit 796977a

6 files changed: +84, -20 lines

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 17 additions & 2 deletions
@@ -35,13 +35,28 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

-private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
-      sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+      val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
+      val serializedSize = mapOutputStatuses.size
+      if (serializedSize > maxAkkaFrameSize) {
+        val msg = s"Map output statuses were $serializedSize bytes which " +
+          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
+
+        /* For SPARK-1244 we'll opt for just logging an error and then throwing an exception.
+         * Note that on exception the actor will just restart. A bigger refactoring (SPARK-1239)
+         * will ultimately remove this entire code path. */
+        val exception = new SparkException(msg)
+        logError(msg, exception)
+        throw exception
+      }
+      sender ! mapOutputStatuses

     case StopMapOutputTracker =>
       logInfo("MapOutputTrackerActor stopped!")

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -191,7 +191,7 @@ object SparkEnv extends Logging {
     }
     mapOutputTracker.trackerActor = registerOrLookup(
       "MapOutputTracker",
-      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]))
+      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

     val shuffleFetcher = instantiateClass[ShuffleFetcher](
       "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 4 deletions
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}

 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -120,9 +120,7 @@ private[spark] class Executor(

   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
-  private val akkaFrameSize = {
-    env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
-  }
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
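For context on why the executor needs this value at all: task results larger than one frame cannot be sent over Akka directly, so the executor parks them in the block manager and ships only a handle. A rough paraphrase of that logic from `Executor.run` (not part of this diff; `serializedDirectResult`, `ser`, `taskId`, and `env` are surrounding members of `Executor`, and the exact headroom subtracted is approximate):

```scala
// Paraphrase: if the serialized result would not fit in one Akka frame,
// store it in the block manager and send just the block id back instead.
val serializedResult =
  if (serializedDirectResult.limit >= akkaFrameSize - 1024) { // headroom for envelope overhead
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId))
  } else {
    serializedDirectResult // small enough to ride inside the frame
  }
```

Deriving `akkaFrameSize` from `AkkaUtils.maxFrameSizeBytes(conf)` rather than re-reading Akka's parsed config keeps this cutoff consistent with the limit the MapOutputTrackerMasterActor now enforces.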

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 7 additions & 2 deletions
@@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging {

     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)

-    val akkaFrameSize = conf.getInt("spark.akka.frameSize", 10)
+    val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
@@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging {
       |akka.remote.netty.tcp.port = $port
       |akka.remote.netty.tcp.tcp-nodelay = on
       |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
-      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize
       |akka.log-config-on-start = $logAkkaConfig
@@ -121,4 +121,9 @@ private[spark] object AkkaUtils extends Logging {
   def lookupTimeout(conf: SparkConf): FiniteDuration = {
     Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
   }
+
+  /** Returns the configured max frame size for Akka messages in bytes. */
+  def maxFrameSizeBytes(conf: SparkConf): Int = {
+    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
+  }
 }
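Two things change together here: the new helper converts the MB-denominated `spark.akka.frameSize` to bytes exactly once, and the generated Akka config switches its unit suffix from `MiB` to `B` because it now receives the already-converted value. A quick sanity check of the conversion, using only what the diff adds:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.AkkaUtils

val conf = new SparkConf()
// Default is 10 (MB), so the helper yields 10 * 1024 * 1024 bytes.
assert(AkkaUtils.maxFrameSizeBytes(conf) == 10 * 1024 * 1024)

// The new tests below set spark.akka.frameSize to "1", i.e. a 1 MiB limit.
conf.set("spark.akka.frameSize", "1")
assert(AkkaUtils.maxFrameSizeBytes(conf) == 1024 * 1024)
```

With this helper, every consumer (the actor, the executor, and the Akka config string) sees the same byte count instead of each re-deriving it from the raw setting.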

core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -45,12 +45,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {

     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")

     val badconf = new SparkConf
     badconf.set("spark.authenticate", "true")
     badconf.set("spark.authenticate.secret", "bad")
-    val securityManagerBad = new SecurityManager(badconf);
+    val securityManagerBad = new SecurityManager(badconf)

     assert(securityManagerBad.isAuthenticationEnabled() === true)

@@ -84,7 +84,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {

     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")

     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
@@ -136,7 +136,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {

     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")

     val goodconf = new SparkConf
     goodconf.set("spark.authenticate", "true")
@@ -189,7 +189,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {

     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")

     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 52 additions & 6 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.concurrent.Await

 import akka.actor._
+import akka.testkit.TestActorRef
 import org.scalatest.FunSuite

 import org.apache.spark.scheduler.MapStatus
@@ -51,14 +52,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
   }

   test("master register and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -77,7 +80,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master register and unregister and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -100,11 +104,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
       securityManager = new SecurityManager(conf))
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
+
+    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.driver.port", boundPort.toString)

     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")

     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
       securityManager = new SecurityManager(conf))
@@ -126,7 +132,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-           Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+      Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))

     masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
     masterTracker.incrementEpoch()
@@ -136,4 +142,44 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
+
+  test("remote fetch below akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
+
+    // Frame size should be ~123B, and no exception should be thrown
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+    masterActor.receive(GetMapOutputStatuses(10))
+  }
+
+  test("remote fetch exceeds akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
+
+    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
+    // Note that the size is hand-selected here because map output statuses are compressed before
+    // being sent.
+    masterTracker.registerShuffle(20, 100)
+    (0 until 100).foreach { i =>
+      masterTracker.registerMapOutput(20, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    }
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+  }
 }
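The sizes in these two tests are deliberate. A back-of-envelope check, with the approximate figures taken from the comments in the tests themselves (map output statuses are compressed before serialization, so the numbers are not exact):

```scala
// spark.akka.frameSize = "1" gives the actor a limit of 1 MiB.
val limit = 1 * 1024 * 1024 // 1048576 bytes
val below = 123             // ~123 B: one status for a single 10-byte output
val above = 1150000         // ~1.1 MB: 100 statuses for ~4 MB outputs
assert(below < limit && limit < above)
```

The tests also lean on akka-testkit's `TestActorRef`: invoking `receive` on `underlyingActor` runs the handler synchronously on the test thread, so the `SparkException` propagates straight to `intercept` instead of being absorbed by the actor's supervisor on restart.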
