add JobLogger to Spark #573

Closed
wants to merge 15 commits into from

Conversation

shimingfei
Contributor

Add a new class named JobLogger.

1. Each SparkContext has one JobLogger, and one folder is created for every JobLogger.
2. The JobLogger manages the history files of all active jobs running in that SparkContext; each file is named by its job ID.

Job history includes:

1. Additional information from outside, for example the query plan from Shark.
2. The RDD graph for each job, printed using a top-down approach.
3. Stage information and tasks' start/stop and shuffle information, obtained from TaskMetrics and the DAGScheduler.
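A minimal sketch of that layout (the directory naming, method names, and timestamp format below are illustrative assumptions, not necessarily this PR's exact code): one folder per JobLogger, one history file per job ID, with an optional timestamp on each record.

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.HashMap

class JobLogger(logDirRoot: String) {
  // One folder is created for every JobLogger instance.
  private val logDir = new File(logDirRoot, "spark-joblogger-" + System.currentTimeMillis)
  logDir.mkdirs()

  // One history file per active job, keyed and named by job ID.
  private val jobIDToWriter = new HashMap[Int, PrintWriter]
  private val dateFormat = new SimpleDateFormat("yy/MM/dd HH:mm:ss")

  def createLogWriter(jobID: Int) {
    jobIDToWriter(jobID) = new PrintWriter(new FileWriter(new File(logDir, jobID.toString)))
  }

  // withTime controls whether a timestamp is prepended to the record.
  def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
    val line = if (withTime) dateFormat.format(new Date()) + ": " + info else info
    jobIDToWriter.get(jobID).foreach(_.println(line))
  }

  def closeLogWriter(jobID: Int) {
    jobIDToWriter.remove(jobID).foreach(_.close())
  }
}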

@AmplabJenkins

I'm the Jenkins test bot for the UC Berkeley AMPLab. I've noticed your pull request and will test it once an admin authorizes me to. Thanks for your submission!

@rxin
Member

rxin commented Apr 18, 2013

I will take a look at this today.

Jenkins: this is ok to test

@rxin
Member

rxin commented Apr 18, 2013

Thanks, Mingfei.

Below are some high-level comments:

  1. Instead of adding JobLogger-specific calls to DAGScheduler, why don't you extend SparkListener to add more callback APIs? (For every recordTaskMetrics call in DAGScheduler, create a callback API in SparkListener.)
  2. With 1, JobLogger becomes an implementation of the SparkListener (a sketch follows this list).
  3. JobLogger itself should also expose the information it collects so we can build a web UI for the JobLogger.
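A hedged sketch of points 1 and 2 (the trait and callback names are assumptions for illustration, not the exact API this patch would add):

trait JobEventListener {
  def onJobStart(jobID: Int) { }
  def onTaskEnd(stageID: Int, metrics: String) { }
  def onJobEnd(jobID: Int, succeeded: Boolean) { }
}

// JobLogger is then just one implementation of the listener; the DAGScheduler only
// invokes the generic callbacks and knows nothing about logging.
class JobLogger extends JobEventListener {
  override def onJobStart(jobID: Int) {
    println("JOB_START " + jobID)
  }
  override def onTaskEnd(stageID: Int, metrics: String) {
    println("TASK_END stage=" + stageID + " " + metrics)
  }
  override def onJobEnd(jobID: Int, succeeded: Boolean) {
    println("JOB_END " + jobID + " succeeded=" + succeeded)
  }
}

The real implementation would write to the per-job history files rather than printing.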

I have some detailed comments on the code that I will also post inline.

import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import spark.scheduler.Stage
import scala.io.Source
Member

For imports, sort them in the following order:

  1. java packages
  2. scala packages
  3. everything else in alphabetical order.

Add a blank line between imports from different domains.

Do this for other files - Spark code didn't strictly follow this but we are enforcing that now ...
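Applied to the hunk above, that ordering would be:

import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import scala.io.Source

import spark.scheduler.Stage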

@rxin
Member

rxin commented Apr 18, 2013

I am done with my detailed comments.

Overall this looks like a great first step - we really appreciate you doing this!

There is one more comment about the SparkListener. Right now, DAGScheduler accepts a list of SparkListeners and uses a for loop to invoke the listeners. I think that can become expensive in large clusters. It would be great to have the DAGScheduler accept only one SparkListener, and if multiple SparkListeners are needed, we can create a composed SparkListener implementation that simply delegates to multiple SparkListeners.
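Reusing the listener trait sketched earlier, a composed listener could simply delegate each event (an illustration, not the real SparkListener interface):

class CompositeListener(listeners: Seq[JobEventListener]) extends JobEventListener {
  override def onJobStart(jobID: Int) {
    listeners.foreach(_.onJobStart(jobID))
  }
  override def onTaskEnd(stageID: Int, metrics: String) {
    listeners.foreach(_.onTaskEnd(stageID, metrics))
  }
  override def onJobEnd(jobID: Int, succeeded: Boolean) {
    listeners.foreach(_.onJobEnd(jobID, succeeded))
  }
}

The DAGScheduler would then hold a single listener, whether one consumer or many are registered.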

Again, thanks a lot for doing this.

@AmplabJenkins

I'm the Jenkins test bot for the UC Berkeley AMPLab. I've noticed your pull request and will test it once an admin authorizes me to. Thanks for your submission!

case None =>
}
}

Contributor

Avoid pattern matching on Option -- especially when you are doing nothing with the None case:

jobIDToStageIDs.get(jobID).map(_.foreach(stageid => stageIDToJobID -= stageid))

@markhamstra
Contributor

@rxin 3. JobLogger itself should also expose the information it collects so we can build a web UI for the JobLogger.

That's a bit trickier. Not only do you need to construct one or more data structures to which you must add the appropriate information when tasks/stages/jobs start/complete/fail/resubmit, but you also need to figure out when that information is no longer needed so that you can remove it from the appropriate data structure, avoiding that structure's unbounded growth.

I'm working on a substantial portion of that right now.
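A hedged sketch of that bookkeeping (the map names follow the ones discussed above; the cleanup hook itself is an assumption): entries are added when a job starts and removed when it finishes, so neither structure grows without bound.

import scala.collection.mutable.HashMap

class JobStateTracker {
  private val jobIDToStageIDs = new HashMap[Int, Seq[Int]]
  private val stageIDToJobID = new HashMap[Int, Int]

  def jobStarted(jobID: Int, stageIDs: Seq[Int]) {
    jobIDToStageIDs(jobID) = stageIDs
    stageIDs.foreach(stageID => stageIDToJobID(stageID) = jobID)
  }

  // Called when the job completes or fails, so the maps stay bounded.
  def jobFinished(jobID: Int) {
    jobIDToStageIDs.remove(jobID).foreach(_.foreach(stageIDToJobID -= _))
  }
}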

@shimingfei
Contributor Author

Reynold and markhamstra,
Thanks for your comments. I will modify the code accordingly.

}

//write log information to log file by stageID, withTime parameter controls whether to recored time stamp for the information
private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) = stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
Contributor

This line is well over 100 characters
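Wrapped under 100 characters, the same definition could read:

private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) =
  stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))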

@pwendell
Contributor

Hi Mingfei,

Thanks for updating this - the new approach is exactly what Reynold and I were suggesting.

I've made several comments about style for your latest patch. We adhere to the Scala style guide with a few changes. Lines must be at most 100 characters. If you could review this patch (not only where I pointed out nits, but the entire thing) for style, that would be great. Here is our style guide:
http://spark-project.org/docs/latest/contributing-to-spark.html

From a code perspective, there are a few remaining things:

  • It would be great if you could add some tests for this to verify the logic inside of JobLogger.scala
  • I'm still a bit confused why you can't use getCallSite instead of the generator class. Could you fill us in on the thinking there?

Thanks again for taking time to update this.

  • Patrick

@shimingfei
Contributor Author

Hi Patrick
Sometimes it is impossible to get the RDD's generator from the callsite information, because several classes may call into the same code where the RDD is generated (for example, in Shark many operators call executeProcessPartition in the Operator class to generate a mapPartitionsWithIndexRDD), so it is better to let user code set who generated the RDD. I have also merged the getRddGenerator function into getSparkCallSite.
I added it only for analysis; if it is not suitable to add it to the RDD class, I will remove it from the code and use the callsite information instead.
Besides I am adding test case for JobLogger now.

Thanks
Mingfei

@shimingfei
Contributor Author

Hi Patrick

I have modified the code according to your comments and added a test case for JobLogger.

Thanks
Mingfei

logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler)
Contributor

This line is over 100 characters

@pwendell
Contributor

Hi Mingfei,

I added a handful of comments on this latest submission.

  1. For the callsite stuff, I proposed a slightly different refactoring than the one you give here. In general, though, I support refactoring this to use the same code path.
  2. I would prefer not to have the JobLogger enabled via a Spark configuration option just yet. I'd rather have it be something that people need to explicitly add. Also, you changed the code to add a StatsListener by default, which wasn't the case before. My proposal was just to remove the entire StatsListener object, which would fix both of these issues.
  3. There were some more style issues which I commented on.
  4. For the unit test - ideally you want to write a test in a way that actually verifies some of the logic in the JobLogger class. For instance, you could try to verify that the hashmaps you maintain are being set up correctly. There are a few ways to do this. One is to mock out calls to the various handler functions and then look directly at the state of the JobLogger. Another is to add a constructor option that avoids cleaning the state of the maps inside of JobLogger (default false), set this option to true in the tests, and then just run a job and verify the contents of the hashmaps (a sketch follows this list).
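A hedged sketch of that second approach, assuming ScalaTest's FunSuite and a simplified stand-in for JobLogger (the keepState flag and map names are illustrative, not the patch's actual API):

import scala.collection.mutable.HashMap

import org.scalatest.FunSuite

// Stand-in for JobLogger: keepState = true skips cleanup so a test can inspect the maps.
class JobLoggerStub(keepState: Boolean = false) {
  val jobIDToStageIDs = new HashMap[Int, Seq[Int]]

  def onJobStart(jobID: Int, stageIDs: Seq[Int]) {
    jobIDToStageIDs(jobID) = stageIDs
  }

  def onJobEnd(jobID: Int) {
    if (!keepState) jobIDToStageIDs -= jobID
  }
}

class JobLoggerSuite extends FunSuite {
  test("job-to-stage map is populated and retained when keepState is set") {
    val logger = new JobLoggerStub(keepState = true)
    logger.onJobStart(0, Seq(0, 1))
    logger.onJobEnd(0)
    assert(logger.jobIDToStageIDs(0) === Seq(0, 1))
  }
}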

Thanks,
Patrick

@shimingfei
Contributor Author

Hi Patrick

I have modified the code and added a unit test according to your suggestions.

Thanks
Mingfei

@@ -65,6 +65,9 @@ class SparkContext(
// Ensure logging is initialized before we spawn any threads
initLogging()

// Allows higher layer frameworks to describe the context of a job
val annotation = new DynamicVariable[String]("")
Contributor

Rather than having this be a possibly blank string, how about:

new DynamicVariableOption[String]

@pwendell
Contributor

pwendell commented Jun 6, 2013

Hi Mingfei,

A few notes about the annotation field - it would be good to not use string parsing as a way to pass additional arguments. Instead, you can use an optional value.

Also, I notice that now you've changed the default format for the callsite info - it would be good to actually leave that unchanged.
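For illustration only (these helper names are not in the patch), the difference between packing the annotation into the callsite string and passing it as an optional value:

// String parsing, as in the diff above: the annotation is packed into the callsite
// string with a separator and has to be split back out downstream.
def describePacked(callSiteWithAnnotation: String): String = {
  val parts = callSiteWithAnnotation.split('|')
  "call site: " + parts.head + parts.tail.map(", annotation: " + _).mkString
}

// Optional value: no parsing, and "no annotation" is explicit rather than an empty string.
def describeOptional(callSite: String, annotation: Option[String]): String =
  "call site: " + callSite + annotation.map(", annotation: " + _).getOrElse("")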

Overall, this is looking in good shape. Once you fix these I'll ask Matei or Reynold to do a pass.

Jenkins, test this please

@shimingfei
Contributor Author

Hi Patrick,
I would like to use the "localProperties" variable in the latest SparkContext to pass the job annotation information, so that the interfaces of "runJob" and "submitJob" will not be changed.
I also intend to use a daemon thread to do the JobLogger's work, since the JobLogger does a lot of disk access.
But the Spark version I forked does not have the "localProperties" variable in SparkContext; do I need to close the current pull request and create a new one?
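A minimal sketch of that daemon-thread idea (names are illustrative): the listener callbacks only enqueue log lines, and a single daemon thread drains the queue and does the disk writes.

import java.util.concurrent.LinkedBlockingQueue

class AsyncLogWriter(write: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]

  private val worker = new Thread("job-logger") {
    setDaemon(true)
    override def run() {
      while (true) {
        write(queue.take())  // blocks until a line is available
      }
    }
  }
  worker.start()

  // Called from the scheduler side; never touches the disk itself.
  def log(line: String) { queue.put(line) }
}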

@pwendell
Contributor

Okay I think we can close this now. I'm going to take a look at the new PR.

@pwendell pwendell closed this Jun 10, 2013
pwendell pushed a commit to andyk/mesos-spark that referenced this pull request May 5, 2014
Small bug fix to make sure the "spark contents" are copied to the
deployment directory correctly.

Author: Rahul Singhal <[email protected]>

Closes mesos#573 from rahulsinghaliitd/SPARK-1651 and squashes the following commits:

402c999 [Rahul Singhal] SPARK-1651: Delete existing deployment directory