add JobLogger to Spark #573
spark/SparkContext.scala

```diff
@@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
 import spark.storage.BlockManagerUI
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
-
+import scala.util.DynamicVariable
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
```
```diff
@@ -65,6 +65,9 @@ class SparkContext(
   // Ensure logging is initialized before we spawn any threads
   initLogging()

+  // Allows higher layer frameworks to describe the context of a job
+  val annotation = new DynamicVariable[String]("")
+
   // Set Spark driver host and port system properties
   if (System.getProperty("spark.driver.host") == null) {
     System.setProperty("spark.driver.host", Utils.localIpAddress)
```

Review comment on `val annotation`: Rather than having this be a possibly blank string, how about `new DynamicVariable[Option[String]]`?
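A minimal, self-contained sketch of what that suggestion could look like (the default value and the usage shown are assumptions, not code from this PR): a `DynamicVariable[Option[String]]` defaulting to `None` makes the "no annotation" case explicit instead of signaling it with an empty string.

```scala
import scala.util.DynamicVariable

object AnnotationSketch {
  // None means "no annotation", instead of a possibly blank string
  val annotation = new DynamicVariable[Option[String]](None)

  def main(args: Array[String]): Unit = {
    // withValue scopes the annotation to the enclosing block, so any job
    // submitted inside it would see Some(...), and None everywhere else
    annotation.withValue(Some("nightly ETL")) {
      println(annotation.value.getOrElse("<none>")) // prints: nightly ETL
    }
    println(annotation.value.getOrElse("<none>"))   // prints: <none>
  }
}
```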
```diff
@@ -575,10 +578,11 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
+    val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" +
+      annotation.value.toString, allowLocal, resultHandler)
     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
     result
```

Review comment on the `runJob` call: Rather than piggybacking on the call site to carry the annotation, could you instead add another argument to `runJob`, called `annotation`, which accepts an `Option[String]`?
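A sketch of the signature the reviewer is asking for; the parameter name, its position, and the caller shown in the comment are assumptions, not the code that was eventually merged:

```scala
// Hypothetical DAGScheduler.runJob with an explicit annotation argument,
// leaving callSite untouched instead of splicing in a "|" separator.
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    annotation: Option[String],   // e.g. SparkContext's annotation.value
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit): Unit

// SparkContext would then pass the value straight through:
// dagScheduler.runJob(rdd, func, partitions, callSite,
//   annotation.value, allowLocal, resultHandler)
```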
```diff
@@ -659,7 +663,7 @@ class SparkContext(
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long
     ): PartialResult[R] = {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
```
spark/Utils.scala

```diff
@@ -401,7 +401,7 @@ private object Utils extends Logging {
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
    */
-  def getSparkCallSite: String = {
+  def getCallSiteInfo = {
     val trace = Thread.currentThread.getStackTrace().filter( el =>
       (!el.getMethodName.contains("getStackTrace")))

```
```diff
@@ -413,6 +413,7 @@ private object Utils extends Logging {
     var firstUserFile = "<unknown>"
     var firstUserLine = 0
     var finished = false
+    var firstUserClass = "<unknown>"

     for (el <- trace) {
       if (!finished) {
```
```diff
@@ -427,13 +428,18 @@ private object Utils extends Logging {
         else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
+          firstUserClass = el.getClassName
           finished = true
         }
       }
     }
-    "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
+    (lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
   }

+  def formatSparkCallSite = {
+    val callSiteInfo = getCallSiteInfo
+    "%s_%s_%s_%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3, callSiteInfo._4)
+  }
   /**
    * Try to find a free port to bind to on the local host. This should ideally never be needed,
    * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
```

Review comment on `formatSparkCallSite`: Could you make this formatting match what was there before, rather than changing it to this one with underscores? It's fine to leave out the fourth entry; just make sure it matches what we used to have. The reason I proposed this refactoring was so that you could leave the existing log messages unchanged.
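Presumably the fix being asked for looks something like the sketch below, reusing the `"%s at %s:%s"` format string that `getSparkCallSite` had before the refactoring and dropping the fourth tuple entry:

```scala
def formatSparkCallSite = {
  val callSiteInfo = getCallSiteInfo
  // Same "<method> at <file>:<line>" shape as the old getSparkCallSite,
  // so existing log messages stay unchanged; firstUserClass goes unused here
  "%s at %s:%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3)
}
```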
spark/scheduler/DAGScheduler.scala

```diff
@@ -275,11 +275,16 @@ class DAGScheduler(
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {
       case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
+        val callSites = callSite.split("\\|",2)
+        var jobInfo = ""
+        if (callSites.size == 2)
+          jobInfo = callSites(1)
         val runId = nextRunId.getAndIncrement()
         val finalStage = newStage(finalRDD, None, runId)
-        val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
+        val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener)
         clearCacheLocs()
-        logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
+        sparkListeners.foreach{_.onJobStart(job, jobInfo)}
+        logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length +
           " output partitions (allowLocal=" + allowLocal + ")")
         logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
         logInfo("Parents of final stage: " + finalStage.parents)
```

Review comment on the `JobSubmitted` case: It would be good to add an annotation field to `JobSubmitted` as well (of type `Option[String]`), rather than using string parsing for this.
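A hedged sketch of that suggestion, with the existing fields taken from the pattern match in this hunk and the new field name assumed: `JobSubmitted` carries the annotation directly, so `processEvent` can bind it instead of splitting `callSite` on `"|"`.

```scala
// Assumed shape; only the first six fields are confirmed by the diff.
private[spark] case class JobSubmitted(
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Seq[Int],
    allowLocal: Boolean,
    callSite: String,
    listener: JobListener,
    annotation: Option[String] = None)  // new field, hypothetical name
  extends DAGSchedulerEvent

// processEvent would then match the annotation directly:
// case JobSubmitted(finalRDD, func, partitions, allowLocal,
//                   callSite, listener, annotation) => ...
```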
```diff
@@ -297,6 +302,7 @@ class DAGScheduler(
         handleExecutorLost(execId)

       case completion: CompletionEvent =>
+        sparkListeners.foreach{_.onTaskEnd(completion)}
         handleTaskCompletion(completion)

       case TaskSetFailed(taskSet, reason) =>
```
```diff
@@ -307,6 +313,8 @@ class DAGScheduler(
         for (job <- activeJobs) {
           val error = new SparkException("Job cancelled because SparkContext was shut down")
           job.listener.jobFailed(error)
+          val JobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN")
+          sparkListeners.foreach{_.onJobEnd(job, JobCancelEvent)}
         }
         return true
       }
```
```diff
@@ -454,6 +462,7 @@ class DAGScheduler(
       }
     }
     if (tasks.size > 0) {
+      sparkListeners.foreach{_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size)}
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
```
```diff
@@ -507,6 +516,7 @@ class DAGScheduler(
           activeJobs -= job
           resultStageToJob -= stage
           markStageAsFinished(stage)
+          sparkListeners.foreach{_.onJobEnd(job, SparkListenerJobSuccess)}
         }
         job.listener.taskSucceeded(rt.outputId, event.result)
       }
```
```diff
@@ -642,6 +652,8 @@ class DAGScheduler(
       job.listener.jobFailed(new SparkException("Job failed: " + reason))
       activeJobs -= job
       resultStageToJob -= resultStage
+      val jobFailedEvent = new SparkListenerJobFailed(failedStage)
+      sparkListeners.foreach{_.onJobEnd(job, jobFailedEvent)}
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
```
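Taken together, the `onJobStart`, `onStageSubmitted`, `onTaskEnd`, and `onJobEnd` calls in these hunks imply a listener interface roughly like the sketch below. This is reconstructed from the call sites alone, not copied from the PR, so the actual JobLogger trait and event names may differ:

```scala
// Inferred from the DAGScheduler call sites above; assumed shapes only.
trait SparkListener {
  // fired when a JobSubmitted event is processed, with the job annotation
  def onJobStart(job: ActiveJob, annotation: String) {}
  // fired just before a stage's missing tasks are handed to the scheduler
  def onStageSubmitted(stage: Stage, info: String) {}
  // fired for every task CompletionEvent
  def onTaskEnd(event: CompletionEvent) {}
  // fired once per job, with a success, failure, or cancellation event
  def onJobEnd(job: ActiveJob, event: SparkListenerEvents) {}
}

sealed trait SparkListenerEvents
case object SparkListenerJobSuccess extends SparkListenerEvents
case class SparkListenerJobFailed(failedStage: Stage) extends SparkListenerEvents
case class SparkListenerJobCancelled(reason: String) extends SparkListenerEvents
```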
Review comment: I don't see this used anywhere else. What is the purpose of this?

Reply: Just like setName above, this is used to set the generator intentionally.