@@ -74,7 +74,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
7474
7575 def deadStorageStatusList : Seq [StorageStatus ] = storageStatusListener.deadStorageStatusList
7676
77- override def onExecutorAdded (executorAdded : SparkListenerExecutorAdded ): Unit = synchronized {
77+ override def onExecutorAdded (
78+ executorAdded : SparkListenerExecutorAdded ): Unit = synchronized {
7879 val eid = executorAdded.executorId
7980 val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary (eid))
8081 taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
@@ -101,7 +102,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
101102 executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false )
102103 }
103104
104- override def onApplicationStart (applicationStart : SparkListenerApplicationStart ): Unit = {
105+ override def onApplicationStart (
106+ applicationStart : SparkListenerApplicationStart ): Unit = {
105107 applicationStart.driverLogs.foreach { logs =>
106108 val storageStatus = activeStorageStatusList.find { s =>
107109 s.blockManagerId.executorId == SparkContext .LEGACY_DRIVER_IDENTIFIER ||
@@ -115,13 +117,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
115117 }
116118 }
117119
118- override def onTaskStart (taskStart : SparkListenerTaskStart ): Unit = synchronized {
120+ override def onTaskStart (
121+ taskStart : SparkListenerTaskStart ): Unit = synchronized {
119122 val eid = taskStart.taskInfo.executorId
120123 val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary (eid))
121124 taskSummary.tasksActive += 1
122125 }
123126
124- override def onTaskEnd (taskEnd : SparkListenerTaskEnd ): Unit = synchronized {
127+ override def onTaskEnd (
128+ taskEnd : SparkListenerTaskEnd ): Unit = synchronized {
125129 val info = taskEnd.taskInfo
126130 if (info != null ) {
127131 val eid = info.executorId
@@ -158,22 +162,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
158162 }
159163 }
160164
161- private def updateExecutorBlacklist (eid : String , isBlacklisted : Boolean ): Unit = {
165+ private def updateExecutorBlacklist (
166+ eid : String , isBlacklisted : Boolean ): Unit = {
162167 val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary (eid))
163168 execTaskSummary.isBlacklisted = isBlacklisted
164169 }
165170
166- override def onExecutorBlacklisted (executorBlacklisted : SparkListenerExecutorBlacklisted )
171+ override def onExecutorBlacklisted (
172+ executorBlacklisted : SparkListenerExecutorBlacklisted )
167173 : Unit = synchronized {
168174 updateExecutorBlacklist(executorBlacklisted.executorId, true )
169175 }
170176
171- override def onExecutorUnblacklisted (executorUnblacklisted : SparkListenerExecutorUnblacklisted )
177+ override def onExecutorUnblacklisted (
178+ executorUnblacklisted : SparkListenerExecutorUnblacklisted )
172179 : Unit = synchronized {
173180 updateExecutorBlacklist(executorUnblacklisted.executorId, false )
174181 }
175182
176- override def onNodeBlacklisted (nodeBlacklisted : SparkListenerNodeBlacklisted )
183+ override def onNodeBlacklisted (
184+ nodeBlacklisted : SparkListenerNodeBlacklisted )
177185 : Unit = synchronized {
178186 /*
179187 Implicitly blacklist every executor associated with this node, and show this in the UI.
@@ -185,7 +193,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
185193 }
186194 }
187195
188- override def onNodeUnblacklisted (nodeUnblacklisted : SparkListenerNodeUnblacklisted )
196+ override def onNodeUnblacklisted (
197+ nodeUnblacklisted : SparkListenerNodeUnblacklisted )
189198 : Unit = synchronized {
190199 // Implicitly unblacklist every executor associated with this node, regardless of how
191200 // they may have been blacklisted initially (either explicitly through executor blacklisting
0 commit comments