SPARK-1202 - Add a "cancel" button in the UI for stages
Author: Sundeep Narravula <[email protected]>

Closes apache#246 from sundeepn/uikilljob and squashes the following commits:

5fdd0e2 [Sundeep Narravula] Fix test string
f6fdff1 [Sundeep Narravula] Format fix; reduced line size to less than 100 chars
d1daeb9 [Sundeep Narravula] Incorporating review comments.
8d97923 [Sundeep Narravula] Ability to kill jobs through the UI. This behavior can be turned on by setting the following variable: spark.ui.killEnabled=true (default=false). Adding DAGScheduler event StageCancelled and corresponding handlers. Added cancellation reason to handlers.
Sundeep Narravula authored and pdeyhim committed Jun 25, 2014
1 parent 2dab922 commit bc387ff
Showing 10 changed files with 87 additions and 12 deletions.
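
Before the per-file diffs, a minimal sketch of turning the new flag on from application code (the app name and local master are placeholder values; the flag itself is what this commit adds):

import org.apache.spark.{SparkConf, SparkContext}

// Enable the stage-kill button added by this commit.
// "KillDemo" and "local[2]" are placeholders for this sketch.
val conf = new SparkConf()
  .setAppName("KillDemo")
  .setMaster("local[2]")
  .set("spark.ui.killEnabled", "true")
val sc = new SparkContext(conf)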
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1138,6 +1138,16 @@ class SparkContext(config: SparkConf) extends Logging {
dagScheduler.cancelAllJobs()
}

/** Cancel a given job if it's scheduled or running */
private[spark] def cancelJob(jobId: Int) {
dagScheduler.cancelJob(jobId)
}

/** Cancel a given stage and all jobs associated with it */
private[spark] def cancelStage(stageId: Int) {
dagScheduler.cancelStage(stageId)
}

/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
32 changes: 28 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -511,6 +511,13 @@ class DAGScheduler(
eventProcessActor ! AllJobsCancelled
}

/**
* Cancel all jobs associated with a running or scheduled stage.
*/
def cancelStage(stageId: Int) {
eventProcessActor ! StageCancelled(stageId)
}

/**
* Process one event retrieved from the event processing actor.
*
@@ -551,6 +558,9 @@
submitStage(finalStage)
}

case StageCancelled(stageId) =>
handleStageCancellation(stageId)

case JobCancelled(jobId) =>
handleJobCancellation(jobId)

@@ -560,11 +570,13 @@
val activeInGroup = activeJobs.filter(activeJob =>
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation)
jobIds.foreach(jobId => handleJobCancellation(jobId,
"as part of cancelled job group %s".format(groupId)))

case AllJobsCancelled =>
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
"as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...

@@ -991,11 +1003,23 @@
}
}

private def handleJobCancellation(jobId: Int) {
private def handleStageCancellation(stageId: Int) {
if (stageIdToJobIds.contains(stageId)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
jobsThatUseStage.foreach(jobId => {
handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
})
} else {
logInfo("No active jobs to kill for Stage " + stageId)
}
}

private def handleJobCancellation(jobId: Int, reason: String = "") {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
failJobAndIndependentStages(jobIdToActiveJob(jobId),
"Job %d cancelled %s".format(jobId, reason), None)
}
}

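End to end, the pieces above wire up a single cancellation path. The sketch below is illustrative only: cancelStage is private[spark], so outside the web UI it is reachable only from code compiled inside the org.apache.spark package.

package org.apache.spark

// Hypothetical helper tracing the full path a kill request takes:
//   SparkContext.cancelStage(stageId)
//     -> DAGScheduler.cancelStage(stageId)
//     -> eventProcessActor ! StageCancelled(stageId)
//     -> handleStageCancellation(stageId)
//     -> handleJobCancellation(jobId, "because Stage %s was cancelled")
//        for every jobId in stageIdToJobIds(stageId)
object StageKillPath {
  def kill(sc: SparkContext, stageId: Int): Unit = sc.cancelStage(stageId)
}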
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent

private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
val live = sc != null

val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)

private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -32,6 +32,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
private val sc = parent.sc
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
private val killEnabled = parent.killEnabled

private def appName = parent.appName

@@ -42,7 +43,18 @@
val failedStages = listener.failedStages.reverse.toSeq
val now = System.currentTimeMillis()

val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
if (killEnabled) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt

if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
sc.cancelStage(stageId)
}
}


val activeStagesTable =
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
val completedStagesTable =
new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
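Because the kill control is rendered as a plain link, the same cancellation can be exercised with an HTTP GET against the stages page. A sketch, assuming the driver UI on the default port 4040, an empty base path, and an active stage with id 3:

import scala.io.Source

// GET /stages?id=<stageId>&terminate=true triggers sc.cancelStage(stageId)
// when spark.ui.killEnabled is on and the stage is still active.
// Host, port, and the stage id are assumptions for this sketch.
val stageId = 3
val page = Source.fromURL(s"http://localhost:4040/stages?id=$stageId&terminate=true")
println(page.mkString.take(200))  // the re-rendered stages page
page.close()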
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -32,6 +32,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
val killEnabled = parent.killEnabled

lazy val listener = _listener.get
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.{Utils, Distribution}
private[ui] class StagePage(parent: JobProgressUI) {
private val basePath = parent.basePath
private lazy val listener = parent.listener
private lazy val sc = parent.sc

private def appName = parent.appName

29 changes: 23 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
private[ui] class StageTable(
stages: Seq[StageInfo],
parent: JobProgressUI,
killEnabled: Boolean = false) {

private val basePath = parent.basePath
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +75,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</div>
}

/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
private def makeDescription(s: StageInfo): Seq[Node] = {
val nameLink =
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
{s.name}
</a>
val killLink = if (killEnabled) {
<div>[<a href=
{"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
Kill
</a>]</div>

}
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
.getOrElse(<div>{nameLink} {killLink}</div>)

return description
}

/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
val submissionTime = s.submissionTime match {
case Some(t) => WebUI.formatDate(new Date(t))
case None => "Unknown"
@@ -118,7 +135,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</a>
</td>
}}
<td>{description}</td>
<td>{makeDescription(s)}</td>
<td valign="middle">{submissionTime}</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="progress-cell">
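For reference, the kill link produced by makeDescription renders roughly as follows; this standalone snippet mirrors the construction above with a made-up base path and stage id:

// Mirrors the killLink XML literal from makeDescription (hypothetical values).
val basePath = ""
val stageId = 3
val killLink =
  <div>[<a href={"%s/stages?id=%s&terminate=true".format(basePath, stageId)}>Kill</a>]</div>
println(killLink)
// Prints: <div>[<a href="/stages?id=3&amp;terminate=true">Kill</a>]</div>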
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -290,7 +290,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
val rdd = makeRdd(1, Nil)
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled")
assert(failure.getMessage === s"Job $jobId cancelled ")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
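The one-character test change above (a trailing space) falls out of the new reason parameter: with the default empty reason, the message format leaves a space behind. A quick demonstration:

// "Job %d cancelled %s" with the default reason "" keeps the trailing space,
// which is why the expected string became s"Job $jobId cancelled ".
val jobId = 0
println("Job %d cancelled %s".format(jobId, ""))
// -> "Job 0 cancelled "
println("Job %d cancelled %s".format(jobId, "because Stage 0 was cancelled"))
// -> "Job 0 cancelled because Stage 0 was cancelled"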
7 changes: 7 additions & 0 deletions docs/configuration.md
@@ -190,6 +190,13 @@ Apart from these, the following properties are also available, and may be useful
user that started the Spark job has view access.
</td>
</tr>
<tr>
<td>spark.ui.killEnabled</td>
<td>true</td>
<td>
Allows stages and corresponding jobs to be killed from the web ui.
</td>
</tr>
<tr>
<td>spark.shuffle.compress</td>
<td>true</td>
