
Commit a08836c

Author: Stavros Kontopoulos

add application status source

1 parent e7853dc · commit a08836c

File tree: 5 files changed, +126 −9 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 3 deletions

@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusSource, AppStatusStore}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -418,7 +418,8 @@ class SparkContext(config: SparkConf) extends Logging {

     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf)
+    val appStatusSource = AppStatusSource.createSource(conf)
+    _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)
     listenerBus.addToStatusQueue(_statusStore.listener.get)

     // Create the Spark execution environment (cache, map output tracker, etc)
@@ -569,7 +570,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
-
+    appStatusSource.foreach(_env.metricsSystem.registerSource(_))
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
     // is killed, though.
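
Taken together, these SparkContext changes make the new source strictly opt-in: createSource returns None unless the flag is set, and only a Some ever reaches the metrics system. A minimal sketch of enabling it from user code (the master, app name, and comments are illustrative assumptions, not part of the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // With the flag at its default of false, createSource returns None and
    // nothing is registered; setting it to true activates the "appStatus" source.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("app-status-metrics-demo")
      .set("spark.app.status.metrics.enabled", "true")

    val sc = new SparkContext(conf)
    // The appStatus counters and the jobDuration gauge are now reported
    // through whatever sinks the metrics system is configured with.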

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 32 additions & 2 deletions

@@ -17,6 +17,7 @@

 package org.apache.spark.status

+import java.time.Duration
 import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function
@@ -44,6 +45,7 @@ private[spark] class AppStatusListener(
     kvstore: ElementTrackingStore,
     conf: SparkConf,
     live: Boolean,
+    appStatusSource: Option[AppStatusSource] = None,
     lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {

   import config._
@@ -280,6 +282,11 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
     liveExecutors.get(execId).foreach { exec =>
       exec.isBlacklisted = blacklisted
+      if (blacklisted) {
+        appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc())
+      } else {
+        appStatusSource.foreach(_.UNBLACKLISTED_EXECUTORS.inc())
+      }
       liveUpdate(exec, System.nanoTime())
     }
   }
@@ -382,11 +389,34 @@ private[spark] class AppStatusListener(
     }

     job.status = event.jobResult match {
-      case JobSucceeded => JobExecutionStatus.SUCCEEDED
-      case JobFailed(_) => JobExecutionStatus.FAILED
+      case JobSucceeded =>
+        appStatusSource.foreach { _.SUCCEEDED_JOBS.inc() }
+        JobExecutionStatus.SUCCEEDED
+      case JobFailed(_) =>
+        appStatusSource.foreach { _.FAILED_JOBS.inc() }
+        JobExecutionStatus.FAILED
     }

     job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+    for {
+      source <- appStatusSource
+      submissionTime <- job.submissionTime
+      completionTime <- job.completionTime
+    } {
+      source.JOB_DURATION.value.set(completionTime.getTime() - submissionTime.getTime())
+    }
+
+    // update global app status counters
+    appStatusSource.foreach { source =>
+      source.COMPLETED_STAGES.inc(job.completedStages.size)
+      source.FAILED_STAGES.inc(job.failedStages)
+      source.COMPLETED_TASKS.inc(job.completedTasks)
+      source.FAILED_TASKS.inc(job.failedTasks)
+      source.KILLED_TASKS.inc(job.killedTasks)
+      source.SKIPPED_TASKS.inc(job.skippedTasks)
+      source.SKIPPED_STAGES.inc(job.skippedStages.size)
+    }
     update(job, now, last = true)
   }

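Every metrics update in the listener is guarded through Option, so a disabled source costs nothing. The job-duration update additionally requires both timestamps, which the for-comprehension over Options handles without explicit null or isDefined checks. A self-contained sketch of that pattern (the names here are illustrative stand-ins, not from the patch):

    import java.util.Date

    // Stand-ins for the Option-wrapped values used in onJobEnd above.
    val source: Option[String] = Some("appStatus")
    val submissionTime: Option[Date] = Some(new Date(1000L))
    val completionTime: Option[Date] = Some(new Date(4500L))

    // The body runs only if every generator yields a value; any None
    // short-circuits the whole block.
    for {
      s <- source
      start <- submissionTime
      end <- completionTime
    } {
      println(s"$s duration: ${end.getTime - start.getTime} ms") // prints 3500 ms
    }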

core/src/main/scala/org/apache/spark/status/AppStatusSource.scala

Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private[spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+    .register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+    metricRegistry.counter(MetricRegistry.name(prefix, name))
+  }
+
+  def createSource(conf: SparkConf): Option[AppStatusSource] = {
+    Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+      .filter(identity)
+      .map { _ => new AppStatusSource() }
+  }
+
+  val APP_STATUS_METRICS_ENABLED =
+    ConfigBuilder("spark.app.status.metrics.enabled")
+      .doc("Whether Dropwizard/Codahale metrics " +
+        "will be reported for the status of the running spark app.")
+      .booleanConf
+      .createWithDefault(false)
+}
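
The new source leans on two Dropwizard primitives: Counters, which the listener increments as events arrive, and a Gauge, which is polled on demand — the reason JobDuration wraps an AtomicLong the listener can set from another thread. A standalone sketch of that split (the object name is an illustrative assumption; it needs only the metrics-core dependency):

    import java.util.concurrent.atomic.AtomicLong
    import com.codahale.metrics.{Gauge, MetricRegistry}

    object MetricsSketch extends App {
      val registry = new MetricRegistry()

      // Counters are pushed by the producer as things happen...
      val failedJobs = registry.counter(MetricRegistry.name("jobs", "failedJobs"))
      failedJobs.inc()
      println(failedJobs.getCount) // 1

      // ...while a Gauge wraps a value that is read when the sink polls it.
      val duration = new AtomicLong(0L)
      registry.register(MetricRegistry.name("jobDuration"), new Gauge[Long] {
        override def getValue: Long = duration.get()
      })
      duration.set(42L)
    }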

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 4 additions & 3 deletions

@@ -503,10 +503,11 @@ private[spark] object AppStatusStore {
   /**
   * Create an in-memory store for a live application.
   */
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+      conf: SparkConf,
+      appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
-    val listener = new AppStatusListener(store, conf, true)
+    val listener = new AppStatusListener(store, conf, true, appStatusSource)
     new AppStatusStore(store, listener = Some(listener))
   }
-
 }
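
Because the new parameter defaults to None, existing callers of createLiveStore keep compiling unchanged. A sketch of both call shapes (these are private[spark] APIs, so this only compiles inside Spark's own packages):

    import org.apache.spark.SparkConf
    import org.apache.spark.status.{AppStatusSource, AppStatusStore}

    val conf = new SparkConf()
    // Old call shape, still valid: the listener simply gets no source.
    val plain = AppStatusStore.createLiveStore(conf)
    // New call shape, as used by SparkContext above.
    val metered = AppStatusStore.createLiveStore(conf, AppStatusSource.createSource(conf))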

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 1 addition & 1 deletion

@@ -62,7 +62,7 @@ private[spark] abstract class LiveEntity {
 private class LiveJob(
     val jobId: Int,
     name: String,
-    submissionTime: Option[Date],
+    val submissionTime: Option[Date],
     val stageIds: Seq[Int],
     jobGroup: Option[String],
     numTasks: Int) extends LiveEntity {
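
Promoting submissionTime to a val turns the constructor parameter into a readable field on LiveJob, which is what lets the onJobEnd handler above pull job.submissionTime into the for-comprehension that sets the jobDuration gauge. A micro-sketch of the Scala rule (illustrative class, not from the patch):

    // A plain constructor parameter is not accessible from outside the class;
    // prefixing it with `val` generates a public getter.
    class Job(id: Int, val submissionTime: Long)

    val j = new Job(1, 1000L)
    j.submissionTime // compiles: `val` made it a field
    // j.id          // would not compile: plain parameter, no getter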
