
Commit 2b4e085

Consolidate Executor use of akka frame size
1 parent c9b6109

3 files changed: +10 -12 lines

core/src/main/scala/org/apache/spark/MapOutputTracker.scala (3 additions, 3 deletions)

@@ -37,7 +37,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
-  val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // MB
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
@@ -46,8 +46,8 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
       val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
       val serializedSize = mapOutputStatuses.size
       if (serializedSize > maxAkkaFrameSize) {
-        throw new SparkException(
-          "spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
+        throw new SparkException(s"Map output statuses were $serializedSize bytes which " +
+          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
       }
       sender ! mapOutputStatuses
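Because the helper now returns bytes, the guard in MapOutputTrackerMasterActor compares the serialized size against the limit with no per-call-site unit conversion. A minimal standalone sketch of that check, with the conversion helper and the serialized size made up for illustration (no Spark dependency; the real actor throws a SparkException):

object FrameSizeGuardSketch {
  // Mirrors AkkaUtils.maxFrameSizeBytes: spark.akka.frameSize is set in MB, default 10.
  def maxFrameSizeBytes(frameSizeMb: Int): Int = frameSizeMb * 1024 * 1024

  def main(args: Array[String]): Unit = {
    val maxAkkaFrameSize = maxFrameSizeBytes(10)    // 10485760 bytes
    val serializedSize = 12 * 1024 * 1024           // pretend the statuses serialized to 12 MB

    if (serializedSize > maxAkkaFrameSize) {
      throw new IllegalStateException(s"Map output statuses were $serializedSize bytes which " +
        s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
    }
  }
}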

core/src/main/scala/org/apache/spark/executor/Executor.scala (2 additions, 4 deletions)

@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -120,9 +120,7 @@ private[spark] class Executor(
 
   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
-  private val akkaFrameSize = {
-    env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
-  }
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
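The executor uses this byte value only to decide whether a task result is small enough to ride back in an Akka message or must go through the block manager, as the comment in the diff says. A rough sketch of that decision, assuming a plain byte array stands in for the serialized result (routeResult and the sizes below are illustrative, not Spark APIs):

object ResultRoutingSketch {
  // Same value AkkaUtils.maxFrameSizeBytes returns for the default spark.akka.frameSize of 10 MB.
  val akkaFrameSize: Int = 10 * 1024 * 1024

  def routeResult(serializedResult: Array[Byte]): String =
    if (serializedResult.length > akkaFrameSize) "send via block manager" else "send directly over Akka"

  def main(args: Array[String]): Unit = {
    println(routeResult(new Array[Byte](1024)))              // small result: direct
    println(routeResult(new Array[Byte](11 * 1024 * 1024)))  // oversized result: block manager
  }
}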

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala (5 additions, 5 deletions)

@@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging {
 
     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-    val akkaFrameSize = maxFrameSize(conf)
+    val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
@@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging {
       |akka.remote.netty.tcp.port = $port
       |akka.remote.netty.tcp.tcp-nodelay = on
       |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
-      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize
       |akka.log-config-on-start = $logAkkaConfig
@@ -122,8 +122,8 @@ private[spark] object AkkaUtils extends Logging {
     Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
   }
 
-  /** Returns the default max frame size for Akka messages in MB. */
-  def maxFrameSize(conf: SparkConf): Int = {
-    conf.getInt("spark.akka.frameSize", 10)
+  /** Returns the configured max frame size for Akka messages in bytes. */
+  def maxFrameSizeBytes(conf: SparkConf): Int = {
+    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
   }
 }
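Since akkaFrameSize is now a byte count, the generated Akka setting switches its unit suffix from MiB to B: 10MiB before and 10485760B after describe the same limit. A small sketch of how the rendered line would look under the default configuration (illustrative output, not captured from a running system):

object FrameSizeConfigSketch {
  // Mirrors maxFrameSizeBytes: the MB-valued Spark setting is converted to bytes once, at the source.
  def maxFrameSizeBytes(frameSizeMb: Int): Int = frameSizeMb * 1024 * 1024

  def main(args: Array[String]): Unit = {
    val akkaFrameSize = maxFrameSizeBytes(10)
    // Prints: akka.remote.netty.tcp.maximum-frame-size = 10485760B
    println(s"akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B")
  }
}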
