Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add JobLogger to Spark #573

Closed
wants to merge 15 commits into from
1 change: 1 addition & 0 deletions core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
itr.setDelegate(blockFetcherItr)
CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/scala/spark/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest](
name = _name
this
}

/**User-defined generator of this RDD*/
var generator = Utils.getCallSiteInfo._4

/**reset generator*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this used anywhere else. What is the purpose of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just like `setName` above, this is used to set the generator field intentionally.

/**
 * Sets the user-defined generator of this RDD (stored in the `generator` var above).
 * Like `setName`, this lets callers intentionally override the value that was
 * auto-derived from the call site via `Utils.getCallSiteInfo`.
 */
def setGenerator(_generator: String) = {
generator = _generator
}

/**
* Set this RDD's storage level to persist its values across operations after the first time
Expand Down Expand Up @@ -749,7 +757,7 @@ abstract class RDD[T: ClassManifest](
private var storageLevel: StorageLevel = StorageLevel.NONE

/** Record user function generating this RDD. */
private[spark] val origin = Utils.getSparkCallSite
private[spark] val origin = Utils.formatSparkCallSite

private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]

Expand Down
12 changes: 8 additions & 4 deletions core/src/main/scala/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.BlockManagerUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}

import scala.util.DynamicVariable
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
Expand All @@ -65,6 +65,9 @@ class SparkContext(
// Ensure logging is initialized before we spawn any threads
initLogging()

// Allows higher layer frameworks to describe the context of a job
val annotation = new DynamicVariable[String]("")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than having this be a possibly blank string, how about:

new DynamicVariable[Option[String]](None)


// Set Spark driver host and port system properties
if (System.getProperty("spark.driver.host") == null) {
System.setProperty("spark.driver.host", Utils.localIpAddress)
Expand Down Expand Up @@ -575,10 +578,11 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
val callSite = Utils.getSparkCallSite
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than piggyback on callsite to add the annotation. Could you instead add another argument to runJob, called annotation, which accepts an Option[String]?

annotation.value.toString, allowLocal, resultHandler)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
Expand Down Expand Up @@ -659,7 +663,7 @@ class SparkContext(
evaluator: ApproximateEvaluator[U, R],
timeout: Long
): PartialResult[R] = {
val callSite = Utils.getSparkCallSite
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/spark/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private object Utils extends Logging {
* (outside the spark package) that called into Spark, as well as which Spark method they called.
* This is used, for example, to tell users where in their code each RDD got created.
*/
def getSparkCallSite: String = {
def getCallSiteInfo = {
val trace = Thread.currentThread.getStackTrace().filter( el =>
(!el.getMethodName.contains("getStackTrace")))

Expand All @@ -413,6 +413,7 @@ private object Utils extends Logging {
var firstUserFile = "<unknown>"
var firstUserLine = 0
var finished = false
var firstUserClass = "<unknown>"

for (el <- trace) {
if (!finished) {
Expand All @@ -427,13 +428,18 @@ private object Utils extends Logging {
else {
firstUserLine = el.getLineNumber
firstUserFile = el.getFileName
firstUserClass = el.getClassName
finished = true
}
}
}
"%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
}

def formatSparkCallSite = {
val callSiteInfo = getCallSiteInfo
"%s_%s_%s_%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3, callSiteInfo._4)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make this formatting match what was there before, rather than changing it to this one with underscores? It's fine to leave out the fourth entry; just make sure it matches what we used to have. The reason I proposed this refactoring was so that you could leave the existing log messages unchanged.

}
/**
* Try to find a free port to bind to on the local host. This should ideally never be needed,
* except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/spark/executor/TaskMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ object TaskMetrics {


class ShuffleReadMetrics extends Serializable {

var shuffleFinishTime: Long = _
/**
* Total number of blocks fetched in a shuffle (remote or local)
*/
var totalBlocksFetched : Int = _
var totalBlocksFetched: Int = _

/**
* Number of remote blocks fetched in a shuffle
Expand Down
16 changes: 14 additions & 2 deletions core/src/main/scala/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,16 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add an annotation field to JobSubmitted as well, (of type Option[String]) rather than using string parsing for this.

val callSites = callSite.split("\\|",2)
var jobInfo = ""
if (callSites.size == 2)
jobInfo = callSites(1)
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId)
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener)
clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
sparkListeners.foreach{_.onJobStart(job, jobInfo)}
logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
logInfo("Parents of final stage: " + finalStage.parents)
Expand All @@ -297,6 +302,7 @@ class DAGScheduler(
handleExecutorLost(execId)

case completion: CompletionEvent =>
sparkListeners.foreach{_.onTaskEnd(completion)}
handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason) =>
Expand All @@ -307,6 +313,8 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
val JobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN")
sparkListeners.foreach{_.onJobEnd(job, JobCancelEvent)}
}
return true
}
Expand Down Expand Up @@ -454,6 +462,7 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
sparkListeners.foreach{_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size)}
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
Expand Down Expand Up @@ -507,6 +516,7 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
sparkListeners.foreach{_.onJobEnd(job, SparkListenerJobSuccess)}
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
Expand Down Expand Up @@ -642,6 +652,8 @@ class DAGScheduler(
job.listener.jobFailed(new SparkException("Job failed: " + reason))
activeJobs -= job
resultStageToJob -= resultStage
val jobFailedEvent = new SparkListenerJobFailed(failedStage)
sparkListeners.foreach{_.onJobEnd(job, jobFailedEvent)}
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
Expand Down
Loading