@@ -89,8 +89,8 @@ private[spark] class EventLoggingListener(
8989 private [scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
9090
9191 private val executorIdToLatestMetrics = new HashMap [String , SparkListenerExecutorMetricsUpdate ]
92- private val executorIdToModifiedMaxMetrics = new
93- HashMap [String , SparkListenerExecutorMetricsUpdate ]
92+ private val executorIdToModifiedMaxMetrics =
93+ new HashMap [String , SparkListenerExecutorMetricsUpdate ]
9494
9595 /**
9696 * Creates the log file in the configured log directory.
@@ -242,7 +242,7 @@ private[spark] class EventLoggingListener(
242242 // We only track the executor metrics in each stage, so we drop the task metrics as they are
243243 // quite verbose
244244 val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate (
245- event.execId, event.executorMetrics, Seq .empty )
245+ event.execId, Seq .empty, event.executorMetrics )
246246 executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics
247247 updateModifiedMetrics(eventWithoutTaskMetrics)
248248 }
@@ -295,6 +295,9 @@ private[spark] class EventLoggingListener(
295295 * Does this event match the ID of an executor we are already tracking?
296296 * If no, start tracking metrics for this executor, starting at this event.
297297 * If yes, compare time stamps, and perhaps update using this event.
298+ * Only do this if executorMetrics is present in the toBeModifiedEvent.
299+ * If it is not - meaning we are processing historical data created
300+ * without executorMetrics - simply cache the latestEvent
298301 * @param latestEvent the latest event received, used to update our map of stored metrics.
299302 */
300303 private def updateModifiedMetrics (latestEvent : SparkListenerExecutorMetricsUpdate ): Unit = {
@@ -304,30 +307,36 @@ private[spark] class EventLoggingListener(
304307 case None =>
305308 executorIdToModifiedMaxMetrics(executorId) = latestEvent
306309 case Some (toBeModifiedEvent) =>
307- val toBeModifiedTransportMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
308- val latestTransportMetrics = latestEvent.executorMetrics.transportMetrics
309- var timeStamp : Long = toBeModifiedTransportMetrics.timeStamp
310-
311- val onHeapSize = if
312- (latestTransportMetrics.onHeapSize > toBeModifiedTransportMetrics.onHeapSize) {
313- timeStamp = latestTransportMetrics.timeStamp
314- latestTransportMetrics.onHeapSize
315- } else {
316- toBeModifiedTransportMetrics.onHeapSize
310+ if (toBeModifiedEvent.executorMetrics.isEmpty ||
311+ latestEvent.executorMetrics.isEmpty) {
312+ executorIdToModifiedMaxMetrics(executorId) == latestEvent
317313 }
318- val offHeapSize =
319- if (latestTransportMetrics.offHeapSize > toBeModifiedTransportMetrics.offHeapSize) {
320- timeStamp = latestTransportMetrics.timeStamp
321- latestTransportMetrics.offHeapSize
322- } else {
323- toBeModifiedTransportMetrics.offHeapSize
314+ else {
315+ val prevTransportMetrics = toBeModifiedEvent.executorMetrics.get.transportMetrics
316+ val latestTransportMetrics = latestEvent.executorMetrics.get.transportMetrics
317+ var timeStamp : Long = prevTransportMetrics.timeStamp
318+
319+ val onHeapSize = if
320+ (latestTransportMetrics.onHeapSize > prevTransportMetrics.onHeapSize) {
321+ timeStamp = latestTransportMetrics.timeStamp
322+ latestTransportMetrics.onHeapSize
323+ } else {
324+ prevTransportMetrics.onHeapSize
325+ }
326+ val offHeapSize =
327+ if (latestTransportMetrics.offHeapSize > prevTransportMetrics.offHeapSize) {
328+ timeStamp = latestTransportMetrics.timeStamp
329+ latestTransportMetrics.offHeapSize
330+ } else {
331+ prevTransportMetrics.offHeapSize
332+ }
333+ val updatedExecMetrics = ExecutorMetrics (toBeModifiedEvent.executorMetrics.get.hostname,
334+ toBeModifiedEvent.executorMetrics.get.port,
335+ TransportMetrics (timeStamp, onHeapSize, offHeapSize))
336+ val modifiedEvent = SparkListenerExecutorMetricsUpdate (
337+ toBeModifiedEvent.execId, toBeModifiedEvent.accumUpdates, Some (updatedExecMetrics))
338+ executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
324339 }
325- val modifiedExecMetrics = ExecutorMetrics (toBeModifiedEvent.executorMetrics.hostname,
326- toBeModifiedEvent.executorMetrics.port,
327- TransportMetrics (timeStamp, onHeapSize, offHeapSize))
328- val modifiedEvent = SparkListenerExecutorMetricsUpdate (
329- toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.accumUpdates)
330- executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
331340 }
332341 }
333342}
0 commit comments