Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_BATCH_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void handleFailure(Exception e) {
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/numaproj/numaflow/servingstore/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_SERVING_STORE_EXCEPTION + ": "
.setMessage(ExceptionUtils.getExceptionErrorString() + ": "
+ (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
Expand Down Expand Up @@ -78,7 +78,7 @@
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_SERVING_STORE_EXCEPTION + ": "
.setMessage(ExceptionUtils.getExceptionErrorString() + ": "

Check warning on line 81 in src/main/java/io/numaproj/numaflow/servingstore/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/servingstore/Service.java#L81

Added line #L81 was not covered by tests
+ (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
Expand Down
28 changes: 17 additions & 11 deletions src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,20 @@

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Objects;

public class ExceptionUtils {
/**
* Formalized exception error strings
*/
public static final String ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)";
public static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)";
public static final String ERR_SINK_EXCEPTION = "UDF_EXECUTION_ERROR(sink)";
public static final String ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)";
public static final String ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)";
public static final String ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)";
public static final String ERR_SERVING_STORE_EXCEPTION = "UDF_EXECUTION_ERROR(servingstore)";

/**
* UD Container Type Environment Variable
*/
public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE";
public static final String CONTAINER_NAME = System.getenv(ENV_UD_CONTAINER_TYPE);

/**
* Converts the stack trace of an exception into a String.
*
* @param e the exception to extract the stack trace from
* @param t the exception to extract the stack trace from
*
* @return the stack trace as a String
*/
Expand All @@ -30,4 +27,13 @@ public static String getStackTrace(Throwable t) {
t.printStackTrace(new PrintWriter(sw));
return sw.toString();
}

/**
* Returns a formalized exception error string.
*
* @return the formalized exception error string
*/
public static String getExceptionErrorString() {
return "UDF_EXECUTION_ERROR(" + Objects.requireNonNullElse(CONTAINER_NAME, "unknown-container") + ")";
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void onNext(SinkOuterClass.SinkRequest request) {
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_SINK_EXCEPTION + ": "
.setMessage(ExceptionUtils.getExceptionErrorString() + ": "
+ (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void onNext(SourceOuterClass.ReadRequest request) {
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_SOURCE_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private void handleFailure(Exception e) {
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.grpc.testing.GrpcCleanupRule;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.ExceptionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -87,7 +88,7 @@ public void testErrorFromUDF() {
outputStreamObserver.done.get();
fail("Expected exception not thrown");
} catch (InterruptedException | ExecutionException e) {
String expectedSubstring = "UDF_EXECUTION_ERROR(batchmap)";
String expectedSubstring = ExceptionUtils.getExceptionErrorString();
String actualMessage = e.getMessage();
assertNotNull("Error message should not be null", actualMessage);
assertTrue("Expected substring '" + expectedSubstring + "' not found in error message: " + actualMessage,
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.grpc.testing.GrpcCleanupRule;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.ExceptionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -127,7 +128,7 @@ public void testMapperFailure() {
fail("Expected exception not thrown");
} catch (Exception e) {
assertEquals(
"io.grpc.StatusRuntimeException: INTERNAL: UDF_EXECUTION_ERROR(map): unknown exception",
"io.grpc.StatusRuntimeException: INTERNAL: " + ExceptionUtils.getExceptionErrorString() + ": unknown exception",
e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import io.numaproj.numaflow.shared.ExceptionUtils;

public class ServerErrTest {

Expand Down Expand Up @@ -53,7 +54,7 @@ public void testServingStoreError() {
.build());
fail("Expected an exception to be thrown");
} catch (Exception e) {
assertEquals("INTERNAL: UDF_EXECUTION_ERROR(servingstore): unknown exception", e.getMessage());
assertEquals("INTERNAL: " + ExceptionUtils.getExceptionErrorString() + ": unknown exception", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.grpc.testing.GrpcCleanupRule;
import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import io.numaproj.numaflow.shared.ExceptionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testSourceTransformerFailure() {
responseObserver.done.get();
fail("Expected exception not thrown");
} catch (Exception e) {
String expectedSubstring = "UDF_EXECUTION_ERROR(transformer)";
String expectedSubstring = ExceptionUtils.getExceptionErrorString();
String actualMessage = e.getMessage();
assertNotNull("Error message should not be null", actualMessage);
assertTrue("Expected substring '" + expectedSubstring + "' not found in error message: " + actualMessage,
Expand Down
Loading