Skip to content

Commit 4b3dbe4

Browse files
change port to option and some bug fixes
1 parent 17d094e commit 4b3dbe4

File tree

9 files changed

+78
-39
lines changed

9 files changed

+78
-39
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ private[spark] class Executor(
8787

8888
private val executorMetrics: ExecutorMetrics = new ExecutorMetrics
8989
executorMetrics.setHostname(Utils.localHostName)
90-
executorMetrics.setPort(env.rpcEnv.address.port)
90+
if (env.rpcEnv.address != null) {
91+
executorMetrics.setPort(Some(env.rpcEnv.address.port))
92+
}
9193

9294
// Whether to load classes in user jars before those in Spark jars
9395
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)

core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,17 @@ class ExecutorMetrics extends Serializable {
3939
/**
4040
* Host's port the executor runs on
4141
*/
42-
private var _port: Int = _
43-
def port: Int = _port
44-
private[spark] def setPort(value: Int) = _port = value
42+
private var _port: Option[Int] = None
43+
def port: Option[Int] = _port
44+
private[spark] def setPort(value: Option[Int]) = _port = value
4545

46-
private[spark] def hostPort: String = hostname + ":" + port
46+
private[spark] def hostPort: String = { // "hostname:port" when a port is set, otherwise just the hostname
47+
val hp = port match {
48+
case None => hostname
49+
case Some(value) => hostname + ":" + value // destructure the Option; a bare `value` would print "host:Some(80)"
50+
}
51+
hp
52+
}
4753

4854
private var _transportMetrics: TransportMetrics = new TransportMetrics
4955
def transportMetrics: TransportMetrics = _transportMetrics
@@ -61,7 +67,7 @@ class ExecutorMetrics extends Serializable {
6167
object ExecutorMetrics extends Serializable {
6268
def apply(
6369
hostName: String,
64-
port: Int,
70+
port: Option[Int],
6571
transportMetrics: TransportMetrics): ExecutorMetrics = {
6672
val execMetrics = new ExecutorMetrics
6773
execMetrics.setHostname(hostName)

core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,27 +40,32 @@ private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory"
4040
class MemoryListener extends SparkListener {
4141
type ExecutorId = String
4242
val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
43+
// TODO: There might be many removed executors (e.g. in dynamic allocation mode), which may use
44+
// too much memory.
4345
val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
44-
// latestExecIdToExecMetrics including all executors that is active and removed.
45-
// this may consume a lot of memory when executors are changing frequently, e.g. in dynamical
46-
// allocation mode.
46+
// A map that maintains the latest metrics of each active executor
4747
val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics]
4848
// activeStagesToMem maintains the memory information of every executor for each active stage;
4949
// the map type is [(stageId, attemptId), HashMap[executorId, MemoryUIInfo]]
5050
val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
51+
// TODO We need to get conf of the retained stages so that we don't need to handle all the
52+
// stages since there might be too many completed stages.
5153
val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
5254

5355
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { // records the reported metrics per executor and per active stage
5456
val executorId = event.execId
5557
val executorMetrics = event.executorMetrics
5658
val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo) // creates an entry if this executor is not tracked yet
57-
memoryInfo.updateExecutorMetrics(executorMetrics)
59+
memoryInfo.updateMemUiInfo(executorMetrics)
5860
activeStagesToMem.foreach { case (_, stageMemMetrics) =>
59-
if (stageMemMetrics.contains(executorId)) {
60-
stageMemMetrics.get(executorId).get.updateExecutorMetrics(executorMetrics)
61+
// If the executor was added while the stage was running, also record its metrics for that
62+
// stage in activeStagesToMem
63+
if (!stageMemMetrics.contains(executorId)) {
64+
stageMemMetrics(executorId) = new MemoryUIInfo
6165
}
66+
stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
6267
}
63-
latestExecIdToExecMetrics.update(executorId, executorMetrics)
68+
latestExecIdToExecMetrics(executorId) = executorMetrics // remember the most recent metrics reported by this executor
6469
}
6570

6671
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
@@ -71,28 +76,39 @@ class MemoryListener extends SparkListener {
7176
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { // moves the executor's entry from the active map to the removed map
7277
val executorId = event.executorId
7378
val info = activeExecutorIdToMem.remove(executorId)
79+
latestExecIdToExecMetrics.remove(executorId) // also drop the cached latest metrics for this executor
7480
removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo)) // keeps an existing removed entry; uses an empty MemoryUIInfo when the executor was not in the active map
7581
}
7682

7783
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { // registers the stage with one MemoryUIInfo per currently active executor
7884
val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
7985
val memInfoMap = new HashMap[ExecutorId, MemoryUIInfo]
80-
activeExecutorIdToMem.foreach(idToMem => memInfoMap.update(idToMem._1, new MemoryUIInfo))
81-
activeStagesToMem.update(stage, memInfoMap)
86+
activeExecutorIdToMem.foreach { case (id, _) => // foreach, not map: the body only has side effects, so no result collection is needed
87+
memInfoMap(id) = new MemoryUIInfo
88+
val latestExecMetrics = latestExecIdToExecMetrics.get(id)
89+
latestExecMetrics match {
90+
case None => // Do nothing
91+
case Some(metrics) =>
92+
memInfoMap(id).updateMemUiInfo(metrics) // seed the stage entry with the executor's latest known metrics
93+
}
94+
}
95+
activeStagesToMem(stage) = memInfoMap
8296
}
8397

8498
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { // moves the stage's per-executor memory info from the active map to the completed map
8599
val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
100+
// Refresh the stage's entry in activeStagesToMem from activeExecutorIdToMem, in case an
101+
// executor was added while the stage was running and no SparkListenerExecutorMetricsUpdate
102+
// event arrived for it during this stage.
86103
activeStagesToMem.get(stage).map { memInfoMap =>
87-
activeExecutorIdToMem.foreach { case (executorId, _) =>
88-
val memInfo = memInfoMap.getOrElse(executorId, new MemoryUIInfo)
89-
latestExecIdToExecMetrics.get(executorId).foreach { prevExecutorMetrics =>
90-
memInfo.updateExecutorMetrics(prevExecutorMetrics)
104+
activeExecutorIdToMem.foreach { case (executorId, memUiInfo) =>
105+
if (!memInfoMap.contains(executorId)) {
106+
memInfoMap(executorId) = new MemoryUIInfo
107+
memInfoMap(executorId).copyMemUiInfo(memUiInfo)
91108
}
92-
memInfoMap.update(executorId, memInfo)
93109
}
94-
completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get)
95110
}
111+
activeStagesToMem.remove(stage).foreach(completedStagesToMem.put(stage, _)) // no-op instead of NoSuchElementException when the stage was never registered as active
96112
}
97113
}
98114

@@ -106,13 +122,18 @@ class MemoryUIInfo {
106122
executorAddress = execInfo.executorHost
107123
}
108124

109-
def updateExecutorMetrics(execMetrics: ExecutorMetrics): Unit = {
125+
def updateMemUiInfo(execMetrics: ExecutorMetrics): Unit = { // refreshes the address and the transport memory sizes from execMetrics
110126
transportInfo = transportInfo match { // make sure a TransportMemSize exists before it is updated below
111127
case Some(transportMemSize) => transportInfo
112128
case _ => Some(new TransportMemSize)
113129
}
114130
executorAddress = execMetrics.hostPort
115-
transportInfo.get.updateTransport(execMetrics.transportMetrics)
131+
transportInfo.get.updateTransMemSize(execMetrics.transportMetrics) // .get cannot fail here: transportInfo was set to Some above
132+
}
133+
134+
def copyMemUiInfo(memUiInfo: MemoryUIInfo): Unit = { // copies the address and a snapshot of the transport sizes from memUiInfo
135+
executorAddress = memUiInfo.executorAddress // the sizes below go into a fresh TransportMemSize, so an empty destination still receives them and an empty source no longer hits Option.get
136+
transportInfo = memUiInfo.transportInfo.map { src => val dst = new TransportMemSize; dst.copyTransMemSize(src); dst }
116137
}
117138
}
118139

@@ -123,7 +144,7 @@ class TransportMemSize {
123144
var peakOnHeapSizeTime: MemTime = new MemTime()
124145
var peakOffHeapSizeTime: MemTime = new MemTime()
125146

126-
def updateTransport(transportMetrics: TransportMetrics): Unit = {
147+
def updateTransMemSize(transportMetrics: TransportMetrics): Unit = {
127148
val updatedOnHeapSize = transportMetrics.onHeapSize
128149
val updatedOffHeapSize = transportMetrics.offHeapSize
129150
val updateTime: Long = transportMetrics.timeStamp
@@ -136,6 +157,15 @@ class TransportMemSize {
136157
peakOffHeapSizeTime = MemTime(updatedOffHeapSize, updateTime)
137158
}
138159
}
160+
161+
def copyTransMemSize(transMemSize: TransportMemSize): Unit = { // overwrites this instance's sizes and peaks with those of transMemSize
162+
onHeapSize = transMemSize.onHeapSize
163+
offHeapSize = transMemSize.offHeapSize
164+
peakOnHeapSizeTime = MemTime(transMemSize.peakOnHeapSizeTime.memorySize, // builds a new MemTime rather than sharing the source's instance
165+
transMemSize.peakOnHeapSizeTime.timeStamp)
166+
peakOffHeapSizeTime = MemTime(transMemSize.peakOffHeapSizeTime.memorySize, // same for the off-heap peak
167+
transMemSize.peakOffHeapSizeTime.timeStamp)
168+
}
139169
}
140170

141171
@DeveloperApi

core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ private[ui] class MemTableBase(
3535
<th>Executor ID</th>
3636
<th>Address</th>
3737
<th>Network Memory (on-heap)</th>
38-
<th>Network Memory (direct-heap)</th>
38+
<th>Network Memory (off-heap)</th>
3939
<th>Peak Network Memory (on-heap) / Happen Time</th>
40-
<th>Peak Network Read (direct-heap) / Happen Time</th>
40+
<th>Peak Network Read (off-heap) / Happen Time</th>
4141
}
4242

4343
def toNodeSeq: Seq[Node] = {

core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private[ui] class StageMemoryPage(parent: MemoryTab) extends WebUIPage("stage")
3939

4040
val finishedStageToMem = memoryListener.completedStagesToMem
4141
val content = if (finishedStageToMem.get(stage).isDefined) {
42-
val executorIdToMem = finishedStageToMem.get(stage).get.toSeq.sortBy(_._1)
42+
val executorIdToMem = finishedStageToMem(stage).toSeq.sortBy(_._1)
4343
val execMemTable = new MemTableBase(executorIdToMem, memoryListener)
4444
<h4 id="activeExec">Executors ({executorIdToMem.size})</h4> ++
4545
execMemTable.toNodeSeq

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.collection.Map
2424

2525
import com.fasterxml.jackson.databind.ObjectMapper
2626
import com.fasterxml.jackson.module.scala.DefaultScalaModule
27-
import org.json4s.DefaultFormats
27+
import org.json4s.{JsonAST, DefaultFormats}
2828
import org.json4s.JsonDSL._
2929
import org.json4s.JsonAST._
3030
import org.json4s.jackson.JsonMethods._
@@ -296,7 +296,7 @@ private[spark] object JsonProtocol {
296296
def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
297297
val transportMetrics = transportMetricsToJson(executorMetrics.transportMetrics)
298298
("Executor Hostname" -> executorMetrics.hostname) ~
299-
("Executor Port" -> executorMetrics.port) ~
299+
("Executor Port" -> executorMetrics.port.map(new JInt(_)).getOrElse(JNothing)) ~
300300
("TransportMetrics" -> transportMetrics)
301301
}
302302

@@ -732,7 +732,7 @@ private[spark] object JsonProtocol {
732732
return metrics
733733
}
734734
metrics.setHostname((json \ "Executor Hostname").extract[String])
735-
metrics.setPort((json \ "Executor Port").extract[Int])
735+
metrics.setPort(Utils.jsonOption(json \ "Executor Port").map(_.extract[Int]))
736736
metrics.setTransportMetrics(transportMetricsFromJson(json \ "TransportMetrics"))
737737
metrics
738738
}

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
129129
val eventLogger = new EventLoggingListener("test-memListener", None, testDirPath.toUri(), conf)
130130
val execId = "exec-1"
131131
val hostName = "host-1"
132-
val port = 80
132+
val port: Option[Int] = Some(80)
133133

134134
eventLogger.start()
135135
eventLogger.onExecutorAdded(SparkListenerExecutorAdded(

core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class MemoryListenerSuite extends SparkFunSuite {
2828
val listener = new MemoryListener
2929
val execId1 = "exec-1"
3030
val host1 = "host-1"
31+
val port: Option[Int] = Some(80)
3132

3233
listener.onExecutorAdded(
3334
SparkListenerExecutorAdded(1L, execId1, new ExecutorInfo(host1, 1, Map.empty)))
@@ -38,13 +39,13 @@ class MemoryListenerSuite extends SparkFunSuite {
3839

3940
// multiple metrics updated in stage 2
4041
listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2))
41-
val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(host1, 80, 2L, 20, 10)
42+
val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(host1, port, 2L, 20, 10)
4243
listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
4344
execId1, execMetrics1))
44-
val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(host1, 80, 3L, 30, 5)
45+
val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(host1, port, 3L, 30, 5)
4546
listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
4647
execId1, execMetrics2))
47-
val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(host1, 80, 4L, 15, 15)
48+
val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(host1, port, 4L, 15, 15)
4849
listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
4950
execId1, execMetrics3))
5051
listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2))
@@ -87,7 +88,8 @@ class MemoryListenerSuite extends SparkFunSuite {
8788
val listener = new MemoryListener
8889
val (execId1, execId2, execId3) = ("exec-1", "exec-2", "exec-3")
8990
val (host1, host2, host3) = ("host-1", "host-2", "host-3")
90-
val (port1, port2, port3) = (80, 80, 80)
91+
val (port1, port2, port3): (Option[Int], Option[Int], Option[Int]) =
92+
(Some(80), Some(80), Some(80))
9193

9294
// two executors added first
9395
listener.onExecutorAdded(
@@ -135,7 +137,6 @@ class MemoryListenerSuite extends SparkFunSuite {
135137

136138
assert(listener.activeStagesToMem.isEmpty)
137139
assert(listener.completedStagesToMem.size === 4)
138-
assert(listener.activeExecutorIdToMem.size === listener.latestExecIdToExecMetrics.size)
139140
assert(listener.removedExecutorIdToMem.size === 1)
140141

141142
listener.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId1, ""))
@@ -180,7 +181,7 @@ object MemoryListenerSuite extends SparkFunSuite {
180181

181182
def createExecutorMetrics(
182183
hostname: String,
183-
port: Int,
184+
port: Option[Int],
184185
timeStamp: Long,
185186
onHeapSize: Long,
186187
offHeapSize: Long): ExecutorMetrics = {

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class JsonProtocolSuite extends SparkFunSuite {
8686
val executorMetrics = {
8787
val execMetrics = new ExecutorMetrics
8888
execMetrics.setHostname("host-1")
89-
execMetrics.setPort(80)
89+
execMetrics.setPort(Some(80))
9090
execMetrics.setTransportMetrics(TransportMetrics(0L, 10, 10))
9191
execMetrics
9292
}
@@ -398,7 +398,7 @@ class JsonProtocolSuite extends SparkFunSuite {
398398
val oldJson = newJson.removeField { case (field, _) => field == "Executor Metrics Updated"}
399399
val newMetrics = JsonProtocol.executorMetricsUpdateFromJson(oldJson)
400400
assert(newMetrics.executorMetrics.hostname === "")
401-
assert(newMetrics.executorMetrics.port === 0)
401+
assert(newMetrics.executorMetrics.port === None)
402402
assert(newMetrics.executorMetrics.transportMetrics.onHeapSize === 0L)
403403
assert(newMetrics.executorMetrics.transportMetrics.offHeapSize === 0L)
404404
assert(newMetrics.executorMetrics.transportMetrics.timeStamp != 0L)

0 commit comments

Comments
 (0)