Skip to content

Commit 6e2c59f

Browse files
author
José Hiram Soltren
committed
Update ExecutorsTab and BlacklistTrackerSuite for squito review feedback
1 parent d3d3583 commit 6e2c59f

File tree

2 files changed

+61
-93
lines changed

2 files changed

+61
-93
lines changed

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
7474

7575
def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
7676

77-
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
77+
override def onExecutorAdded(
78+
executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
7879
val eid = executorAdded.executorId
7980
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
8081
taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
@@ -101,7 +102,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
101102
executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
102103
}
103104

104-
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
105+
override def onApplicationStart(
106+
applicationStart: SparkListenerApplicationStart): Unit = {
105107
applicationStart.driverLogs.foreach { logs =>
106108
val storageStatus = activeStorageStatusList.find { s =>
107109
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
@@ -115,13 +117,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
115117
}
116118
}
117119

118-
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
120+
override def onTaskStart(
121+
taskStart: SparkListenerTaskStart): Unit = synchronized {
119122
val eid = taskStart.taskInfo.executorId
120123
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
121124
taskSummary.tasksActive += 1
122125
}
123126

124-
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
127+
override def onTaskEnd(
128+
taskEnd: SparkListenerTaskEnd): Unit = synchronized {
125129
val info = taskEnd.taskInfo
126130
if (info != null) {
127131
val eid = info.executorId
@@ -158,22 +162,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
158162
}
159163
}
160164

161-
private def updateExecutorBlacklist(eid: String, isBlacklisted: Boolean): Unit = {
165+
private def updateExecutorBlacklist(
166+
eid: String, isBlacklisted: Boolean): Unit = {
162167
val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
163168
execTaskSummary.isBlacklisted = isBlacklisted
164169
}
165170

166-
override def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted)
171+
override def onExecutorBlacklisted(
172+
executorBlacklisted: SparkListenerExecutorBlacklisted)
167173
: Unit = synchronized {
168174
updateExecutorBlacklist(executorBlacklisted.executorId, true)
169175
}
170176

171-
override def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted)
177+
override def onExecutorUnblacklisted(
178+
executorUnblacklisted: SparkListenerExecutorUnblacklisted)
172179
: Unit = synchronized {
173180
updateExecutorBlacklist(executorUnblacklisted.executorId, false)
174181
}
175182

176-
override def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted)
183+
override def onNodeBlacklisted(
184+
nodeBlacklisted: SparkListenerNodeBlacklisted)
177185
: Unit = synchronized {
178186
/*
179187
Implicitly blacklist every executor associated with this node, and show this in the UI.
@@ -185,7 +193,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
185193
}
186194
}
187195

188-
override def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted)
196+
override def onNodeUnblacklisted(
197+
nodeUnblacklisted: SparkListenerNodeUnblacklisted)
189198
: Unit = synchronized {
190199
// Implicitly unblacklist every executor associated with this node, regardless of how
191200
// they may have been blacklisted initially (either explicitly through executor blacklisting

0 commit comments

Comments
 (0)