diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java index f0391475..b7be7fd8 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java @@ -36,8 +36,9 @@ public StreamSorter createAccumulator() { public static class StreamSorter extends Accumulator { private Instant latestWm = Instant.ofEpochMilli(-1); - private final TreeSet sortedBuffer = new TreeSet<>(Comparator.comparing(Datum::getEventTime)); - + private final TreeSet sortedBuffer = new TreeSet<>(Comparator + .comparing(Datum::getEventTime) + .thenComparing(Datum::getID)); // Assuming Datum has a getUniqueId() method @Override public void processMessage(Datum datum, OutputStreamObserver outputStream) { log.info("Received datum with event time: {}", datum.toString()); @@ -50,6 +51,7 @@ public void processMessage(Datum datum, OutputStreamObserver outputStream) { @Override public void handleEndOfStream(OutputStreamObserver outputStreamObserver) { + log.info("Eof received, flushing sortedBuffer: {}", latestWm.toEpochMilli()); flushBuffer(outputStreamObserver); }