Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@
class Constants {
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sink.sock";

public static final String DEFAULT_FB_SINK_SOCKET_PATH = "/var/run/numaflow/fb-sink.sock";

public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info";

public static final String DEFAULT_FB_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info";

public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final int DEFAULT_PORT = 50051;

public static final String DEFAULT_HOST = "localhost";

public static final String ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE";

public static final String UD_CONTAINER_FALLBACK_SINK = "fb-udsink";
}
16 changes: 13 additions & 3 deletions src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,20 @@ public class GRPCConfig {
* Static method to create default GRPCConfig.
*/
static GRPCConfig defaultGrpcConfig() {
String containerType = System.getenv(Constants.ENV_UD_CONTAINER_TYPE);
String socketPath = Constants.DEFAULT_SOCKET_PATH;
String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH;

// if containerType is fb-udsink then we need to use fb sink socket path and info file path
if (Constants.UD_CONTAINER_FALLBACK_SINK.equals(containerType)) {
socketPath = Constants.DEFAULT_FB_SINK_SOCKET_PATH;
infoFilePath = Constants.DEFAULT_FB_SERVER_INFO_FILE_PATH;
}
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.infoFilePath(infoFilePath)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
.isLocal(containerType == null) // if ENV_UD_CONTAINER_TYPE is not set, then we are not running using numaflow
.socketPath(socketPath)
.build();
}
}
18 changes: 16 additions & 2 deletions src/main/java/io/numaproj/numaflow/sinker/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class Response {
private final String id;
private final Boolean success;
private final String err;
private final Boolean fallback;

/**
* Static method to create response for successful message processing.
Expand All @@ -25,7 +26,7 @@ public class Response {
* @return Response object with success status
*/
public static Response responseOK(String id) {
return new Response(id, true, null);
return new Response(id, true, null, false);
}

/**
Expand All @@ -37,6 +38,19 @@ public static Response responseOK(String id) {
* @return Response object with failure status and error message
*/
public static Response responseFailure(String id, String errMsg) {
return new Response(id, false, errMsg);
return new Response(id, false, errMsg, false);
}


/**
* Static method to create response for fallback message.
* This indicates that the message should be sent to the fallback sink.
*
* @param id id of the message
*
* @return Response object with fallback status
*/
public static Response responseFallback(String id) {
return new Response(id, false, null, true);
}
}
4 changes: 3 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,12 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) {
public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) {
var responseBuilder = SinkOuterClass.SinkResponse.newBuilder();
responses.getResponses().forEach(response -> {
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK :
response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
responseBuilder.addResults(SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setErrMsg(response.getErr() == null ? "" : response.getErr())
.setSuccess(response.getSuccess())
.setStatus(status)
.build());
});
return responseBuilder.build();
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,14 @@ public void onCompleted() {

ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
for (SinkOuterClass.SinkResponse.Result result : response.getResultsList()) {
if (result.getSuccess()) {
if (result.getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result.getId()));
} else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) {
responseListBuilder.addResponse(Response.responseFallback(
result.getId()));
} else {
responseListBuilder.addResponse(Response.responseFailure(
result.getId(),
result.getErrMsg()));
result.getId(), result.getErrMsg()));
}
}

Expand Down
13 changes: 11 additions & 2 deletions src/main/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,24 @@ message ReadyResponse {
bool ready = 1;
}

/*
* Status is the status of the response.
*/
enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
}

/**
* SinkResponse is the individual response of each message written to the sink.
*/
message SinkResponse {
message Result {
// id is the ID of the message, can be used to uniquely identify the message.
string id = 1;
// success denotes the status of persisting to disk. if set to false, it means writing to sink for the message failed.
bool success = 2;
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Expand Down