Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,8 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}

case stage: ResultStage =>
Expand All @@ -1024,7 +1025,8 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ import org.apache.spark.rdd.RDD
* input RDD's partitions).
* @param localProperties copy of thread-local properties set by the user on the driver side.
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
*/
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

description should mention these are optional.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] class ResultTask[T, U](
stageId: Int,
stageAttemptId: Int,
Expand All @@ -51,8 +56,12 @@ private[spark] class ResultTask[T, U](
locs: Seq[TaskLocation],
val outputId: Int,
localProperties: Properties,
metrics: TaskMetrics)
extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties)
metrics: TaskMetrics,
jobId: Option[Int] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to update the description of this class to have new params and descriptions

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

appId: Option[String] = None,
appAttemptId: Option[String] = None)
extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
appId, appAttemptId)
with Serializable {

@transient private[this] val preferredLocs: Seq[TaskLocation] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ import org.apache.spark.shuffle.ShuffleWriter
* @param locs preferred task execution locations for locality scheduling
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] class ShuffleMapTask(
stageId: Int,
Expand All @@ -51,8 +56,12 @@ private[spark] class ShuffleMapTask(
partition: Partition,
@transient private var locs: Seq[TaskLocation],
metrics: TaskMetrics,
localProperties: Properties)
extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties)
localProperties: Properties,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update descriptions and params in java doc above

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

jobId: Option[Int] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None)
extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
appId, appAttemptId)
with Logging {

/** A constructor used only in test suites. This does not require passing in an RDD. */
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils}
import org.apache.spark.util._

/**
* A unit of execution. We have two kinds of Task's in Spark:
Expand All @@ -47,14 +47,22 @@ import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOu
* @param partitionId index of the number in the RDD
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
// The default value is only used in tests.
val metrics: TaskMetrics = TaskMetrics.registered,
@transient var localProperties: Properties = new Properties) extends Serializable {
@transient var localProperties: Properties = new Properties,
val jobId: Option[Int] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update params and descriptions

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these params all optional just to make it easier for different task types? the jobId and appId I think are mandatory now, the appattempt id is still really optional. I'm leaning towards making this not be Option so that is someone adds a new Task Type we make sure these are setup properly and thus context set properly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making these params all optional is not to break current code which uses this API. An alternative way is to mark the current API as deprecated and add a new overloaded function with new parameters. I am going to go this way. Any suggestions?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so Task is a private spark api so there is no issues breaking api. You don't need to deprecate and add new, you can just change and fix up everywhere that calls it or extends it.
There are a handful of test classes that extend it that would need to be fixed up. Let me think about it a bit more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @tgravescs I want to conform this with you if I can just change and fix up everywhere that calls /extends Task. I can do this, but may change many test classes/cases.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine lets leave it as is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Thanks, @tgravescs .

val appId: Option[String] = None,
val appAttemptId: Option[String] = None) extends Serializable {

/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
Expand All @@ -79,9 +87,14 @@ private[spark] abstract class Task[T](
metrics)
TaskContext.setTaskContext(context)
taskThread = Thread.currentThread()

if (_killed) {
kill(interruptThread = false)
}

new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()

try {
runTask(context)
} catch {
Expand Down
62 changes: 62 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2420,6 +2420,68 @@ private[spark] object Utils extends Logging {
}
}

/**
* An utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` will be
* constructed by parameters passed in.
* When Spark applications run on Yarn and HDFS, its caller contexts will be written into Yarn RM
* audit log and hdfs-audit.log. That can help users to better diagnose and understand how
* specific applications impacting parts of the Hadoop system and potential problems they may be
* creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's
* very helpful to track which upper level job issues it.
*
* @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
*
* The parameters below are optional:
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
* @param jobId id of the job this task belongs to
* @param stageId id of the stage this task belongs to
* @param stageAttemptId attempt id of the stage this task belongs to
* @param taskId task id
* @param taskAttemptNumber task attempt id
*/
private[spark] class CallerContext(
from: String,
appId: Option[String] = None,
appAttemptId: Option[String] = None,
jobId: Option[Int] = None,
stageId: Option[Int] = None,
stageAttemptId: Option[Int] = None,
taskId: Option[Long] = None,
taskAttemptNumber: Option[Int] = None) extends Logging {

val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
val taskAttemptNumberStr =
if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""

val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr

/**
* Set up the caller context [[context]] by invoking Hadoop CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
*/
def setCurrentContext(): Boolean = {
var succeed = false
try {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment that this api was added in hadoop 2.8

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually perhaps add a description to this entire class that explain what this is, what it applies to and when it was added.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I have updated the PR.

val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
val builderInst = Builder.getConstructor(classOf[String]).newInstance(context)
val hdfsContext = Builder.getMethod("build").invoke(builderInst)
callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
succeed = true
} catch {
case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
}
succeed
}
}

/**
* A utility class to redirect the child process's stdout or stderr.
*/
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,18 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
.set("spark.executor.instances", "1")) === 3)
}

test("Set Spark CallerContext") {
val context = "test"
try {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
assert(new CallerContext(context).setCurrentContext())
assert(s"SPARK_$context" ===
callerContext.getMethod("getCurrent").invoke(null).toString)
} catch {
case e: ClassNotFoundException =>
assert(!new CallerContext(context).setCurrentContext())
}
}

test("encodeFileNameToURIRawPath") {
assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ private[spark] class ApplicationMaster(
try {
val appAttemptId = client.getAttemptId()

var attemptID: Option[String] = None

if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
Expand All @@ -196,8 +198,13 @@ private[spark] class ApplicationMaster(
// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

attemptID = Option(appAttemptId.getAttemptId.toString)
}

new CallerContext("APPMASTER",
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

logInfo("ApplicationAttemptId: " + appAttemptId)

val fs = FileSystem.get(yarnConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.util.Utils
import org.apache.spark.util.{CallerContext, Utils}

private[spark] class Client(
val args: ClientArguments,
Expand Down Expand Up @@ -161,6 +161,8 @@ private[spark] class Client(
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)

new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)

Expand Down