Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
datumStream.writeMessage(constructHandlerDatum(mapRequest));
}
} catch (Exception e) {
log.error("Encountered an error in batch map onNext - {}", e.getMessage());
log.error("Encountered an error in batch map onNext", e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
Expand All @@ -108,7 +108,7 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
// Called when an error occurs
@Override
public void onError(Throwable throwable) {
log.error("Error Encountered in batchMap Stream - {}", throwable.getMessage());
log.error("Error Encountered in batchMap Stream", throwable);
shutdownSignal.completeExceptionally(throwable);
responseObserver.onError(Status.INTERNAL
.withDescription(throwable.getMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ public static Props props(

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage());
getContext()
.getSystem()
.log()
.warning("supervisor pre restart was executed due to: {}", reason.getMessage());
shutdownSignal.completeExceptionally(reason);
responseObserver.onError(Status.INTERNAL
.withDescription(reason.getMessage())
Expand Down Expand Up @@ -98,7 +101,7 @@ public Receive createReceive() {
}

private void handleFailure(Exception e) {
log.error("Encountered error in mapFn - {}", e.getMessage());
log.error("Encountered error in mapFn", e);
if (userException == null) {
userException = e;
// only send the very first exception to the client
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void onNext(MapOuterClass.MapRequest request) {
constructHandlerDatum(request),
new OutputObserverImpl(responseObserver));
} catch (Exception e) {
log.error("Encountered error in mapFn onNext - {}", e.getMessage());
log.error("Encountered error in mapFn onNext", e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
Expand All @@ -80,7 +80,7 @@ public void onNext(MapOuterClass.MapRequest request) {

@Override
public void onError(Throwable throwable) {
log.error("Encountered error in mapStream Stream - {}", throwable.getMessage());
log.error("Encountered error in mapStream Stream", throwable);
shutdownSignal.completeExceptionally(throwable);
responseObserver.onError(Status.INTERNAL
.withDescription(throwable.getMessage())
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,17 @@ public void onNext(SinkOuterClass.SinkRequest request) {
datumStream.writeMessage(constructHandlerDatum(request));
}
} catch (Exception e) {
log.error("Encountered error in sinkFn onNext - {}", e.getMessage());
log.error("Encountered error in sinkFn onNext", e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException());
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asException());
}
}

@Override
public void onError(Throwable throwable) {
log.error("Encountered error in sinkFn - {}", throwable.getMessage());
log.error("Encountered error in sinkFn", throwable);
shutdownSignal.completeExceptionally(throwable);
responseObserver.onError(Status.INTERNAL
.withDescription(throwable.getMessage())
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ public void onNext(SourceOuterClass.ReadRequest request) {
.setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS)
.build();

SourceOuterClass.ReadResponse response = SourceOuterClass.ReadResponse.newBuilder()
SourceOuterClass.ReadResponse response = SourceOuterClass.ReadResponse
.newBuilder()
.setStatus(status)
.build();

responseObserver.onNext(response);
} catch (Exception e) {
log.error("Encountered error in readFn onNext - {}", e.getMessage());
log.error("Encountered error in readFn onNext", e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
Expand All @@ -88,7 +89,7 @@ public void onNext(SourceOuterClass.ReadRequest request) {

@Override
public void onError(Throwable t) {
log.error("Encountered error in readFn onNext - {}", t.getMessage());
log.error("Encountered error in readFn onNext", t);
shutdownSignal.completeExceptionally(t);
responseObserver.onError(Status.INTERNAL
.withDescription(t.getMessage())
Expand Down Expand Up @@ -152,7 +153,7 @@ public void onNext(SourceOuterClass.AckRequest request) {

responseObserver.onNext(response);
} catch (Exception e) {
log.error("Encountered error in ackFn onNext - {}", e.getMessage());
log.error("Encountered error in ackFn onNext", e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
Expand All @@ -163,7 +164,7 @@ public void onNext(SourceOuterClass.AckRequest request) {

@Override
public void onError(Throwable t) {
log.error("Encountered error in ackFn onNext - {}", t.getMessage());
log.error("Encountered error in ackFn onNext", t);
shutdownSignal.completeExceptionally(t);
responseObserver.onError(Status.INTERNAL
.withDescription(t.getMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ public static Props props(
*/
@Override
public void preRestart(Throwable reason, Optional<Object> message) {
getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage());
getContext()
.getSystem()
.log()
.warning("supervisor pre restart was executed due to: {}", reason.getMessage());
responseObserver.onError(Status.INTERNAL
.withDescription(reason.getMessage())
.withCause(reason)
Expand Down Expand Up @@ -136,7 +139,7 @@ public Receive createReceive() {
* @param e The exception to be handled.
*/
private void handleFailure(Exception e) {
log.error("Encountered error in sourceTransformFn - {}", e.getMessage());
log.error("Encountered error in sourceTransformFn", e);
if (userException == null) {
userException = e;
// only send the very first exception to the client
Expand Down