Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-1202 - Add a "cancel" button in the UI for stages #246

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,16 @@ class SparkContext(
dagScheduler.cancelAllJobs()
}

/** Cancel a given job if it's scheduled or running */
private[spark] def cancelJob(jobId: Int) {
dagScheduler.cancelJob(jobId)
}

/** Cancel a given stage and all jobs associated with it */
private[spark] def cancelStage(stageId: Int) {
dagScheduler.cancelStage(stageId)
}

/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
Expand Down
32 changes: 28 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,13 @@ class DAGScheduler(
eventProcessActor ! AllJobsCancelled
}

/**
* Cancel all jobs associated with a running or scheduled stage.
*/
def cancelStage(stageId: Int) {
eventProcessActor ! StageCancelled(stageId)
}

/**
* Process one event retrieved from the event processing actor.
*
Expand Down Expand Up @@ -551,6 +558,9 @@ class DAGScheduler(
submitStage(finalStage)
}

case StageCancelled(stageId) =>
handleStageCancellation(stageId)

case JobCancelled(jobId) =>
handleJobCancellation(jobId)

Expand All @@ -560,11 +570,13 @@ class DAGScheduler(
val activeInGroup = activeJobs.filter(activeJob =>
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation)
jobIds.foreach(jobId => handleJobCancellation(jobId,
"as part of cancelled job group %s".format(groupId)))

case AllJobsCancelled =>
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
"as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...

Expand Down Expand Up @@ -991,11 +1003,23 @@ class DAGScheduler(
}
}

private def handleJobCancellation(jobId: Int) {
private def handleStageCancellation(stageId: Int) {
if (stageIdToJobIds.contains(stageId)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
jobsThatUseStage.foreach(jobId => {
handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
})
} else {
logInfo("No active jobs to kill for Stage " + stageId)
}
}

private def handleJobCancellation(jobId: Int, reason: String = "") {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
failJobAndIndependentStages(jobIdToActiveJob(jobId),
"Job %d cancelled %s".format(jobId, reason), None)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent

private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent

private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[spark] class SparkUI(
val live = sc != null

val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)

private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
private val sc = parent.sc
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
private val killEnabled = parent.killEnabled

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
Expand All @@ -41,7 +42,18 @@ private[ui] class IndexPage(parent: JobProgressUI) {
val failedStages = listener.failedStages.reverse.toSeq
val now = System.currentTimeMillis()

val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
if (killEnabled) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt

if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
sc.cancelStage(stageId)
}
}


val activeStagesTable =
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
val completedStagesTable =
new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
val killEnabled = parent.killEnabled

lazy val listener = _listener.get
lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
private val appName = parent.appName
private val basePath = parent.basePath
private lazy val listener = parent.listener
private lazy val sc = parent.sc

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
Expand Down
29 changes: 23 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
private[ui] class StageTable(
stages: Seq[StageInfo],
parent: JobProgressUI,
killEnabled: Boolean = false) {

private val basePath = parent.basePath
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
Expand Down Expand Up @@ -71,15 +75,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</div>
}

/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
private def makeDescription(s: StageInfo): Seq[Node] = {
val nameLink =
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
{s.name}
</a>
val killLink = if (killEnabled) {
<div>[<a href=
{"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
Kill
</a>]</div>

}
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
.getOrElse(<div>{nameLink} {killLink}</div>)

return description
}

/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
val submissionTime = s.submissionTime match {
case Some(t) => WebUI.formatDate(new Date(t))
case None => "Unknown"
Expand Down Expand Up @@ -118,7 +135,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
</a>
</td>
}}
<td>{description}</td>
<td>{makeDescription(s)}</td>
<td valign="middle">{submissionTime}</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="progress-cell">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val rdd = makeRdd(1, Nil)
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled")
assert(failure.getMessage === s"Job $jobId cancelled ")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ Apart from these, the following properties are also available, and may be useful
user that started the Spark job has view access.
</td>
</tr>
<tr>
<td>spark.ui.killEnabled</td>
<td>true</td>
<td>
Allows stages and corresponding jobs to be killed from the web ui.
</td>
</tr>
<tr>
<td>spark.shuffle.compress</td>
<td>true</td>
Expand Down