From ec6833d32ef14950b2d81790bc908992f6288815 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 15 Aug 2016 21:11:41 -0700 Subject: [PATCH 01/10] [SPARK-16757] Set up Spark caller context to HDFS --- .../org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../org/apache/spark/scheduler/ResultTask.scala | 5 +++-- .../org/apache/spark/scheduler/ShuffleMapTask.scala | 5 +++-- .../main/scala/org/apache/spark/scheduler/Task.scala | 9 ++++++++- .../src/main/scala/org/apache/spark/util/Utils.scala | 12 ++++++++++++ .../apache/spark/deploy/yarn/ApplicationMaster.scala | 3 +++ .../scala/org/apache/spark/deploy/yarn/Client.scala | 3 +++ 7 files changed, 34 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4eb7c81f9e8cc..4aa494eb16200 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1015,7 +1015,7 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, stage.latestInfo.taskMetrics, properties) + taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId)) } case stage: ResultStage => @@ -1024,7 +1024,7 @@ class DAGScheduler( val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics) + taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, Option(jobId)) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 75c6018e214d8..a3db7211d9dda 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -51,8 +51,9 @@ private[spark] class ResultTask[T, U]( locs: Seq[TaskLocation], val outputId: Int, localProperties: Properties, - metrics: TaskMetrics) - extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties) + metrics: TaskMetrics, + jobId: Option[Int] = None) + extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 84b3e5ba6c1f3..25e3f90524d77 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -51,8 +51,9 @@ private[spark] class ShuffleMapTask( partition: Partition, @transient private var locs: Seq[TaskLocation], metrics: TaskMetrics, - localProperties: Properties) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties) + localProperties: Properties, + jobId: Option[Int] = None) + extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 35c4dafe9c19c..99cda9c9e8697 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -54,7 +54,8 @@ private[spark] abstract class Task[T]( val partitionId: Int, // The default value is only used in tests. 
val metrics: TaskMetrics = TaskMetrics.registered, - @transient var localProperties: Properties = new Properties) extends Serializable { + @transient var localProperties: Properties = new Properties, + val jobId: Option[Int] = None) extends Serializable { /** * Called by [[org.apache.spark.executor.Executor]] to run this task. @@ -79,6 +80,12 @@ private[spark] abstract class Task[T]( metrics) TaskContext.setTaskContext(context) taskThread = Thread.currentThread() + + val callerContext = + s"JobId_${jobId.get}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + + s"_taskID_${taskAttemptId}_attemptNumber_${attemptNumber} on Spark" + Utils.setCallerContext(callerContext) + if (_killed) { kill(interruptThread = false) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0ae44a2ed7865..1d1513ba994fa 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2418,6 +2418,18 @@ private[spark] object Utils extends Logging { sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten } } + + def setCallerContext(context: String): Unit = { + try { + val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder") + val builderInst = Builder.getConstructor(classOf[String]).newInstance(context) + val ret = Builder.getMethod("build").invoke(builderInst) + val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") + callerContext.getMethod("setCurrent", callerContext).invoke(null, ret) + } catch { + case NonFatal(e) => logDebug(s"${e.getMessage}") + } + } } /** diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 614278c8b2d22..862493a094449 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -198,6 +198,9 @@ private[spark] class ApplicationMaster( System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) } + val context = s"${System.getProperty("spark.app.name")} running on Spark" + Utils.setCallerContext(context) + logInfo("ApplicationAttemptId: " + appAttemptId) val fs = FileSystem.get(yarnConf) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e3572d781b0db..96f505bb4aba4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -65,6 +65,9 @@ private[spark] class Client( import Client._ import YarnSparkHadoopUtil._ + val context: String = s"${sparkConf.get("spark.app.name")} running on Spark" + Utils.setCallerContext(context) + def this(clientArgs: ClientArguments, spConf: SparkConf) = this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) From 6b66e4927aaee8309858c7b4e3e0fcff5f7623d1 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Fri, 19 Aug 2016 21:28:10 -0700 Subject: [PATCH 02/10] Use 'getOrElse' instead of 'get' --- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 2 +- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 99cda9c9e8697..6d8ef193c3250 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -82,7 +82,7 @@ private[spark] abstract class Task[T]( taskThread = Thread.currentThread() val callerContext = - s"JobId_${jobId.get}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + + 
s"JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + s"_taskID_${taskAttemptId}_attemptNumber_${attemptNumber} on Spark" Utils.setCallerContext(callerContext) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 96f505bb4aba4..8b7f160a577f3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -65,7 +65,8 @@ private[spark] class Client( import Client._ import YarnSparkHadoopUtil._ - val context: String = s"${sparkConf.get("spark.app.name")} running on Spark" + + val context: String = s"${sparkConf.get("spark.app.name", "")} running on Spark" Utils.setCallerContext(context) def this(clientArgs: ClientArguments, spConf: SparkConf) = From 3b9a17e6dc9ef60a4c40f8aab2d0409c32b864e1 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 21 Aug 2016 18:27:49 -0700 Subject: [PATCH 03/10] Remove spaces in the value of Spark caller context --- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 4 ++-- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 6d8ef193c3250..1d5ea18533b18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -82,8 +82,8 @@ private[spark] abstract class Task[T]( taskThread = Thread.currentThread() val callerContext = - s"JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + - s"_taskID_${taskAttemptId}_attemptNumber_${attemptNumber} on Spark" + s"Spark_JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + + 
s"_taskID_${taskAttemptId}_attemptNumber_${attemptNumber}" Utils.setCallerContext(callerContext) if (_killed) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 862493a094449..f02a996426312 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -198,7 +198,7 @@ private[spark] class ApplicationMaster( System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) } - val context = s"${System.getProperty("spark.app.name")} running on Spark" + val context = s"Spark_${System.getProperty("spark.app.name")}" Utils.setCallerContext(context) logInfo("ApplicationAttemptId: " + appAttemptId) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8b7f160a577f3..d783ab9628805 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -66,7 +66,7 @@ private[spark] class Client( import YarnSparkHadoopUtil._ - val context: String = s"${sparkConf.get("spark.app.name", "")} running on Spark" + val context: String = s"Spark_${sparkConf.get("spark.app.name", "")}" Utils.setCallerContext(context) def this(clientArgs: ClientArguments, spConf: SparkConf) = From 5ab2a41b93bfd73baf3798ba66fc7554b10b78e6 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 21 Aug 2016 22:07:44 -0700 Subject: [PATCH 04/10] Add Application ID and Attempt ID in the value of the caller context when Yarn client and ApplicationMaster invoke Hadoop caller context API --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 8 ++++++-- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 7 +++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index f02a996426312..34dc71d60f41c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -184,6 +184,9 @@ private[spark] class ApplicationMaster( try { val appAttemptId = client.getAttemptId() + var context = s"Spark_AppName_${System.getProperty("spark.app.name")}" + + s"_AppId_${appAttemptId.getApplicationId}" + if (isClusterMode) { // Set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box @@ -196,9 +199,10 @@ private[spark] class ApplicationMaster( // Set this internal configuration if it is running on cluster mode, this // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode. System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) - } - val context = s"Spark_${System.getProperty("spark.app.name")}" + context = context + s"_AttemptId_${appAttemptId.getAttemptId}" + } + // Set Spark caller context to HDFS Utils.setCallerContext(context) logInfo("ApplicationAttemptId: " + appAttemptId) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d783ab9628805..f12e2c4a9b14c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -65,10 +65,6 @@ private[spark] class Client( import Client._ import YarnSparkHadoopUtil._ - - val context: String = s"Spark_${sparkConf.get("spark.app.name", "")}" - Utils.setCallerContext(context) - def this(clientArgs: ClientArguments, spConf: SparkConf) = this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) @@ -165,6 +161,9 @@ private[spark] class Client( 
reportLauncherState(SparkAppHandle.State.SUBMITTED) launcherBackend.setAppId(appId.toString) + val context: String = s"Spark_AppName_${sparkConf.get("spark.app.name", "")}_AppId_$appId" + Utils.setCallerContext(context) + // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) From 1512775a3faddb9de9299662a6f3bfec3f6fe205 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 21 Aug 2016 23:20:54 -0700 Subject: [PATCH 05/10] Add application ID and attempt ID to the value of the caller context when 'Task' invoke Hadoop caller context API --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++++-- .../main/scala/org/apache/spark/scheduler/ResultTask.scala | 7 +++++-- .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 7 +++++-- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 7 +++++-- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4aa494eb16200..92b02d85e6080 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1015,7 +1015,8 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId)) + taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId), + Option(sc.applicationId), sc.applicationAttemptId) } case stage: ResultStage => @@ -1024,7 +1025,8 @@ class DAGScheduler( val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, Option(jobId)) + taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, + 
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index a3db7211d9dda..3f3130856dbdc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -52,8 +52,11 @@ private[spark] class ResultTask[T, U]( val outputId: Int, localProperties: Properties, metrics: TaskMetrics, - jobId: Option[Int] = None) - extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId) + jobId: Option[Int] = None, + appId: Option[String] = None, + appAttemptId: Option[String] = None) + extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId, + appId, appAttemptId) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 25e3f90524d77..be682ef8a677c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -52,8 +52,11 @@ private[spark] class ShuffleMapTask( @transient private var locs: Seq[TaskLocation], metrics: TaskMetrics, localProperties: Properties, - jobId: Option[Int] = None) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId) + jobId: Option[Int] = None, + appId: Option[String] = None, + appAttemptId: Option[String] = None) + extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId, + appId, appAttemptId) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. 
*/ diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1d5ea18533b18..f9b9c5d92dc3c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -55,7 +55,9 @@ private[spark] abstract class Task[T]( // The default value is only used in tests. val metrics: TaskMetrics = TaskMetrics.registered, @transient var localProperties: Properties = new Properties, - val jobId: Option[Int] = None) extends Serializable { + val jobId: Option[Int] = None, + val appId: Option[String] = None, + val appAttemptId: Option[String] = None) extends Serializable { /** * Called by [[org.apache.spark.executor.Executor]] to run this task. @@ -82,7 +84,8 @@ private[spark] abstract class Task[T]( taskThread = Thread.currentThread() val callerContext = - s"Spark_JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + + s"Spark_AppId_${appId.getOrElse("")}_AppAttemptId_${appAttemptId.getOrElse("None")}" + + s"_JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + s"_taskID_${taskAttemptId}_attemptNumber_${attemptNumber}" Utils.setCallerContext(callerContext) From ae42093e59e37d0a4fda4280f2bbffec18c594d3 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Wed, 31 Aug 2016 22:06:13 -0700 Subject: [PATCH 06/10] Add an unit test --- core/src/main/scala/org/apache/spark/util/Utils.scala | 7 +++++-- .../src/test/scala/org/apache/spark/util/UtilsSuite.scala | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1d1513ba994fa..20122cbac7638 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2419,16 +2419,19 @@ private[spark] object Utils extends Logging { } } - def 
setCallerContext(context: String): Unit = { + def setCallerContext(context: String): Boolean = { + var succeed = false try { val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder") val builderInst = Builder.getConstructor(classOf[String]).newInstance(context) val ret = Builder.getMethod("build").invoke(builderInst) val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") callerContext.getMethod("setCurrent", callerContext).invoke(null, ret) + succeed = true } catch { - case NonFatal(e) => logDebug(s"${e.getMessage}") + case NonFatal(e) => logDebug(s"$e", e) } + succeed } } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 30952a9458345..2920f71707d48 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -787,6 +787,14 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { .set("spark.executor.instances", "1")) === 3) } + test("setCallerContext") { + try { + Utils.classForName("org.apache.hadoop.ipc.CallerContext") + assert(Utils.setCallerContext("sparkCallerContext")) + } catch { + case e: ClassNotFoundException => assert(!Utils.setCallerContext("sparkCallerContext")) + } + } test("encodeFileNameToURIRawPath") { assert(Utils.encodeFileNameToURIRawPath("abc") === "abc") From 748e7a9b6f6fe928df9e49f8e020d02126123be8 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Wed, 14 Sep 2016 22:38:15 -0700 Subject: [PATCH 07/10] Add a CallerContext call, update java doc, and shorten caller context string --- .../apache/spark/scheduler/ResultTask.scala | 5 ++- .../spark/scheduler/ShuffleMapTask.scala | 3 ++ .../org/apache/spark/scheduler/Task.scala | 15 +++++---- .../scala/org/apache/spark/util/Utils.scala | 33 ++++++++++++++++--- .../org/apache/spark/util/UtilsSuite.scala | 12 ++++--- .../spark/deploy/yarn/ApplicationMaster.scala | 
10 +++--- .../org/apache/spark/deploy/yarn/Client.scala | 5 ++- 7 files changed, 58 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 3f3130856dbdc..ca826eb516234 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -42,7 +42,10 @@ import org.apache.spark.rdd.RDD * input RDD's partitions). * @param localProperties copy of thread-local properties set by the user on the driver side. * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. - */ + * @param jobId id of the job this task belongs to + * @param appId id of the app this task belongs to + * @param appAttemptId attempt id of the app this task belongs to + */ private[spark] class ResultTask[T, U]( stageId: Int, stageAttemptId: Int, diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index be682ef8a677c..0313af68bf9a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -43,6 +43,9 @@ import org.apache.spark.shuffle.ShuffleWriter * @param locs preferred task execution locations for locality scheduling * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. 
+ * @param jobId id of the job this task belongs to + * @param appId id of the app this task belongs to + * @param appAttemptId attempt id of the app this task belongs to */ private[spark] class ShuffleMapTask( stageId: Int, diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index f9b9c5d92dc3c..af200953a0ed9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -29,7 +29,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils} +import org.apache.spark.util._ /** * A unit of execution. We have two kinds of Task's in Spark: @@ -47,6 +47,9 @@ import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOu * @param partitionId index of the number in the RDD * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. 
+ * @param jobId id of the job this task belongs to + * @param appId id of the app this task belongs to + * @param appAttemptId attempt id of the app this task belongs to */ private[spark] abstract class Task[T]( val stageId: Int, @@ -83,15 +86,13 @@ private[spark] abstract class Task[T]( TaskContext.setTaskContext(context) taskThread = Thread.currentThread() - val callerContext = - s"Spark_AppId_${appId.getOrElse("")}_AppAttemptId_${appAttemptId.getOrElse("None")}" + - s"_JobId_${jobId.getOrElse("0")}_StageID_${stageId}_stageAttemptId_${stageAttemptId}" + - s"_taskID_${taskAttemptId}_attemptNumber_${attemptNumber}" - Utils.setCallerContext(callerContext) - if (_killed) { kill(interruptThread = false) } + + new CallerContext(None, appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId), + Option(taskAttemptId), Option(attemptNumber)).set() + try { runTask(context) } catch { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 20122cbac7638..d6d08404774d7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2418,18 +2418,41 @@ private[spark] object Utils extends Logging { sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten } } +} - def setCallerContext(context: String): Boolean = { +private[spark] class CallerContext( + appName: Option[String] = None, + appID: Option[String] = None, + appAttemptID: Option[String] = None, + jobID: Option[Int] = None, + stageID: Option[Int] = None, + stageAttemptId: Option[Int] = None, + taskId: Option[Long] = None, + taskAttemptNumber: Option[Int] = None) extends Logging { + + val AppName = if (appName.isDefined) s"_AppName_${appName.get}" else "" + val AppID = if (appID.isDefined) s"_AppID_${appID.get}" else "" + val AppAttemptID = if (appAttemptID.isDefined) s"_${appAttemptID.get}" else "" + val JobID = if (jobID.isDefined) s"_JobID_${jobID.get}" else 
"" + val StageID = if (stageID.isDefined) s"_StageID_${stageID.get}" else "" + val StageAttemptId = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else "" + val TaskId = if (taskId.isDefined) s"_TaskId_${taskId.get}" else "" + val TaskAttemptNumber = if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else "" + + val context = "SPARK" + AppName + AppID + AppAttemptID + + JobID + StageID + StageAttemptId + TaskId + TaskAttemptNumber + + def set(): Boolean = { var succeed = false try { + val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder") val builderInst = Builder.getConstructor(classOf[String]).newInstance(context) - val ret = Builder.getMethod("build").invoke(builderInst) - val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") - callerContext.getMethod("setCurrent", callerContext).invoke(null, ret) + val hdfsContext = Builder.getMethod("build").invoke(builderInst) + callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext) succeed = true } catch { - case NonFatal(e) => logDebug(s"$e", e) + case NonFatal(e) => logInfo("Fail to set Spark caller context", e) } succeed } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 2920f71707d48..d89e650aa48cb 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -787,12 +787,16 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { .set("spark.executor.instances", "1")) === 3) } - test("setCallerContext") { + test("Set Spark CallerContext") { + val appName = "myTestApp" try { - Utils.classForName("org.apache.hadoop.ipc.CallerContext") - assert(Utils.setCallerContext("sparkCallerContext")) + val callerContext = 
Utils.classForName("org.apache.hadoop.ipc.CallerContext") + assert(new CallerContext(Option(s"$appName")).set()) + assert(s"SPARK_AppName_$appName" === + callerContext.getMethod("getCurrent").invoke(null).toString) } catch { - case e: ClassNotFoundException => assert(!Utils.setCallerContext("sparkCallerContext")) + case e: ClassNotFoundException => + assert(!new CallerContext(Option(s"$appName")).set()) } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 34dc71d60f41c..a30223332d81a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -184,8 +184,7 @@ private[spark] class ApplicationMaster( try { val appAttemptId = client.getAttemptId() - var context = s"Spark_AppName_${System.getProperty("spark.app.name")}" + - s"_AppId_${appAttemptId.getApplicationId}" + var attemptID: Option[String] = None if (isClusterMode) { // Set the web ui port to be ephemeral for yarn so we don't conflict with @@ -200,10 +199,11 @@ private[spark] class ApplicationMaster( // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode. 
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) - context = context + s"_AttemptId_${appAttemptId.getAttemptId}" + attemptID = Option(appAttemptId.getAttemptId.toString) } - // Set Spark caller context to HDFS - Utils.setCallerContext(context) + + new CallerContext(Option(System.getProperty("spark.app.name")), + Option(appAttemptId.getApplicationId.toString), attemptID).set() logInfo("ApplicationAttemptId: " + appAttemptId) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f12e2c4a9b14c..bf973f75a6d02 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -54,7 +54,7 @@ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} -import org.apache.spark.util.Utils +import org.apache.spark.util.{CallerContext, Utils} private[spark] class Client( val args: ClientArguments, @@ -161,8 +161,7 @@ private[spark] class Client( reportLauncherState(SparkAppHandle.State.SUBMITTED) launcherBackend.setAppId(appId.toString) - val context: String = s"Spark_AppName_${sparkConf.get("spark.app.name", "")}_AppId_$appId" - Utils.setCallerContext(context) + new CallerContext(Option(sparkConf.get("spark.app.name", "")), Option(appId.toString)).set() // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) From 10dbc6f26ac7d224803b721f32a9a0b4306e1f47 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Tue, 20 Sep 2016 12:10:59 -0700 Subject: [PATCH 08/10] Update the content of the caller context and add more description for CallerContext class --- .../apache/spark/scheduler/ResultTask.scala | 2 + .../spark/scheduler/ShuffleMapTask.scala | 2 
+ .../org/apache/spark/scheduler/Task.scala | 6 +- .../scala/org/apache/spark/util/Utils.scala | 55 ++++++++++++++----- .../org/apache/spark/util/UtilsSuite.scala | 8 +-- .../spark/deploy/yarn/ApplicationMaster.scala | 4 +- .../org/apache/spark/deploy/yarn/Client.scala | 2 +- 7 files changed, 55 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index ca826eb516234..68da9ba8fcf0b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -42,6 +42,8 @@ import org.apache.spark.rdd.RDD * input RDD's partitions). * @param localProperties copy of thread-local properties set by the user on the driver side. * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. + * + * The parameters below are optional: * @param jobId id of the job this task belongs to * @param appId id of the app this task belongs to * @param appAttemptId attempt id of the app this task belongs to diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 0313af68bf9a7..03002bf2804e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -43,6 +43,8 @@ import org.apache.spark.shuffle.ShuffleWriter * @param locs preferred task execution locations for locality scheduling * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. 
+ * + * The parameters below are optional: * @param jobId id of the job this task belongs to * @param appId id of the app this task belongs to * @param appAttemptId attempt id of the app this task belongs to diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index af200953a0ed9..87dd888d855cf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -47,6 +47,8 @@ import org.apache.spark.util._ * @param partitionId index of the number in the RDD * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. + * + * The parameters below are optional: * @param jobId id of the job this task belongs to * @param appId id of the app this task belongs to * @param appAttemptId attempt id of the app this task belongs to @@ -90,8 +92,8 @@ private[spark] abstract class Task[T]( kill(interruptThread = false) } - new CallerContext(None, appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId), - Option(taskAttemptId), Option(attemptNumber)).set() + new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId), + Option(taskAttemptId), Option(attemptNumber)).setCurrentContext() try { runTask(context) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d6d08404774d7..3caa5e90ca399 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2420,29 +2420,54 @@ private[spark] object Utils extends Logging { } } +/** + * A utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` will be + * constructed by parameters passed in.
+ * When Spark applications run on Yarn and HDFS, its caller contexts will be written into Yarn RM + * audit log and hdfs-audit.log. That can help users to better diagnose and understand how + * specific applications impact parts of the Hadoop system and potential problems they may be + * creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's + * very helpful to track which upper level job issues it. + * + * @param from who sets up the caller context (TASK, CLIENT, APPLICATION_MASTER) + * + * The parameters below are optional: + * @param appID id of the app this task belongs to + * @param appAttemptID attempt id of the app this task belongs to + * @param jobID id of the job this task belongs to + * @param stageID id of the stage this task belongs to + * @param stageAttemptId attempt id of the stage this task belongs to + * @param taskId task id + * @param taskAttemptNumber task attempt id + * @since 2.0.1 + */ private[spark] class CallerContext( - appName: Option[String] = None, - appID: Option[String] = None, - appAttemptID: Option[String] = None, - jobID: Option[Int] = None, - stageID: Option[Int] = None, + from: String, + appId: Option[String] = None, + appAttemptId: Option[String] = None, + jobId: Option[Int] = None, + stageId: Option[Int] = None, stageAttemptId: Option[Int] = None, taskId: Option[Long] = None, taskAttemptNumber: Option[Int] = None) extends Logging { - val AppName = if (appName.isDefined) s"_AppName_${appName.get}" else "" - val AppID = if (appID.isDefined) s"_AppID_${appID.get}" else "" - val AppAttemptID = if (appAttemptID.isDefined) s"_${appAttemptID.get}" else "" - val JobID = if (jobID.isDefined) s"_JobID_${jobID.get}" else "" - val StageID = if (stageID.isDefined) s"_StageID_${stageID.get}" else "" - val StageAttemptId = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else "" + val AppId = if (appId.isDefined) s"_AppId_${appId.get}" else "" + val AppAttemptId = if
(appAttemptId.isDefined) s"_AttemptId_${appAttemptId.get}" else "" + val JobId = if (jobId.isDefined) s"_JobId_${jobId.get}" else "" + val StageId = if (stageId.isDefined) s"_StageId_${stageId.get}" else "" + val StageAttemptId = if (stageAttemptId.isDefined) s"_AttemptId_${stageAttemptId.get}" else "" val TaskId = if (taskId.isDefined) s"_TaskId_${taskId.get}" else "" - val TaskAttemptNumber = if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else "" + val TaskAttemptNumber = + if (taskAttemptNumber.isDefined) s"_AttemptNum_${taskAttemptNumber.get}" else "" - val context = "SPARK" + AppName + AppID + AppAttemptID + - JobID + StageID + StageAttemptId + TaskId + TaskAttemptNumber + val context = "SPARK_" + from + AppId + AppAttemptId + + JobId + StageId + StageAttemptId + TaskId + TaskAttemptNumber - def set(): Boolean = { + /** + * Set up the caller context [[context]] by invoking Hadoop CallerContext API of + * [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8. 
+ */ + def setCurrentContext(): Boolean = { var succeed = false try { val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index d89e650aa48cb..da144ef86c72e 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -788,15 +788,15 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { } test("Set Spark CallerContext") { - val appName = "myTestApp" + val context = "test" try { val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") - assert(new CallerContext(Option(s"$appName")).set()) - assert(s"SPARK_AppName_$appName" === + assert(new CallerContext(context).setCurrentContext()) + assert(s"SPARK_$context" === callerContext.getMethod("getCurrent").invoke(null).toString) } catch { case e: ClassNotFoundException => - assert(!new CallerContext(Option(s"$appName")).set()) + assert(!new CallerContext(context).setCurrentContext()) } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index a30223332d81a..a6de00fd04a62 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -202,8 +202,8 @@ private[spark] class ApplicationMaster( attemptID = Option(appAttemptId.getAttemptId.toString) } - new CallerContext(Option(System.getProperty("spark.app.name")), - Option(appAttemptId.getApplicationId.toString), attemptID).set() + new CallerContext("APPLICATION_MASTER", + Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext() logInfo("ApplicationAttemptId: " + appAttemptId) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index bf973f75a6d02..14cd4dc71d268 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -161,7 +161,7 @@ private[spark] class Client( reportLauncherState(SparkAppHandle.State.SUBMITTED) launcherBackend.setAppId(appId.toString) - new CallerContext(Option(sparkConf.get("spark.app.name", "")), Option(appId.toString)).set() + new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext() // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) From 47de8a2a9e1640e0ea942d1a689150d7b7a66c10 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Thu, 22 Sep 2016 16:26:59 -0700 Subject: [PATCH 09/10] Update the content of the caller context string --- .../scala/org/apache/spark/util/Utils.scala | 24 +++++++++---------- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3caa5e90ca399..af8caa03dfbe7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2429,13 +2429,13 @@ private[spark] object Utils extends Logging { * creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's * very helpful to track which upper level job issues it. 
* - * @param from who sets up the caller context (TASK, CLIENT, APPLICATION_MASTER) + * @param from who sets up the caller context (TASK, CLIENT, APPMASTER) * * The parameters below are optional: - * @param appID id of the app this task belongs to - * @param appAttemptID attempt id of the app this task belongs to - * @param jobID id of the job this task belongs to - * @param stageID id of the stage this task belongs to + * @param appId id of the app this task belongs to + * @param appAttemptId attempt id of the app this task belongs to + * @param jobId id of the job this task belongs to + * @param stageId id of the stage this task belongs to * @param stageAttemptId attempt id of the stage this task belongs to * @param taskId task id * @param taskAttemptNumber task attempt id @@ -2451,14 +2451,14 @@ private[spark] class CallerContext( taskId: Option[Long] = None, taskAttemptNumber: Option[Int] = None) extends Logging { - val AppId = if (appId.isDefined) s"_AppId_${appId.get}" else "" - val AppAttemptId = if (appAttemptId.isDefined) s"_AttemptId_${appAttemptId.get}" else "" - val JobId = if (jobId.isDefined) s"_JobId_${jobId.get}" else "" - val StageId = if (stageId.isDefined) s"_StageId_${stageId.get}" else "" - val StageAttemptId = if (stageAttemptId.isDefined) s"_AttemptId_${stageAttemptId.get}" else "" - val TaskId = if (taskId.isDefined) s"_TaskId_${taskId.get}" else "" + val AppId = if (appId.isDefined) s"_${appId.get}" else "" + val AppAttemptId = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else "" + val JobId = if (jobId.isDefined) s"_JId_${jobId.get}" else "" + val StageId = if (stageId.isDefined) s"_SId_${stageId.get}" else "" + val StageAttemptId = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else "" + val TaskId = if (taskId.isDefined) s"_TId_${taskId.get}" else "" val TaskAttemptNumber = - if (taskAttemptNumber.isDefined) s"_AttemptNum_${taskAttemptNumber.get}" else "" + if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" 
else "" val context = "SPARK_" + from + AppId + AppAttemptId + JobId + StageId + StageAttemptId + TaskId + TaskAttemptNumber diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index a6de00fd04a62..91ec31dbd0479 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -202,7 +202,7 @@ private[spark] class ApplicationMaster( attemptID = Option(appAttemptId.getAttemptId.toString) } - new CallerContext("APPLICATION_MASTER", + new CallerContext("APPMASTER", Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext() logInfo("ApplicationAttemptId: " + appAttemptId) From dbcabfc3ff0d14c1a0a77daddc7751a77ec6d241 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 26 Sep 2016 14:38:19 -0700 Subject: [PATCH 10/10] rename local vals and remove the '@since' --- .../scala/org/apache/spark/util/Utils.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index af8caa03dfbe7..00f247e01bbda 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2439,7 +2439,6 @@ private[spark] object Utils extends Logging { * @param stageAttemptId attempt id of the stage this task belongs to * @param taskId task id * @param taskAttemptNumber task attempt id - * @since 2.0.1 */ private[spark] class CallerContext( from: String, @@ -2451,17 +2450,17 @@ private[spark] class CallerContext( taskId: Option[Long] = None, taskAttemptNumber: Option[Int] = None) extends Logging { - val AppId = if (appId.isDefined) s"_${appId.get}" else "" - val AppAttemptId = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else "" - val JobId = if 
(jobId.isDefined) s"_JId_${jobId.get}" else "" - val StageId = if (stageId.isDefined) s"_SId_${stageId.get}" else "" - val StageAttemptId = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else "" - val TaskId = if (taskId.isDefined) s"_TId_${taskId.get}" else "" - val TaskAttemptNumber = + val appIdStr = if (appId.isDefined) s"_${appId.get}" else "" + val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else "" + val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else "" + val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else "" + val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else "" + val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else "" + val taskAttemptNumberStr = if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else "" - val context = "SPARK_" + from + AppId + AppAttemptId + - JobId + StageId + StageAttemptId + TaskId + TaskAttemptNumber + val context = "SPARK_" + from + appIdStr + appAttemptIdStr + + jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr /** * Set up the caller context [[context]] by invoking Hadoop CallerContext API of