diff --git a/src/main/java/io/numaproj/numaflow/accumulator/AccumulatorActor.java b/src/main/java/io/numaproj/numaflow/accumulator/AccumulatorActor.java index 3dbf611d..e0e38539 100644 --- a/src/main/java/io/numaproj/numaflow/accumulator/AccumulatorActor.java +++ b/src/main/java/io/numaproj/numaflow/accumulator/AccumulatorActor.java @@ -50,6 +50,8 @@ private void sendEOF(String EOF) { .newBuilder() .setWindow(AccumulatorOuterClass.KeyedWindow .newBuilder() + .setStart(this.keyedWindow.getStart()) + .setEnd(this.keyedWindow.getEnd()) .addAllKeys(this.keyedWindow.getKeysList())) .setEOF(true) .build(); diff --git a/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java b/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java index 24811de6..d1ea040e 100644 --- a/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java @@ -15,7 +15,7 @@ class OutputStreamObserverImpl implements OutputStreamObserver { private final ActorRef outputActor; private final AccumulatorOuterClass.KeyedWindow keyedWindow; - private final Instant latestWatermark = Instant.ofEpochMilli(-1); + private Instant latestWatermark = Instant.ofEpochMilli(-1); public OutputStreamObserverImpl( ActorRef outputActor, @@ -26,14 +26,17 @@ public OutputStreamObserverImpl( private AccumulatorOuterClass.AccumulatorResponse buildResponse(Message message) { AccumulatorOuterClass.AccumulatorResponse.Builder responseBuilder = AccumulatorOuterClass.AccumulatorResponse.newBuilder(); + if (message.getWatermark().isAfter(latestWatermark)) { + this.latestWatermark = message.getWatermark(); + } // set the window using the actor metadata. responseBuilder.setWindow(AccumulatorOuterClass.KeyedWindow.newBuilder() .setStart(Timestamp.newBuilder() .setSeconds(0) - .setNanos(0)) + .setNanos(0).build()) .setEnd(Timestamp.newBuilder() .setSeconds(this.latestWatermark.getEpochSecond()) - .setNanos(this.latestWatermark.getNano())) + .setNanos(this.latestWatermark.getNano()).build()) .setSlot("slot-0") .addAllKeys(this.keyedWindow.getKeysList()) .build()); @@ -47,11 +50,11 @@ private AccumulatorOuterClass.AccumulatorResponse buildResponse(Message message) .setEventTime(Timestamp .newBuilder() .setSeconds(message.getEventTime().getEpochSecond()) - .setNanos(message.getEventTime().getNano())) + .setNanos(message.getEventTime().getNano()).build()) .setWatermark(Timestamp .newBuilder() .setSeconds(message.getWatermark().getEpochSecond()) - .setNanos(message.getWatermark().getNano())) + .setNanos(message.getWatermark().getNano()).build()) .putAllHeaders(message.getHeaders()) .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys()))