diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index 0d320a19..a72a5bf3 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -96,7 +96,7 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { datumStream.writeMessage(constructHandlerDatum(mapRequest)); } } catch (Exception e) { - log.error("Encountered an error in batch map onNext - {}", e.getMessage()); + log.error("Encountered an error in batch map onNext", e); shutdownSignal.completeExceptionally(e); responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) @@ -108,7 +108,7 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { // Called when an error occurs @Override public void onError(Throwable throwable) { - log.error("Error Encountered in batchMap Stream - {}", throwable.getMessage()); + log.error("Error Encountered in batchMap Stream", throwable); shutdownSignal.completeExceptionally(throwable); responseObserver.onError(Status.INTERNAL .withDescription(throwable.getMessage()) diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java index 3ce663bd..26a8804e 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java @@ -70,7 +70,10 @@ public static Props props( @Override public void preRestart(Throwable reason, Optional message) { - getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage()); + getContext() + .getSystem() + .log() + .warning("supervisor pre restart was executed due to: {}", reason.getMessage()); shutdownSignal.completeExceptionally(reason); responseObserver.onError(Status.INTERNAL .withDescription(reason.getMessage()) @@ -98,7 +101,7 @@ public Receive createReceive() { } private void handleFailure(Exception e) { - log.error("Encountered error in mapFn - {}", e.getMessage()); + log.error("Encountered error in mapFn", e); if (userException == null) { userException = e; // only send the very first exception to the client diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java index a0961439..0772f3c2 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java @@ -59,7 +59,7 @@ public void onNext(MapOuterClass.MapRequest request) { constructHandlerDatum(request), new OutputObserverImpl(responseObserver)); } catch (Exception e) { - log.error("Encountered error in mapFn onNext - {}", e.getMessage()); + log.error("Encountered error in mapFn onNext", e); shutdownSignal.completeExceptionally(e); responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) @@ -80,7 +80,7 @@ public void onNext(MapOuterClass.MapRequest request) { @Override public void onError(Throwable throwable) { - log.error("Encountered error in mapStream Stream - {}", throwable.getMessage()); + log.error("Encountered error in mapStream Stream", throwable); shutdownSignal.completeExceptionally(throwable); responseObserver.onError(Status.INTERNAL .withDescription(throwable.getMessage()) diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 95f105b8..8a21548e 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -95,15 +95,17 @@ public void onNext(SinkOuterClass.SinkRequest request) { datumStream.writeMessage(constructHandlerDatum(request)); } } catch (Exception e) { - log.error("Encountered error in sinkFn onNext - {}", e.getMessage()); + log.error("Encountered error in sinkFn onNext", e); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException()); + responseObserver.onError(Status.INTERNAL + .withDescription(e.getMessage()) + .asException()); } } @Override public void onError(Throwable throwable) { - log.error("Encountered error in sinkFn - {}", throwable.getMessage()); + log.error("Encountered error in sinkFn", throwable); shutdownSignal.completeExceptionally(throwable); responseObserver.onError(Status.INTERNAL .withDescription(throwable.getMessage()) diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index c65d3c66..8d7850b7 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -71,13 +71,14 @@ public void onNext(SourceOuterClass.ReadRequest request) { .setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS) .build(); - SourceOuterClass.ReadResponse response = SourceOuterClass.ReadResponse.newBuilder() + SourceOuterClass.ReadResponse response = SourceOuterClass.ReadResponse + .newBuilder() .setStatus(status) .build(); responseObserver.onNext(response); } catch (Exception e) { - log.error("Encountered error in readFn onNext - {}", e.getMessage()); + log.error("Encountered error in readFn onNext", e); shutdownSignal.completeExceptionally(e); responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) @@ -88,7 +89,7 @@ public void onNext(SourceOuterClass.ReadRequest request) { @Override public void onError(Throwable t) { - log.error("Encountered error in readFn onNext - {}", t.getMessage()); + log.error("Encountered error in readFn onNext", t); shutdownSignal.completeExceptionally(t); responseObserver.onError(Status.INTERNAL .withDescription(t.getMessage()) @@ -152,7 +153,7 @@ public void onNext(SourceOuterClass.AckRequest request) { responseObserver.onNext(response); } catch (Exception e) { - log.error("Encountered error in ackFn onNext - {}", e.getMessage()); + log.error("Encountered error in ackFn onNext", e); shutdownSignal.completeExceptionally(e); responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) @@ -163,7 +164,7 @@ public void onNext(SourceOuterClass.AckRequest request) { @Override public void onError(Throwable t) { - log.error("Encountered error in ackFn onNext - {}", t.getMessage()); + log.error("Encountered error in ackFn onNext", t); shutdownSignal.completeExceptionally(t); responseObserver.onError(Status.INTERNAL .withDescription(t.getMessage()) diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index ec37b4ed..36335898 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -96,7 +96,10 @@ public static Props props( */ @Override public void preRestart(Throwable reason, Optional message) { - getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage()); + getContext() + .getSystem() + .log() + .warning("supervisor pre restart was executed due to: {}", reason.getMessage()); responseObserver.onError(Status.INTERNAL .withDescription(reason.getMessage()) .withCause(reason) @@ -136,7 +139,7 @@ public Receive createReceive() { * @param e The exception to be handled. */ private void handleFailure(Exception e) { - log.error("Encountered error in sourceTransformFn - {}", e.getMessage()); + log.error("Encountered error in sourceTransformFn", e); if (userException == null) { userException = e; // only send the very first exception to the client