add JobLogger to Spark #573
Conversation
I'm the Jenkins test bot for the UC Berkeley AMPLab. I've noticed your pull request and will test it once an admin authorizes me to. Thanks for your submission!
I will take a look at this today. Jenkins: this is ok to test
Thanks, Mingfei. Below are some high-level comments:
I have some detailed comments on the code that I will also post inline.
import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import spark.scheduler.Stage
import scala.io.Source
For imports, sort them in the following order:
- java packages
- scala packages
- everything else, in alphabetical order

Add a blank line between imports from different domains. Do this for the other files too - Spark code didn't strictly follow this, but we are enforcing it now ...
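For illustration, the imports above could be regrouped like this (a sketch; the java.io line is a hypothetical addition, included only to show the first group):

```scala
// java packages first (hypothetical example import)
import java.io.PrintWriter

// scala packages next, alphabetized
import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import scala.io.Source

// everything else, alphabetized
import spark.scheduler.Stage
```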
I am done with my detailed comments. Overall this looks like a great first step - we really appreciate you doing this! There is one more comment about the SparkListener. Right now, DAGScheduler accepts a list of SparkListeners and uses a for loop to invoke them. I think that can become expensive in large clusters. It would be great to have the DAGScheduler accept only one SparkListener, and if multiple SparkListeners are needed, we can create a composed SparkListener implementation that simply delegates to the individual listeners. Again, thanks a lot for doing this.
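For what it's worth, a minimal sketch of the composed-listener idea (the trait here is a simplified stand-in with hypothetical callbacks, not Spark's actual SparkListener interface):

```scala
// Simplified stand-in for Spark's SparkListener trait; the real
// callbacks and event types differ.
trait SparkListener {
  def onJobStart(jobID: Int): Unit
  def onJobEnd(jobID: Int): Unit
}

// Delegates every event to each wrapped listener, so the DAGScheduler
// only ever needs to hold a single SparkListener reference.
class CompositeSparkListener(listeners: Seq[SparkListener]) extends SparkListener {
  def onJobStart(jobID: Int) { listeners.foreach(_.onJobStart(jobID)) }
  def onJobEnd(jobID: Int) { listeners.foreach(_.onJobEnd(jobID)) }
}
```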
case None =>
}
}
Avoid pattern matching on Option -- especially when you are doing nothing with the None case:
jobIDToStageIDs.get(jobID).map(_.foreach(stageid => stageIDToJobID -= stageid))
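Spelled out, the two forms side by side (a sketch assuming the mutable maps from this patch, jobIDToStageIDs and stageIDToJobID; foreach is used instead of map in the second form since the result is discarded):

```scala
// The pattern-match form being discouraged:
jobIDToStageIDs.get(jobID) match {
  case Some(stageIDs) => stageIDs.foreach(stageID => stageIDToJobID -= stageID)
  case None =>
}

// The equivalent combinator form:
jobIDToStageIDs.get(jobID).foreach(_.foreach(stageID => stageIDToJobID -= stageID))
```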
@rxin 3. JobLogger itself should also expose the information it collects so we can build a web UI for the JobLogger. That's a bit trickier. Not only do you need to construct one or more data structures to which you must add the appropriate information when tasks/stages/jobs start/complete/fail/resubmit, but you also need to figure out when that information is no longer needed so that you can remove it from the appropriate data structure, avoiding that structure's unbounded growth. I'm working on a substantial portion of that right now.
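One common shape for that kind of bookkeeping is a map populated on job start and pruned on job end, so its size stays bounded by the number of active jobs. A hypothetical sketch (names and callbacks are illustrative, not part of this patch):

```scala
import scala.collection.mutable.HashMap

// Hypothetical bookkeeping for a JobLogger-backed web UI.
class JobInfoTracker {
  private val activeJobInfo = new HashMap[Int, String]

  // Record information when a job starts...
  def onJobStart(jobID: Int, info: String) {
    activeJobInfo(jobID) = info
  }

  // ...and remove it when the job ends, avoiding unbounded growth.
  def onJobEnd(jobID: Int) {
    activeJobInfo -= jobID
  }

  def infoFor(jobID: Int): Option[String] = activeJobInfo.get(jobID)
}
```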
Reynold and markhamstra,
}

// Write log information to the log file by stageID; the withTime parameter controls whether to record a timestamp for the information
private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) = stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
This line is well over 100 characters
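One way the flagged definition might be wrapped to stay within the limit (a sketch; other layouts are equally valid):

```scala
// Write log information to the log file by stageID; the withTime
// parameter controls whether a timestamp is recorded.
private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) {
  stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
}
```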
Hi Mingfei, Thanks for updating this - the new approach is exactly what Reynold and I were suggesting. I've made several comments about style on your latest patch. We adhere to the Scala style guide with a few changes; lines must be at most 100 characters. If you could review this patch for style (not only where I pointed out nits, but the entire thing), that would be great. Here is our style guide:

From a code perspective, there are a few remaining things:

Thanks again for taking the time to update this.
Hi Patrick, Thanks
Hi Patrick, I have modified the code according to your comments and added a test case for JobLogger. Thanks
logInfo("Starting job: " + callSite) | ||
val start = System.nanoTime | ||
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler) | ||
val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler) |
This line is over 100 characters
Hi Mingfei, I added a handful of comments on this latest submission.
Thanks,
Hi Patrick, I have modified the code and added a unit test according to your suggestion. Thanks
@@ -65,6 +65,9 @@ class SparkContext(
// Ensure logging is initialized before we spawn any threads
initLogging()

// Allows higher layer frameworks to describe the context of a job
val annotation = new DynamicVariable[String]("")
Rather than having this be a possibly blank string, how about:
new DynamicVariable[Option[String]](None)
Hi Mingfei, A few notes about the annotation field: it would be good not to use string parsing as a way to pass additional arguments. Instead, you can use an optional value. Also, I notice that you've now changed the default format for the callsite info - it would be good to leave that unchanged. Overall, this is looking in good shape. Once you fix these I'll ask Matei or Reynold to do a pass. Jenkins, test this please
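A sketch of the optional-value approach (using scala.util.DynamicVariable and keeping the annotation name from the patch; the withValue usage is illustrative):

```scala
import scala.util.DynamicVariable

// Default to None so "no annotation set" is distinguishable from an
// empty annotation, with no string parsing needed downstream.
val annotation = new DynamicVariable[Option[String]](None)

// A higher-layer framework such as Shark could scope an annotation to a job:
annotation.withValue(Some("query plan: ...")) {
  // Code run here (e.g. an action that triggers runJob) sees
  // annotation.value == Some("query plan: ...").
}
```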
Hi Patrick,
Okay I think we can close this now. I'm going to take a look at the new PR.
Add a new class named JobLogger:
1. Each SparkContext has one JobLogger; one folder is created for every JobLogger.
2. The JobLogger manages the history files of all active jobs running in that SparkContext; each file is named after its job ID.

The job history includes:
1. Additional information from outside, for example the query plan from Shark.
2. The RDD graph for each job, printed using a top-down approach.
3. Stage information and tasks' start/stop and shuffle information, obtained from TaskMetrics and the DAGScheduler.
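Putting the description together, a heavily simplified skeleton of the proposed class (the real patch hooks into DAGScheduler events and TaskMetrics; the file layout, method names, and log format below are illustrative only):

```scala
import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.HashMap

// One JobLogger per SparkContext; one folder per logger, one file per job.
class JobLogger(logDirName: String) {
  private val logDir = new File(logDirName)
  if (!logDir.exists) logDir.mkdirs()

  private val jobIDToWriter = new HashMap[Int, PrintWriter]
  private val dateFormat = new SimpleDateFormat("yy/MM/dd HH:mm:ss")

  // Open a history file named after the job ID when the job starts.
  def onJobStart(jobID: Int) {
    jobIDToWriter(jobID) = new PrintWriter(new FileWriter(new File(logDir, jobID.toString)))
  }

  // Append a line to the job's history, optionally timestamped.
  def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
    val line = if (withTime) dateFormat.format(new Date()) + ": " + info else info
    jobIDToWriter.get(jobID).foreach(_.println(line))
  }

  // Close and forget the writer when the job finishes.
  def onJobEnd(jobID: Int) {
    jobIDToWriter.remove(jobID).foreach(_.close())
  }
}
```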