Commit dc46b14

pan3793 authored and LuciferYang committed
[SPARK-53064][CORE] Rewrite MDC LogKey in Java
### What changes were proposed in this pull request?

This PR rewrites a few classes used by the Structured Logging API from Scala to Java.

### Why are the changes needed?

Previously (before 3.5), the modules under `common` were pure Java and were easy to embed into other services; for example, the YARN External Shuffle Service runs as a plugin inside the YARN ResourceManager daemon process. Changes over recent years have made some of these formerly pure-Java modules require `scala-library` at runtime; for example, SPARK-52942 reports that the YARN ESS causes the YARN RM to fail to start because `scala-library` is missing from the classpath. Instead of bundling `scala-library` into the YARN ESS jar, #51650 (comment) suggests making these modules Scala-free again.

This also makes invoking the Structured Logging API from Java much cleaner: it can now be called without the awkward `$.MODULE$`.

```patch
- MDC.of(LogKeys.HOST_PORT$.MODULE$, address);
+ MDC.of(LogKeys.HOST_PORT, address);
```

### Does this PR introduce _any_ user-facing change?

No, these are internal APIs. Plugin developers who want to provide custom `LogKey`s can still write them to be compatible with both Spark 4.0 and the new API proposed by this PR; see https://github.com/pan3793/SPARK-53064

```java
import org.apache.spark.internal.LogKey;

// CUSTOM_LOG_KEY is compatible with both Spark 4.0 and SPARK-53064
public class JavaCustomLogKeys {
  // A custom `LogKey` must `implements LogKey`
  public static class CUSTOM_LOG_KEY implements LogKey {
    @Override
    public String name() { return "custom_log_key"; }
  }

  // Singleton instance
  public static final CUSTOM_LOG_KEY CUSTOM_LOG_KEY = new CUSTOM_LOG_KEY();
}
```

### How was this patch tested?

Pass GHA, and verified that the YARN ESS works without `scala-library`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51775 from pan3793/SPARK-53064.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
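For illustration, here is a minimal usage sketch (not part of this commit) of logging with the `JavaCustomLogKeys.CUSTOM_LOG_KEY` defined above. It assumes the `SparkLogger`/`SparkLoggerFactory` entry points and the `MDC.of` API that appear in the diffs below; the `MyPlugin` class and its log message are hypothetical.

```java
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;

// Hypothetical plugin class, for illustration only.
public class MyPlugin {
  private static final SparkLogger logger =
      SparkLoggerFactory.getLogger(MyPlugin.class);

  public void onEvent(String value) {
    // With the Java LogKeys there is no `$.MODULE$` suffix, even at Java call sites.
    logger.info("Handled event with {}",
        MDC.of(JavaCustomLogKeys.CUSTOM_LOG_KEY, value));
  }
}
```

The call shape mirrors the rewritten call sites in this commit: a message template with `{}` placeholders, each bound to a key/value pair via `MDC.of`.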
1 parent d72e028 commit dc46b14

546 files changed: 1892 additions & 1914 deletions

Some content is hidden: large commits have some diffs hidden by default, so only a subset of the 546 changed files is shown below.

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -364,8 +364,8 @@ public void operationComplete(Future<? super Void> future) throws Exception {
         }
       } else {
         logger.error("Failed to send RPC {} to {}", future.cause(),
-          MDC.of(LogKeys.REQUEST_ID$.MODULE$, requestId),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+          MDC.of(LogKeys.REQUEST_ID, requestId),
+          MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
         channel.close();
         try {
           String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
```

common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 9 additions & 9 deletions
```diff
@@ -193,9 +193,9 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
     final String resolvMsg = resolvedAddress.isUnresolved() ? "failed" : "succeed";
     if (hostResolveTimeMs > 2000) {
       logger.warn("DNS resolution {} for {} took {} ms",
-        MDC.of(LogKeys.STATUS$.MODULE$, resolvMsg),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, resolvedAddress),
-        MDC.of(LogKeys.TIME$.MODULE$, hostResolveTimeMs));
+        MDC.of(LogKeys.STATUS, resolvMsg),
+        MDC.of(LogKeys.HOST_PORT, resolvedAddress),
+        MDC.of(LogKeys.TIME, hostResolveTimeMs));
     } else {
       logger.trace("DNS resolution {} for {} took {} ms",
         resolvMsg, resolvedAddress, hostResolveTimeMs);
@@ -210,7 +210,7 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
         return cachedClient;
       } else {
         logger.info("Found inactive connection to {}, creating a new one.",
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, resolvedAddress));
+          MDC.of(LogKeys.HOST_PORT, resolvedAddress));
       }
     }
     // If this connection should fast fail when last connection failed in last fast fail time
@@ -314,7 +314,7 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
           logger.debug("{} successfully completed TLS handshake to ", address);
         } else {
           logger.info("failed to complete TLS handshake to {}", handshakeFuture.cause(),
-            MDC.of(LogKeys.HOST_PORT$.MODULE$, address));
+            MDC.of(LogKeys.HOST_PORT, address));
           cf.channel().close();
         }
       }
@@ -340,17 +340,17 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
     } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
       long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
       logger.error("Exception while bootstrapping client after {} ms", e,
-        MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, bootstrapTimeMs));
+        MDC.of(LogKeys.BOOTSTRAP_TIME, bootstrapTimeMs));
       client.close();
       Throwables.throwIfUnchecked(e);
       throw new RuntimeException(e);
     }
     long postBootstrap = System.nanoTime();

     logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
-      MDC.of(LogKeys.HOST_PORT$.MODULE$, address),
-      MDC.of(LogKeys.ELAPSED_TIME$.MODULE$, (postBootstrap - preConnect) / 1000000),
-      MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, (postBootstrap - preBootstrap) / 1000000));
+      MDC.of(LogKeys.HOST_PORT, address),
+      MDC.of(LogKeys.ELAPSED_TIME, (postBootstrap - preConnect) / 1000000),
+      MDC.of(LogKeys.BOOTSTRAP_TIME, (postBootstrap - preBootstrap) / 1000000));

     return client;
   }
```

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 19 additions & 19 deletions
```diff
@@ -145,8 +145,8 @@ public void channelInactive() {
     if (hasOutstandingRequests()) {
       String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} is closed",
-        MDC.of(LogKeys.COUNT$.MODULE$, numOutstandingRequests()),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
+        MDC.of(LogKeys.COUNT, numOutstandingRequests()),
+        MDC.of(LogKeys.HOST_PORT, remoteAddress));
       failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
     }
   }
@@ -156,8 +156,8 @@ public void exceptionCaught(Throwable cause) {
     if (hasOutstandingRequests()) {
       String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} is closed",
-        MDC.of(LogKeys.COUNT$.MODULE$, numOutstandingRequests()),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
+        MDC.of(LogKeys.COUNT, numOutstandingRequests()),
+        MDC.of(LogKeys.HOST_PORT, remoteAddress));
       failOutstandingRequests(cause);
     }
   }
@@ -168,8 +168,8 @@ public void handle(ResponseMessage message) throws Exception {
       ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} since it is not outstanding",
-          MDC.of(LogKeys.STREAM_CHUNK_ID$.MODULE$, resp.streamChunkId),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+          MDC.of(LogKeys.STREAM_CHUNK_ID, resp.streamChunkId),
+          MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
         resp.body().release();
       } else {
         outstandingFetches.remove(resp.streamChunkId);
@@ -180,9 +180,9 @@ public void handle(ResponseMessage message) throws Exception {
       ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
-          MDC.of(LogKeys.STREAM_CHUNK_ID$.MODULE$, resp.streamChunkId),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
-          MDC.of(LogKeys.ERROR$.MODULE$, resp.errorString));
+          MDC.of(LogKeys.STREAM_CHUNK_ID, resp.streamChunkId),
+          MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
+          MDC.of(LogKeys.ERROR, resp.errorString));
       } else {
         outstandingFetches.remove(resp.streamChunkId);
         listener.onFailure(resp.streamChunkId.chunkIndex(), new ChunkFetchFailureException(
@@ -192,9 +192,9 @@ public void handle(ResponseMessage message) throws Exception {
       RpcResponseCallback listener = (RpcResponseCallback) outstandingRpcs.get(resp.requestId);
       if (listener == null) {
         logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
-          MDC.of(LogKeys.REQUEST_ID$.MODULE$, resp.requestId),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
-          MDC.of(LogKeys.RESPONSE_BODY_SIZE$.MODULE$, resp.body().size()));
+          MDC.of(LogKeys.REQUEST_ID, resp.requestId),
+          MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
+          MDC.of(LogKeys.RESPONSE_BODY_SIZE, resp.body().size()));
         resp.body().release();
       } else {
         outstandingRpcs.remove(resp.requestId);
@@ -208,9 +208,9 @@ public void handle(ResponseMessage message) throws Exception {
       BaseResponseCallback listener = outstandingRpcs.get(resp.requestId);
       if (listener == null) {
         logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
-          MDC.of(LogKeys.REQUEST_ID$.MODULE$, resp.requestId),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
-          MDC.of(LogKeys.ERROR$.MODULE$, resp.errorString));
+          MDC.of(LogKeys.REQUEST_ID, resp.requestId),
+          MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
+          MDC.of(LogKeys.ERROR, resp.errorString));
       } else {
         outstandingRpcs.remove(resp.requestId);
         listener.onFailure(new RuntimeException(resp.errorString));
@@ -222,9 +222,9 @@ public void handle(ResponseMessage message) throws Exception {
       if (listener == null) {
         logger.warn("Ignoring response for MergedBlockMetaRequest {} from {} ({} bytes) since "
           + "it is not outstanding",
-          MDC.of(LogKeys.REQUEST_ID$.MODULE$, resp.requestId),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
-          MDC.of(LogKeys.RESPONSE_BODY_SIZE$.MODULE$, resp.body().size()));
+          MDC.of(LogKeys.REQUEST_ID, resp.requestId),
+          MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
+          MDC.of(LogKeys.RESPONSE_BODY_SIZE, resp.body().size()));
       } else {
         outstandingRpcs.remove(resp.requestId);
         listener.onSuccess(resp.getNumChunks(), resp.body());
@@ -269,7 +269,7 @@ public void handle(ResponseMessage message) throws Exception {
         }
       } else {
         logger.warn("Stream failure with unknown callback: {}",
-          MDC.of(LogKeys.ERROR$.MODULE$, resp.error));
+          MDC.of(LogKeys.ERROR, resp.error));
       }
     } else {
       throw new IllegalStateException("Unknown response type: " + message.type());
```

common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -93,7 +93,7 @@ protected boolean doAuthChallenge(
     } catch (RuntimeException e) {
       if (conf.saslFallback()) {
         LOG.warn("Failed to parse new auth challenge, reverting to SASL for client {}.",
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, channel.remoteAddress()));
+          MDC.of(LogKeys.HOST_PORT, channel.remoteAddress()));
         saslHandler = new SaslRpcHandler(conf, channel, null, secretKeyHolder);
         message.position(position);
         message.limit(limit);
```

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -66,8 +66,8 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
       // Re-encode this message as a failure response.
       String error = e.getMessage() != null ? e.getMessage() : "null";
       logger.error("Error processing {} for client {}", e,
-        MDC.of(LogKeys.MESSAGE$.MODULE$, in),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, ctx.channel().remoteAddress()));
+        MDC.of(LogKeys.MESSAGE, in),
+        MDC.of(LogKeys.HOST_PORT, ctx.channel().remoteAddress()));
       encode(ctx, resp.createFailureResponse(error), out);
     } else {
       throw e;
```

common/network-common/src/main/java/org/apache/spark/network/protocol/SslMessageEncoder.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -71,8 +71,8 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
       // Re-encode this message as a failure response.
       String error = e.getMessage() != null ? e.getMessage() : "null";
       logger.error("Error processing {} for client {}", e,
-        MDC.of(LogKeys.MESSAGE$.MODULE$, in),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, ctx.channel().remoteAddress()));
+        MDC.of(LogKeys.MESSAGE, in),
+        MDC.of(LogKeys.HOST_PORT, ctx.channel().remoteAddress()));
       encode(ctx, resp.createFailureResponse(error), out);
     } else {
       throw e;
```

common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java

Lines changed: 7 additions & 7 deletions
```diff
@@ -74,7 +74,7 @@ public ChunkFetchRequestHandler(
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     logger.warn("Exception in connection from {}", cause,
-      MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(ctx.channel())));
+      MDC.of(LogKeys.HOST_PORT, getRemoteAddress(ctx.channel())));
     ctx.close();
   }

@@ -96,8 +96,8 @@ public void processFetchRequest(
     long chunksBeingTransferred = streamManager.chunksBeingTransferred();
     if (chunksBeingTransferred >= maxChunksBeingTransferred) {
       logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
-        MDC.of(LogKeys.NUM_CHUNKS$.MODULE$, chunksBeingTransferred),
-        MDC.of(LogKeys.MAX_NUM_CHUNKS$.MODULE$, maxChunksBeingTransferred));
+        MDC.of(LogKeys.NUM_CHUNKS, chunksBeingTransferred),
+        MDC.of(LogKeys.MAX_NUM_CHUNKS, maxChunksBeingTransferred));
       channel.close();
       return;
     }
@@ -111,8 +111,8 @@ public void processFetchRequest(
       }
     } catch (Exception e) {
       logger.error("Error opening block {} for request from {}", e,
-        MDC.of(LogKeys.STREAM_CHUNK_ID$.MODULE$, msg.streamChunkId),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+        MDC.of(LogKeys.STREAM_CHUNK_ID, msg.streamChunkId),
+        MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
       respond(channel, new ChunkFetchFailure(msg.streamChunkId,
         Throwables.getStackTraceAsString(e)));
       return;
@@ -153,8 +153,8 @@ private ChannelFuture respond(
       } else {
         logger.error("Error sending result {} to {}; closing connection",
           future.cause(),
-          MDC.of(LogKeys.RESULT$.MODULE$, result),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
+          MDC.of(LogKeys.RESULT, result),
+          MDC.of(LogKeys.HOST_PORT, remoteAddress));
         channel.close();
       }
     });
```

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 4 additions & 4 deletions
```diff
@@ -88,7 +88,7 @@ public TransportClient getClient() {
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     logger.warn("Exception in connection from {}", cause,
-      MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(ctx.channel())));
+      MDC.of(LogKeys.HOST_PORT, getRemoteAddress(ctx.channel())));
     requestHandler.exceptionCaught(cause);
     responseHandler.exceptionCaught(cause);
     ctx.close();
@@ -168,9 +168,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
         logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
           "requests. Assuming connection is dead; please adjust" +
           " spark.{}.io.connectionTimeout if this is wrong.",
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, address),
-          MDC.of(LogKeys.TIMEOUT$.MODULE$, requestTimeoutNs / 1000 / 1000),
-          MDC.of(LogKeys.MODULE_NAME$.MODULE$, transportContext.getConf().getModuleName()));
+          MDC.of(LogKeys.HOST_PORT, address),
+          MDC.of(LogKeys.TIMEOUT, requestTimeoutNs / 1000 / 1000),
+          MDC.of(LogKeys.MODULE_NAME, transportContext.getConf().getModuleName()));
         client.timeOut();
         ctx.close();
       } else if (closeIdleConnections) {
```

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 15 additions & 15 deletions
```diff
@@ -132,8 +132,8 @@ private void processStreamRequest(final StreamRequest req) {
     long chunksBeingTransferred = streamManager.chunksBeingTransferred();
     if (chunksBeingTransferred >= maxChunksBeingTransferred) {
       logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
-        MDC.of(LogKeys.NUM_CHUNKS$.MODULE$, chunksBeingTransferred),
-        MDC.of(LogKeys.MAX_NUM_CHUNKS$.MODULE$, maxChunksBeingTransferred));
+        MDC.of(LogKeys.NUM_CHUNKS, chunksBeingTransferred),
+        MDC.of(LogKeys.MAX_NUM_CHUNKS, maxChunksBeingTransferred));
       channel.close();
       return;
     }
@@ -143,8 +143,8 @@ private void processStreamRequest(final StreamRequest req) {
       buf = streamManager.openStream(req.streamId);
     } catch (Exception e) {
       logger.error("Error opening stream {} for request from {}", e,
-        MDC.of(LogKeys.STREAM_ID$.MODULE$, req.streamId),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+        MDC.of(LogKeys.STREAM_ID, req.streamId),
+        MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
       respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
       return;
     }
@@ -177,8 +177,8 @@ public void onFailure(Throwable e) {
       });
     } catch (Exception e) {
       logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
-        MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+        MDC.of(LogKeys.REQUEST_ID, req.requestId),
+        MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
       respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
     } finally {
       req.body().release();
@@ -264,8 +264,8 @@ public String getID() {
           new NioManagedBuffer(blockPushNonFatalFailure.getResponse())));
       } else {
         logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
-          MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
-          MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+          MDC.of(LogKeys.REQUEST_ID, req.requestId),
+          MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
         respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
       }
       // We choose to totally fail the channel, rather than trying to recover as we do in other
@@ -282,7 +282,7 @@ private void processOneWayMessage(OneWayMessage req) {
       rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
     } catch (Exception e) {
       logger.error("Error while invoking RpcHandler#receive() for one-way message from {}.", e,
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+        MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
     } finally {
       req.body().release();
     }
@@ -307,10 +307,10 @@ public void onFailure(Throwable e) {
       });
     } catch (Exception e) {
       logger.error("Error while invoking receiveMergeBlockMetaReq() for appId {} shuffleId {} "
-        + "reduceId {} from {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId),
-        MDC.of(LogKeys.SHUFFLE_ID$.MODULE$, req.shuffleId),
-        MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+        + "reduceId {} from {}", e, MDC.of(LogKeys.APP_ID, req.appId),
+        MDC.of(LogKeys.SHUFFLE_ID, req.shuffleId),
+        MDC.of(LogKeys.REDUCE_ID, req.reduceId),
+        MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
       respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
     }
   }
@@ -326,8 +326,8 @@ private ChannelFuture respond(Encodable result) {
       logger.trace("Sent result {} to client {}", result, remoteAddress);
     } else {
       logger.error("Error sending result {} to {}; closing connection", future.cause(),
-        MDC.of(LogKeys.RESULT$.MODULE$, result),
-        MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
+        MDC.of(LogKeys.RESULT, result),
+        MDC.of(LogKeys.HOST_PORT, remoteAddress));
       channel.close();
     }
   });
```

common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java

Lines changed: 4 additions & 4 deletions
```diff
@@ -50,7 +50,7 @@ public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper map
       tmpDb = JniDBFactory.factory.open(dbFile, options);
     } catch (NativeDB.DBException e) {
       if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
-        logger.info("Creating state database at {}", MDC.of(LogKeys.PATH$.MODULE$, dbFile));
+        logger.info("Creating state database at {}", MDC.of(LogKeys.PATH, dbFile));
         options.createIfMissing(true);
         try {
           tmpDb = JniDBFactory.factory.open(dbFile, options);
@@ -61,16 +61,16 @@ public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper map
         // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
         // one, so we can keep processing new apps
         logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
-          "recover state for existing applications", e, MDC.of(LogKeys.PATH$.MODULE$, dbFile));
+          "recover state for existing applications", e, MDC.of(LogKeys.PATH, dbFile));
         if (dbFile.isDirectory()) {
           for (File f : dbFile.listFiles()) {
             if (!f.delete()) {
-              logger.warn("error deleting {}", MDC.of(LogKeys.PATH$.MODULE$, f.getPath()));
+              logger.warn("error deleting {}", MDC.of(LogKeys.PATH, f.getPath()));
             }
           }
         }
         if (!dbFile.delete()) {
-          logger.warn("error deleting {}", MDC.of(LogKeys.PATH$.MODULE$, dbFile.getPath()));
+          logger.warn("error deleting {}", MDC.of(LogKeys.PATH, dbFile.getPath()));
         }
         options.createIfMissing(true);
         try {
```
