Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static class StreamSorter extends Accumulator {
private final TreeSet<Datum> sortedBuffer = new TreeSet<>(Comparator
.comparing(Datum::getEventTime)
.thenComparing(Datum::getID)); // Assuming Datum has a getUniqueId() method

@Override
public void processMessage(Datum datum, OutputStreamObserver outputStream) {
log.info("Received datum with event time: {}", datum.toString());
Expand All @@ -57,10 +58,10 @@ public void handleEndOfStream(OutputStreamObserver outputStreamObserver) {

private void flushBuffer(OutputStreamObserver outputStream) {
log.info("Watermark updated, flushing sortedBuffer: {}", latestWm.toEpochMilli());
while (!sortedBuffer.isEmpty() && !sortedBuffer
while (!sortedBuffer.isEmpty() && sortedBuffer
.first()
.getEventTime()
.isAfter(latestWm)) {
.isBefore(latestWm)) {
Datum datum = sortedBuffer.pollFirst();
assert datum != null;
outputStream.send(new Message(datum));
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/numaproj/numaflow/accumulator/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ static void handleFailure(
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (
e.getMessage() != null ? e.getMessage() : ""))
.setMessage(
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -37,8 +42,14 @@ static void handleFailure(
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
responseObserver.onError(status.asException());
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}).start();
}
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -38,8 +43,15 @@ static void handleFailure(
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
responseObserver.onError(status.asException());
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}).start();
}
Expand Down
17 changes: 14 additions & 3 deletions src/main/java/io/numaproj/numaflow/sessionreducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
import akka.actor.AllDeadLetters;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import io.grpc.Status;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sessionreduce.v1.SessionReduceGrpc;
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import io.numaproj.numaflow.shared.ExceptionUtils;
import lombok.extern.slf4j.Slf4j;
import scala.concurrent.Await;
import scala.concurrent.Future;
Expand Down Expand Up @@ -39,8 +43,15 @@ static void handleFailure(
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
responseObserver.onError(status.asException());
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}).start();
}
Expand Down
10 changes: 1 addition & 9 deletions src/main/proto/accumulator/v1/accumulator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ message AccumulatorRequest {

Payload payload = 1;
WindowOperation operation = 2;
optional Handshake handshake = 3;
}


Expand All @@ -63,19 +62,12 @@ message AccumulatorResponse {
// window represents a window to which the result belongs.
KeyedWindow window = 2;
repeated string tags = 3;
optional Handshake handshake = 4;
// EOF represents the end of the response for a window.
bool EOF = 5;
bool EOF = 4;
}


// ReadyResponse is the health check result.
message ReadyResponse {
bool ready = 1;
}

// Handshake message between client and server to indicate the start of transmission.
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}
5 changes: 2 additions & 3 deletions src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ServerErrTest {
Expand Down Expand Up @@ -107,9 +108,7 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
}
}
try {
assertEquals(
"UNKNOWN: java.lang.RuntimeException: unknown exception",
outputStreamObserver.t.getMessage());
assertTrue(outputStreamObserver.t.getMessage().contains("UDF_EXECUTION_ERROR"));
} catch (Throwable e) {
exceptionInThread.set(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ServerErrTest {
Expand Down Expand Up @@ -107,9 +108,7 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
}
}
try {
assertEquals(
"UNKNOWN: java.lang.RuntimeException: unknown exception",
outputStreamObserver.t.getMessage());
assertTrue(outputStreamObserver.t.getMessage().contains("UDF_EXECUTION_ERROR"));
} catch (Throwable e) {
exceptionInThread.set(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ServerErrTest {
Expand Down Expand Up @@ -78,9 +79,7 @@ public void given_actorThrows_when_serverRuns_then_outputStreamContainsThrowable
try {
// this test triggers a supervisor runtime exception by sending an OPEN request with 2 windows.
// we are expecting the error message below.
assertEquals(
"UNKNOWN: java.lang.RuntimeException: open operation error: expected exactly one window",
outputStreamObserver.t.getMessage());
assertTrue(outputStreamObserver.t.getMessage().contains("expected exactly one window"));
} catch (Throwable e) {
exceptionInThread.set(e);
}
Expand Down Expand Up @@ -148,9 +147,7 @@ public void given_sessionReducerThrows_when_serverRuns_then_outputStreamContains
}
}
try {
assertEquals(
"UNKNOWN: java.lang.RuntimeException: unknown exception",
outputStreamObserver.t.getMessage());
assertTrue(outputStreamObserver.t.getMessage().contains("UDF_EXECUTION_ERROR"));
} catch (Throwable e) {
exceptionInThread.set(e);
}
Expand Down
Loading