@@ -40,27 +40,32 @@ private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory"
4040class MemoryListener extends SparkListener {
4141 type ExecutorId = String
4242 val activeExecutorIdToMem = new HashMap [ExecutorId , MemoryUIInfo ]
43+ // TODO There might be plenty of removed executors (e.g. Dynamic Allocation Mode), This may use
44+ // too much memory.
4345 val removedExecutorIdToMem = new HashMap [ExecutorId , MemoryUIInfo ]
44- // latestExecIdToExecMetrics including all executors that is active and removed.
45- // this may consume a lot of memory when executors are changing frequently, e.g. in dynamical
46- // allocation mode.
46+ // A map that maintains the latest metrics of each active executor
4747 val latestExecIdToExecMetrics = new HashMap [ExecutorId , ExecutorMetrics ]
4848 // activeStagesToMem a map maintains all executors memory information of each stage,
4949 // the Map type is [(stageId, attemptId), Seq[(executorId, MemoryUIInfo)]
5050 val activeStagesToMem = new HashMap [(Int , Int ), HashMap [ExecutorId , MemoryUIInfo ]]
51+ // TODO We need to get conf of the retained stages so that we don't need to handle all the
52+ // stages since there might be too many completed stages.
5153 val completedStagesToMem = new HashMap [(Int , Int ), HashMap [ExecutorId , MemoryUIInfo ]]
5254
5355 override def onExecutorMetricsUpdate (event : SparkListenerExecutorMetricsUpdate ): Unit = {
5456 val executorId = event.execId
5557 val executorMetrics = event.executorMetrics
5658 val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo )
57- memoryInfo.updateExecutorMetrics (executorMetrics)
59+ memoryInfo.updateMemUiInfo (executorMetrics)
5860 activeStagesToMem.foreach { case (_, stageMemMetrics) =>
59- if (stageMemMetrics.contains(executorId)) {
60- stageMemMetrics.get(executorId).get.updateExecutorMetrics(executorMetrics)
61+ // If executor is added in the stage running time, we also update the metrics for the
62+ // executor in {{activeStagesToMem}}
63+ if (! stageMemMetrics.contains(executorId)) {
64+ stageMemMetrics(executorId) = new MemoryUIInfo
6165 }
66+ stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
6267 }
63- latestExecIdToExecMetrics.update (executorId, executorMetrics)
68+ latestExecIdToExecMetrics(executorId) = executorMetrics
6469 }
6570
6671 override def onExecutorAdded (event : SparkListenerExecutorAdded ): Unit = {
@@ -71,28 +76,39 @@ class MemoryListener extends SparkListener {
7176 override def onExecutorRemoved (event : SparkListenerExecutorRemoved ): Unit = {
7277 val executorId = event.executorId
7378 val info = activeExecutorIdToMem.remove(executorId)
79+ latestExecIdToExecMetrics.remove(executorId)
7480 removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo ))
7581 }
7682
7783 override def onStageSubmitted (event : SparkListenerStageSubmitted ): Unit = {
7884 val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
7985 val memInfoMap = new HashMap [ExecutorId , MemoryUIInfo ]
80- activeExecutorIdToMem.foreach(idToMem => memInfoMap.update(idToMem._1, new MemoryUIInfo ))
81- activeStagesToMem.update(stage, memInfoMap)
86+ activeExecutorIdToMem.map { case (id, _) =>
87+ memInfoMap(id) = new MemoryUIInfo
88+ val latestExecMetrics = latestExecIdToExecMetrics.get(id)
89+ latestExecMetrics match {
90+ case None => // Do nothing
91+ case Some (metrics) =>
92+ memInfoMap(id).updateMemUiInfo(metrics)
93+ }
94+ }
95+ activeStagesToMem(stage) = memInfoMap
8296 }
8397
8498 override def onStageCompleted (event : SparkListenerStageCompleted ): Unit = {
8599 val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
100+ // We need to refresh {{activeStagesToMem}} with {{activeExecutorIdToMem}} in case the
101+ // executor is added in the stage running time and no {{SparkListenerExecutorMetricsUpdate}}
102+ // event is updated in this stage.
86103 activeStagesToMem.get(stage).map { memInfoMap =>
87- activeExecutorIdToMem.foreach { case (executorId, _ ) =>
88- val memInfo = memInfoMap.getOrElse (executorId, new MemoryUIInfo )
89- latestExecIdToExecMetrics.get (executorId).foreach { prevExecutorMetrics =>
90- memInfo.updateExecutorMetrics(prevExecutorMetrics )
104+ activeExecutorIdToMem.foreach { case (executorId, memUiInfo ) =>
105+ if ( ! memInfoMap.contains (executorId)) {
106+ memInfoMap (executorId) = new MemoryUIInfo
107+ memInfoMap(executorId).copyMemUiInfo(memUiInfo )
91108 }
92- memInfoMap.update(executorId, memInfo)
93109 }
94- completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get)
95110 }
111+ completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get)
96112 }
97113}
98114
@@ -106,13 +122,18 @@ class MemoryUIInfo {
106122 executorAddress = execInfo.executorHost
107123 }
108124
109- def updateExecutorMetrics (execMetrics : ExecutorMetrics ): Unit = {
125+ def updateMemUiInfo (execMetrics : ExecutorMetrics ): Unit = {
110126 transportInfo = transportInfo match {
111127 case Some (transportMemSize) => transportInfo
112128 case _ => Some (new TransportMemSize )
113129 }
114130 executorAddress = execMetrics.hostPort
115- transportInfo.get.updateTransport(execMetrics.transportMetrics)
131+ transportInfo.get.updateTransMemSize(execMetrics.transportMetrics)
132+ }
133+
134+ def copyMemUiInfo (memUiInfo : MemoryUIInfo ): Unit = {
135+ executorAddress = memUiInfo.executorAddress
136+ transportInfo.foreach(_.copyTransMemSize(memUiInfo.transportInfo.get))
116137 }
117138}
118139
@@ -123,7 +144,7 @@ class TransportMemSize {
123144 var peakOnHeapSizeTime : MemTime = new MemTime ()
124145 var peakOffHeapSizeTime : MemTime = new MemTime ()
125146
126- def updateTransport (transportMetrics : TransportMetrics ): Unit = {
147+ def updateTransMemSize (transportMetrics : TransportMetrics ): Unit = {
127148 val updatedOnHeapSize = transportMetrics.onHeapSize
128149 val updatedOffHeapSize = transportMetrics.offHeapSize
129150 val updateTime : Long = transportMetrics.timeStamp
@@ -136,6 +157,15 @@ class TransportMemSize {
136157 peakOffHeapSizeTime = MemTime (updatedOffHeapSize, updateTime)
137158 }
138159 }
160+
161+ def copyTransMemSize (transMemSize : TransportMemSize ): Unit = {
162+ onHeapSize = transMemSize.onHeapSize
163+ offHeapSize = transMemSize.offHeapSize
164+ peakOnHeapSizeTime = MemTime (transMemSize.peakOnHeapSizeTime.memorySize,
165+ transMemSize.peakOnHeapSizeTime.timeStamp)
166+ peakOffHeapSizeTime = MemTime (transMemSize.peakOffHeapSizeTime.memorySize,
167+ transMemSize.peakOffHeapSizeTime.timeStamp)
168+ }
139169}
140170
141171@ DeveloperApi
0 commit comments