[SPARK-47577][CORE][PART2] Migrate logError with variables to structured logging framework #45890
[SPARK-47577][CORE][PART2] Migrate logError with variables to structured logging framework #45890gengliangwang wants to merge 4 commits into
Conversation
|
cc @dtenedor @panbingkun @pan3793 @itholic as well |
| val SHUFFLE_ID = Value | ||
| val SHUFFLE_MERGE_ID = Value | ||
| val SIZE = Value | ||
| val SLEEP_TIME_SECONDS = Value |
There was a problem hiding this comment.
Do we need the suffix _SECONDS?
Will we encounter SLEEP_TIME_MILLISECONDS like this in the future.
There was a problem hiding this comment.
We can handle this when there is SLEEP_TIME_MILLISECONDS
| if (sc != null) { | ||
| logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t) | ||
| logError( | ||
| log"uncaught error in thread ${MDC(THREAD, currentThreadName)}, stopping SparkContext", |
There was a problem hiding this comment.
Will we have THREAD_ID in the future? THREAD_NAME?
There was a problem hiding this comment.
Good point, I will update this.
| } | ||
| if (!NonFatal(t)) { | ||
| logError(s"throw uncaught fatal error in thread $currentThreadName", t) | ||
| logError(log"throw uncaught fatal error in thread ${MDC(THREAD, currentThreadName)}", t) |
| case NonFatal(t) => | ||
| logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) | ||
| logError( | ||
| log"Uncaught exception in thread ${MDC(THREAD, Thread.currentThread().getName)}", t) |
| exception: Throwable): Unit = { | ||
| logError(s"Failed to get the meta of push-merged block for ($shuffleId, $reduceId) " + | ||
| s"from ${req.address.host}:${req.address.port}", exception) | ||
| logError(log"Failed to get the meta of push-merged block for " + |
There was a problem hiding this comment.
It's not related to this PR, do we need shuffleMergeId in the log?
There was a problem hiding this comment.
Well let's just keep it.
| s"unrecoverable fetch failures! Most likely this means user code is incorrectly " + | ||
| s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure) | ||
| logError(log"${LogMDC(TASK_NAME, taskName)} completed successfully though internally " + | ||
| log"it encountered unrecoverable fetch failures! Most likely this means user code " + |
There was a problem hiding this comment.
Can we correct the original appearance of the two whitespaces before Most?
| if (failures < executorStateSyncMaxAttempts) { | ||
| logError(s"Failed to send $newState to Master $masterRef, " + | ||
| s"will retry ($failures/$executorStateSyncMaxAttempts).", t) | ||
| logError(log"Failed to send ${MDC(EXECUTOR_STATE_CHANGED, newState)}" + |
There was a problem hiding this comment.
I'm not sure if EXECUTOR_STATE_CHANGE or EXECUTOR_STATE is better
dongjoon-hyun
left a comment
There was a problem hiding this comment.
nit, the PR description seems to be wrong to me, @gengliangwang . logInfo is a typo, right? :)
|
@dongjoon-hyun Yes, updated :) |
|
Thank you! Please add the existing comments first, @gengliangwang . |
|
nit: pr description says "part1" which is inconsistent with the pr title :) |
|
@xinrong-meng Thanks, I updated the PR description. Seems I made two mistakes when creating a PR at night time... |
|
@panbingkun @dongjoon-hyun @xinrong-meng Thanks for the review, merging to master |

What changes were proposed in this pull request?
Migrate logError with variables of core module to structured logging framework. This is part2 which transforms the logError entries of the following API
to
migration Part1 was in #45834
Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
Does this PR introduce any user-facing change?
Yes, Spark core logs will contain additional MDC
How was this patch tested?
Compiler and scala style checks, as well as code review.
Was this patch authored or co-authored using generative AI tooling?
No