-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1202 - Add a "cancel" button in the UI for stages #246
Conversation
Build triggered. |
Build started. |
Build finished. |
One or more automated tests failed |
@@ -1028,6 +1028,11 @@ class SparkContext( | |||
dagScheduler.cancelAllJobs() | |||
} | |||
|
|||
/** Cancel a given job if its scheduled or running*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Space after "running"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As long as we're being nitpicky... should be "it's"
Hi @sundeepn. Thanks for doing this. I left a few relatively minor comments. I haven't looked too closely into it but could you explain on a high level, what pages link to the KillPage and vice versa? If I understand correctly, each KillPage is associated with a stage ID, which seems a little strange when the "unit of killing" here is the job. Is there a reason why you created a new page for killing jobs, as opposed to embedding the functionality in existing pages (e.g. the StagePage)? |
Currently, only the IndexPage links to the killPage. The KillPage just has the uniform header on top so it links back to the other pages. The unit of killing is a job by requirement. However, the initiation of kill action is through any stage contained in the job. The KillPage is actually a Job level message page, even though we arrive there by a stageId. Maybe I should change the title to "Job x containing stage y killed" ? |
Can one of the admins verify this patch? |
@andrewor14 / @kayousterhout. Thanks for the comments. |
Build triggered. Build is starting -or- tests failed to complete. |
Build started. Build is starting -or- tests failed to complete. |
Build finished. Build is starting -or- tests failed to complete. |
Build is starting -or- tests failed to complete. |
It looks like github is just moving slowly today...the commit just got pulled in. I took another look at this and have a question: what happens for stages that are used for multiple jobs? Right now, stageIdToJobId in the UI code you added just maps a stage to a single job id. So, if stage0 is used by JobA and jobB, the ui code only stores one of these jobs, and then cancelJob() will only be called for one of the jobs. cancelJob() ultimately calls DAGScheduler.handleJobCancellation(), which only cancels the stages that are independent to the job. So, because stage0 is not independent to either of the jobs, it won't get cancelled. Did I misunderstand this? |
// Extract Job ID and double check if we have the details | ||
val jobId = Option(stageSubmitted.properties).flatMap { | ||
p => Option(p.getProperty("spark.job.id")) | ||
}.getOrElse("-1").toInt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we chatted about this I remember you saying that this code is to handle the case where the stage runs locally at the driver...but from glancing at the DAGScheduler code, it looks like onStageSubmitted() never gets called for the locally-run tasks, and they never show up at the UI.
When do you need this code / when will the jobIdToStageIds mapping not already be set up correctly by OnJobStart?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, this is only to ensure we can handle things if we get any scenarios where the onJobStart does not arrive before stageSubmitted. I am not familiar with the scheduling code sufficiently to rule that out. If you are sure, I can take this out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah cool -- I looked at the ordering of the JobStart and StageSubmitted events more closely and I think you can safely remove this.
I misunderstood our conversation on job to stage mapping the other day. As you see, the code will currently not handle multiple job mappings. Is there a simple example I can use to generate such a scenario? |
I'm not sure how to generate an example of this...I think it can happen with Shark but maybe @pwendell or @markhamstra can comment here? I wonder if maybe a better way to handle this would be to add a cancelStage() method to the DAGScheduler, which already has the mappings of stages to jobs and vice versa. That way the UI wouldn't have to duplicate this information. |
You can look at Matei's comments from way back: mesos/spark#414 |
@kayousterhout I have thought about adding cancelStage to SparkContext/DAGScheduler. My earlier take was that the Job level Info is useful in the UI as well. Currently, its not shown/used in the UI, but lots of our users get confused looking at the stage level UI showing their query multiple times. It will be good to expose the Job ID in the stage table as well. Having said that, I do not see any other way of doing this. We will have to move this to the DAGScheduler as a cancelStage. If two jobs can share stages, then any handling from the UI side can have a race condition in the cancel work flow and it will be a lot cleaner to handle upstream. I will submit a revision shortly. @andrewor14, What do you think about adding a jobId column to the Stage table? Thanks Mark for the pointer. |
+1 on job-level UI, including job-level progress indication. |
Yes, right now the landing page of the UI is the stage page, which is a little arbitrary. I think it makes sense to have an overview page, that displays the number of running executors, master URL, total duration, event log location and other more general info. (@tdas and I discussed this a bunch in designing for a brother UI for Spark Streaming.) On this overview page, maybe we can also add a Job summary table, in which each row links to a job-specific stages page (what we already have). Then, in the context of canceling jobs, we can have the |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14024/ |
Jenkins, retest this please. |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
Hey I'm gonna go ahead and merge this - I'll have a follow-on patch with some changes... felt that was easier than doing another round-trip on the review. |
Merged into master and 1.0. |
Author: Sundeep Narravula <[email protected]> Author: Sundeep Narravula <[email protected]> Closes #246 from sundeepn/uikilljob and squashes the following commits: 5fdd0e2 [Sundeep Narravula] Fix test string f6fdff1 [Sundeep Narravula] Format fix; reduced line size to less than 100 chars d1daeb9 [Sundeep Narravula] Incorporating review comments. 8d97923 [Sundeep Narravula] Ability to kill jobs thru the UI. This behavior can be turned on be settings the following variable: spark.ui.killEnabled=true (default=false) Adding DAGScheduler event StageCancelled and corresponding handlers. Added cancellation reason to handlers. (cherry picked from commit 2c55783) Signed-off-by: Patrick Wendell <[email protected]>
Add missing license headers I found this when doing further audits on the 0.8.1 release candidate. (cherry picked from commit 6169fe1) Signed-off-by: Patrick Wendell <[email protected]>
Previously, when jobs were cancelled, not all of the state in the DAGScheduler was cleaned up, leading to a slow memory leak in the DAGScheduler. As we expose easier ways to cancel jobs, it's more important to fix these issues. This commit also fixes a second and less serious problem, which is that previously, when a stage failed, not all of the appropriate stages were cancelled. See the "failure of stage used by two jobs" test for an example of this. This just meant that extra work was done, and is not a correctness problem. This commit adds 3 tests. “run shuffle with map stage failure” is a new test to more thoroughly test this functionality, and passes on both the old and new versions of the code. “trivial job cancellation” fails on the old code because all state wasn’t cleaned up correctly when jobs were cancelled (we didn’t remove the job from resultStageToJob). “failure of stage used by two jobs” fails on the old code because taskScheduler.cancelTasks wasn’t called for one of the stages (see test comments). This should be checked in before apache#246, which makes it easier to cancel stages / jobs. Author: Kay Ousterhout <[email protected]> Closes apache#305 from kayousterhout/incremental_abort_fix and squashes the following commits: f33d844 [Kay Ousterhout] Mark review comments 9217080 [Kay Ousterhout] Properly cleanup DAGScheduler on job cancellation.
Author: Sundeep Narravula <[email protected]> Author: Sundeep Narravula <[email protected]> Closes apache#246 from sundeepn/uikilljob and squashes the following commits: 5fdd0e2 [Sundeep Narravula] Fix test string f6fdff1 [Sundeep Narravula] Format fix; reduced line size to less than 100 chars d1daeb9 [Sundeep Narravula] Incorporating review comments. 8d97923 [Sundeep Narravula] Ability to kill jobs thru the UI. This behavior can be turned on be settings the following variable: spark.ui.killEnabled=true (default=false) Adding DAGScheduler event StageCancelled and corresponding handlers. Added cancellation reason to handlers.
Fixes combineByKey
This PR pulls in recent changes in SparkR-pkg, including cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField. Author: cafreeman <[email protected]> Author: Davies Liu <[email protected]> Author: Zongheng Yang <[email protected]> Author: Shivaram Venkataraman <[email protected]> Author: Shivaram Venkataraman <[email protected]> Author: Sun Rui <[email protected]> Closes #5436 from davies/R3 and squashes the following commits: c2b09be [Davies Liu] SQLTypes -> schema a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3 168b7fe [Davies Liu] sort generics b1fe460 [Davies Liu] fix conflict in README.md e74c04e [Davies Liu] fix schema.R 4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5 41f8184 [Davies Liu] rm man ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3 1bdcb63 [Zongheng Yang] Updates to README.md. 5a553e7 [cafreeman] Use object attribute instead of argument 71372d9 [cafreeman] Update docs and examples 8526d2e [cafreeman] Remove `tojson` functions 6ef5f2d [cafreeman] Fix spacing 7741d66 [cafreeman] Rename the SQL DataType function 141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream 9387402 [Davies Liu] fix style 40199eb [Shivaram Venkataraman] Move except into sorted position 07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD. 7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey ed66c81 [cafreeman] Update `subtract` to work with `generics.R` f3ba785 [cafreeman] Fixed duplicate export 275deb4 [cafreeman] Update `NAMESPACE` and tests 1a3b63d [cafreeman] new version of `CreateDF` 836c4bf [cafreeman] Update `createDataFrame` and `toDF` be5d5c1 [cafreeman] refactor schema functions 40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5 20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4 c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master 136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats cd66603 [cafreeman] new line at EOF 8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep 7dd81b7 [cafreeman] Documentation 0e2a94f [cafreeman] Define functions for schema and fields
[NOSQUASH] Resync from apache-spark-on-k8s upstream
Add dockerhub to secrets list
### What changes were proposed in this pull request? Push down filter through expand. For case below: ``` create table t1(pid int, uid int, sid int, dt date, suid int) using parquet; create table t2(pid int, vs int, uid int, csid int) using parquet; SELECT years, appversion, SUM(uusers) AS users FROM (SELECT Date_trunc('year', dt) AS years, CASE WHEN h.pid = 3 THEN 'iOS' WHEN h.pid = 4 THEN 'Android' ELSE 'Other' END AS viewport, h.vs AS appversion, Count(DISTINCT u.uid) AS uusers ,Count(DISTINCT u.suid) AS srcusers FROM t1 u join t2 h ON h.uid = u.uid GROUP BY 1, 2, 3) AS a WHERE viewport = 'iOS' GROUP BY 1, 2 ``` Plan. before this pr: ``` == Physical Plan == *(5) HashAggregate(keys=[years#30, appversion#32], functions=[sum(uusers#33L)]) +- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=#251] +- *(4) HashAggregate(keys=[years#30, appversion#32], functions=[partial_sum(uusers#33L)]) +- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=#246] +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[partial_count(if ((gid#44 = 1)) u.`uid`#47 else null)]) +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44, 200), true, [id=#241] +- *(2) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[]) +- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS) +- *(2) Expand [ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44] +- *(2) Project [uid#7, dt#9, suid#10, pid#11, vs#12] +- *(2) BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight :- *(2) Project [uid#7, dt#9, suid#10] : +- *(2) Filter isnotnull(uid#7) : +- *(2) ColumnarToRow : +- FileScan parquet default.t1[uid#7,dt#9,suid#10] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date,suid:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), [id=#233] +- *(1) Project [pid#11, vs#12, uid#13] +- *(1) Filter isnotnull(uid#13) +- *(1) ColumnarToRow +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [isnotnull(uid#13)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` Plan. after. this pr. : ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[years#0, appversion#2], functions=[sum(uusers#3L)], output=[years#0, appversion#2, users#5L]) +- Exchange hashpartitioning(years#0, appversion#2, 5), true, [id=#71] +- HashAggregate(keys=[years#0, appversion#2], functions=[partial_sum(uusers#3L)], output=[years#0, appversion#2, sum#22L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[count(distinct uid#7)], output=[years#0, appversion#2, uusers#3L]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, 5), true, [id=#67] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12], functions=[partial_count(distinct uid#7)], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, count#27L]) +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Exchange hashpartitioning(date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7, 5), true, [id=#63] +- HashAggregate(keys=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles)) AS date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END AS CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7], functions=[], output=[date_trunc(year, cast(dt#9 as timestamp), Some(America/Los_Angeles))#23, CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END#24, vs#12, uid#7]) +- Project [uid#7, dt#9, pid#11, vs#12] +- BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight, false :- Filter isnotnull(uid#7) : +- FileScan parquet default.t1[uid#7,dt#9] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#58] +- Filter ((CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS) AND isnotnull(uid#13)) +- FileScan parquet default.t2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [(CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END = iOS), isnotnull..., Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/4l/7_c5c97s1_gb0d9_d6shygx00000gn/T/warehouse-c069d87..., PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int> ``` ### Why are the changes needed? Improve performance, filter more data. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes #30278 from AngersZhuuuu/SPARK-33302. Authored-by: angerszhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
No description provided.