Skip to content

Commit 8d97923

Browse files
Sundeep NarravulaSundeep Narravula
authored and committed
Ability to kill jobs thru the UI.
This behavior can be turned on by setting the following variable: spark.ui.killEnabled=true (default=false). Adding DAGScheduler event StageCancelled and corresponding handlers. Added cancellation reason to handlers. Author: Sundeep Narravula <[email protected]>
1 parent b9e0c93 commit 8d97923

File tree

9 files changed

+74
-13
lines changed

9 files changed

+74
-13
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,6 +1076,16 @@ class SparkContext(
10761076
dagScheduler.cancelAllJobs()
10771077
}
10781078

1079+
/**
 * Cancel a given job if it's scheduled or running.
 *
 * Thin wrapper that forwards the request to `dagScheduler.cancelJob`; no result is
 * returned to the caller.
 *
 * @param jobId id of the job to cancel
 */
1080+
def cancelJob(jobId: Int) {
1081+
dagScheduler.cancelJob(jobId)
1082+
}
1083+
1084+
/**
 * Cancel a given stage and all jobs associated with it.
 *
 * Thin wrapper that forwards the request to `dagScheduler.cancelStage`; no result is
 * returned to the caller.
 *
 * @param stageId id of the stage to cancel
 */
1085+
def cancelStage(stageId: Int) {
1086+
dagScheduler.cancelStage(stageId)
1087+
}
1088+
10791089
/**
10801090
* Clean a closure to make it ready to serialized and send to tasks
10811091
* (removes unreferenced variables in $outer's, updates REPL variables)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ class DAGScheduler(
494494
/**
495495
* Cancel a job that is running or waiting in the queue.
496496
*/
497-
def cancelJob(jobId: Int) {
497+
private[spark] def cancelJob(jobId: Int) {
498498
logInfo("Asked to cancel job " + jobId)
499499
eventProcessActor ! JobCancelled(jobId)
500500
}
@@ -511,6 +511,13 @@ class DAGScheduler(
511511
eventProcessActor ! AllJobsCancelled
512512
}
513513

514+
/**
515+
* Cancel all jobs associated with a running or scheduled stage.
*
* This method only sends a `StageCancelled(stageId)` message to `eventProcessActor`;
* it does not cancel anything itself and returns no result.
* NOTE(review): unlike `cancelJob`, nothing is logged here before the message is sent.
*
* @param stageId id of the stage whose jobs should be cancelled
516+
*/
517+
def cancelStage(stageId: Int) {
518+
eventProcessActor ! StageCancelled(stageId)
519+
}
520+
514521
/**
515522
* Process one event retrieved from the event processing actor.
516523
*
@@ -551,6 +558,9 @@ class DAGScheduler(
551558
submitStage(finalStage)
552559
}
553560

561+
case StageCancelled(stageId) =>
562+
handleStageCancellation(stageId)
563+
554564
case JobCancelled(jobId) =>
555565
handleJobCancellation(jobId)
556566

@@ -560,11 +570,11 @@ class DAGScheduler(
560570
val activeInGroup = activeJobs.filter(activeJob =>
561571
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
562572
val jobIds = activeInGroup.map(_.jobId)
563-
jobIds.foreach(handleJobCancellation)
573+
jobIds.foreach(jobId => handleJobCancellation(jobId, "as part of cancelled job group %s".format(groupId)))
564574

565575
case AllJobsCancelled =>
566576
// Cancel all running jobs.
567-
runningStages.map(_.jobId).foreach(handleJobCancellation)
577+
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId, "as part of cancellation of all jobs"))
568578
activeJobs.clear() // These should already be empty by this point,
569579
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
570580

@@ -991,11 +1001,23 @@ class DAGScheduler(
9911001
}
9921002
}
9931003

994-
private def handleJobCancellation(jobId: Int) {
1004+
/**
 * Cancel every job registered against the given stage.
 *
 * If `stageIdToJobIds` has an entry for `stageId`, each associated job id is passed to
 * `handleJobCancellation` in ascending order, with a reason that names the stage.
 * Otherwise an info message is logged and nothing is cancelled.
 *
 * @param stageId id of the stage whose jobs should be cancelled
 */
private def handleStageCancellation(stageId: Int) {
1005+
if (stageIdToJobIds.contains(stageId)) {
1006+
// Copy the job ids into a sorted array before iterating.
// NOTE(review): presumably the copy protects against stageIdToJobIds changing while
// the jobs are being cancelled — confirm.
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray.sorted
1007+
jobsThatUseStage.foreach(jobId => {
1008+
handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
1009+
})
1010+
} else {
1011+
logInfo("No active jobs to kill for Stage " + stageId)
1012+
}
1013+
}
1014+
1015+
/**
 * Fail the job `jobId` with a cancellation message.
 *
 * A job id that is missing from `jobIdToStageIds` is only reported at debug level.
 * Otherwise the job looked up in `jobIdToActiveJob` is handed to
 * `failJobAndIndependentStages` together with the cancellation message.
 *
 * @param jobId  id of the job to cancel
 * @param reason text appended to the failure message; defaults to the empty string
 */
private def handleJobCancellation(jobId: Int, reason: String = "") {
9951016
if (!jobIdToStageIds.contains(jobId)) {
9961017
logDebug("Trying to cancel unregistered job " + jobId)
9971018
} else {
998-
failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
1019+
// NOTE(review): the guard above checks jobIdToStageIds but this lookup uses
// jobIdToActiveJob — assumes the two maps are kept in sync; confirm.
// NOTE(review): with the default empty `reason` the formatted message ends in a
// trailing space ("Job <id> cancelled ").
failJobAndIndependentStages(jobIdToActiveJob(jobId),
1020+
"Job %d cancelled %s".format(jobId, reason), None)
9991021
}
10001022
}
10011023

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
4444
properties: Properties = null)
4545
extends DAGSchedulerEvent
4646

47+
/** Scheduler event carrying the id of a stage whose jobs should be cancelled. */
private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
48+
4749
private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
4850

4951
private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
4646
val live = sc != null
4747

4848
val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
49+
val killEnabled = conf.get("spark.ui.killEnabled", "false").toBoolean
4950

5051
private val bindHost = Utils.localHostName()
5152
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)

core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ private[ui] class IndexPage(parent: JobProgressUI) {
4141
val failedStages = listener.failedStages.reverse.toSeq
4242
val now = System.currentTimeMillis()
4343

44-
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
44+
val activeStagesTable =
45+
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
4546
val completedStagesTable =
4647
new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
4748
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
6161
val stageIdToPool = HashMap[Int, String]()
6262
val stageIdToDescription = HashMap[Int, String]()
6363
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
64+
val jobIdToStageIds = HashMap[Int, Seq[Int]]()
6465

6566
val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
6667

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
3333
val basePath = parent.basePath
3434
val live = parent.live
3535
val sc = parent.sc
36+
val killEnabled = parent.killEnabled
3637

3738
lazy val listener = _listener.get
3839
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
@@ -56,5 +57,5 @@ private[ui] class JobProgressUI(parent: SparkUI) {
5657
(request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
5758
createServletHandler("/stages",
5859
(request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
59-
)
60+
)
6061
}

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,21 @@ private[ui] class StagePage(parent: JobProgressUI) {
3131
private val appName = parent.appName
3232
private val basePath = parent.basePath
3333
private lazy val listener = parent.listener
34+
private lazy val sc = parent.sc
35+
private val killEnabled = parent.killEnabled
3436

3537
def render(request: HttpServletRequest): Seq[Node] = {
3638
listener.synchronized {
3739
val stageId = request.getParameter("id").toInt
3840

41+
if (killEnabled) {
42+
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
43+
44+
if (killFlag && listener.activeStages.contains(stageId)) {
45+
sc.cancelStage(stageId)
46+
}
47+
}
48+
3949
if (!listener.stageIdToTaskData.contains(stageId)) {
4050
val content =
4151
<div>

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUI, UIUtils}
2727
import org.apache.spark.util.Utils
2828

2929
/** Page showing list of all ongoing and recently finished stages */
30-
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
30+
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI, killEnabled: Boolean = false) {
3131
private val basePath = parent.basePath
3232
private lazy val listener = parent.listener
3333
private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +71,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
7171
</div>
7272
}
7373

74-
/** Render an HTML row that represents a stage */
75-
private def stageRow(s: StageInfo): Seq[Node] = {
76-
val poolName = listener.stageIdToPool.get(s.stageId)
74+
/**
 * Build the description cell content for a stage row.
 *
 * The result contains a link to the stage's detail page labelled with the stage name
 * and, when `killEnabled` is true, a small form that submits `terminate=true` and the
 * stage id to the stage page. If `listener.stageIdToDescription` has an entry for the
 * stage, that description is rendered in its own emphasised `<div>` above the link.
 *
 * @param s the stage to describe
 * @return the nodes to place inside the row's description cell
 */
private def makeDescription(s: StageInfo): Seq[Node] = {
7775
val nameLink =
7876
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
7977
{s.name}
8078
</a>
79+
// NOTE(review): this `if` has no `else`, so when killEnabled is false killButton is
// Unit rather than a node; this presumably relies on scala.xml ignoring a Unit child —
// confirm, or make the empty case explicit.
// NOTE(review): the form declares no `method` attribute, so the request is presumably
// sent as a GET with `terminate` and `id` as query parameters — confirm that a
// state-changing action over GET is acceptable.
// NOTE(review): the button is labelled "Terminate Job" although the id it submits is a
// stage id.
val killButton = if (killEnabled) {
80+
<form action={"%s/stages/stage/".format(UIUtils.prependBaseUri(basePath))}>
81+
<input type="hidden" value={"true"} name="terminate" />
82+
<input type="hidden" value={"" + s.stageId} name="id" />
83+
<input type="submit" value="Terminate Job"/>
84+
</form>
85+
}
8186
// Wrap the link (and kill button) in a <div>, preceded by the stage description when
// the listener has one for this stage.
val description = listener.stageIdToDescription.get(s.stageId)
82-
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
87+
.map(d => <div><em>{d}</em></div><div>{nameLink} {killButton}</div>)
88+
.getOrElse(<div>{nameLink} {killButton}</div>)
89+
90+
// NOTE(review): the explicit `return` is redundant; `description` as the last
// expression would yield the same value.
return description
91+
}
92+
93+
/** Render an HTML row that represents a stage */
94+
private def stageRow(s: StageInfo): Seq[Node] = {
95+
val poolName = listener.stageIdToPool.get(s.stageId)
8396
val submissionTime = s.submissionTime match {
8497
case Some(t) => WebUI.formatDate(new Date(t))
8598
case None => "Unknown"
@@ -118,7 +131,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
118131
</a>
119132
</td>
120133
}}
121-
<td>{description}</td>
134+
<td>{makeDescription(s)}</td>
122135
<td valign="middle">{submissionTime}</td>
123136
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
124137
<td class="progress-cell">

0 commit comments

Comments
 (0)