diff --git a/src/main/java/EvenOddFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunction.java similarity index 100% rename from src/main/java/EvenOddFunction.java rename to examples/src/main/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunction.java diff --git a/src/main/java/io/numaproj/numaflow/sinker/Constants.java b/src/main/java/io/numaproj/numaflow/sinker/Constants.java index 87a8bb4e..8f99ec04 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Constants.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Constants.java @@ -3,11 +3,19 @@ class Constants { public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sink.sock"; + public static final String DEFAULT_FB_SINK_SOCKET_PATH = "/var/run/numaflow/fb-sink.sock"; + public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info"; + public static final String DEFAULT_FB_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"; + public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; public static final int DEFAULT_PORT = 50051; public static final String DEFAULT_HOST = "localhost"; + + public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"; + + public static final String UD_CONTAINER_FALLBACK_SINK = "fb-udsink"; } diff --git a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java index 4c7ec956..381ebca3 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java @@ -27,10 +27,20 @@ public class GRPCConfig { * Static method to create default GRPCConfig. */ static GRPCConfig defaultGrpcConfig() { + String containerType = System.getenv(Constants.ENV_UD_CONTAINER_TYPE); + String socketPath = Constants.DEFAULT_SOCKET_PATH; + String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + // if containerType is fb-udsink then we need to use fb sink socket path and info file path + if (Constants.UD_CONTAINER_FALLBACK_SINK.equals(containerType)) { + socketPath = Constants.DEFAULT_FB_SINK_SOCKET_PATH; + infoFilePath = Constants.DEFAULT_FB_SERVER_INFO_FILE_PATH; + } return GRPCConfig.newBuilder() - .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) + .infoFilePath(infoFilePath) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) - .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow - .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); + .isLocal(containerType == null) // if ENV_UD_CONTAINER_TYPE is not set, then we are not running using numaflow + .socketPath(socketPath) + .build(); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Response.java b/src/main/java/io/numaproj/numaflow/sinker/Response.java index eb87aa11..e185669a 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Response.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Response.java @@ -16,6 +16,7 @@ public class Response { private final String id; private final Boolean success; private final String err; + private final Boolean fallback; /** * Static method to create response for successful message processing. @@ -25,7 +26,7 @@ public class Response { * @return Response object with success status */ public static Response responseOK(String id) { - return new Response(id, true, null); + return new Response(id, true, null, false); } /** @@ -37,6 +38,19 @@ public static Response responseOK(String id) { * @return Response object with failure status and error message */ public static Response responseFailure(String id, String errMsg) { - return new Response(id, false, errMsg); + return new Response(id, false, errMsg, false); + } + + + /** + * Static method to create response for fallback message. + * This indicates that the message should be sent to the fallback sink. + * + * @param id id of the message + * + * @return Response object with fallback status + */ + public static Response responseFallback(String id) { + return new Response(id, false, null, true); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 53e82112..ff04eadb 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -120,10 +120,12 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) { public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) { var responseBuilder = SinkOuterClass.SinkResponse.newBuilder(); responses.getResponses().forEach(response -> { + SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK : + response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE; responseBuilder.addResults(SinkOuterClass.SinkResponse.Result.newBuilder() .setId(response.getId() == null ? "" : response.getId()) .setErrMsg(response.getErr() == null ? "" : response.getErr()) - .setSuccess(response.getSuccess()) + .setStatus(status) .build()); }); return responseBuilder.build(); diff --git a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java index 0dabf910..1c3b7f24 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java @@ -179,12 +179,14 @@ public void onCompleted() { ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder(); for (SinkOuterClass.SinkResponse.Result result : response.getResultsList()) { - if (result.getSuccess()) { + if (result.getStatus() == SinkOuterClass.Status.SUCCESS) { responseListBuilder.addResponse(Response.responseOK(result.getId())); + } else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) { + responseListBuilder.addResponse(Response.responseFallback( + result.getId())); } else { responseListBuilder.addResponse(Response.responseFailure( - result.getId(), - result.getErrMsg())); + result.getId(), result.getErrMsg())); } } diff --git a/src/main/proto/sink/v1/sink.proto b/src/main/proto/sink/v1/sink.proto index d378bcba..1dde5c1e 100644 --- a/src/main/proto/sink/v1/sink.proto +++ b/src/main/proto/sink/v1/sink.proto @@ -34,6 +34,15 @@ message ReadyResponse { bool ready = 1; } +/* + * Status is the status of the response. + */ +enum Status { + SUCCESS = 0; + FAILURE = 1; + FALLBACK = 2; +} + /** * SinkResponse is the individual response of each message written to the sink. */ @@ -41,8 +50,8 @@ message SinkResponse { message Result { // id is the ID of the message, can be used to uniquely identify the message. string id = 1; - // success denotes the status of persisting to disk. if set to false, it means writing to sink for the message failed. - bool success = 2; + // status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. + Status status = 2; // err_msg is the error message, set it if success is set to false. string err_msg = 3; }