From 94ac2ce08b69643817b3b4cf2dbb351ad8bcee5d Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Mon, 24 Mar 2025 19:55:12 +0530 Subject: [PATCH] fix: stream sorter example to consider unique id while storing messages Signed-off-by: Yashash H L --- .../examples/accumulator/sorter/StreamSorterFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java index f0391475..b7be7fd8 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java @@ -36,8 +36,9 @@ public StreamSorter createAccumulator() { public static class StreamSorter extends Accumulator { private Instant latestWm = Instant.ofEpochMilli(-1); - private final TreeSet sortedBuffer = new TreeSet<>(Comparator.comparing(Datum::getEventTime)); - + private final TreeSet sortedBuffer = new TreeSet<>(Comparator + .comparing(Datum::getEventTime) + .thenComparing(Datum::getID)); // Assuming Datum has a getUniqueId() method @Override public void processMessage(Datum datum, OutputStreamObserver outputStream) { log.info("Received datum with event time: {}", datum.toString()); @@ -50,6 +51,7 @@ public void processMessage(Datum datum, OutputStreamObserver outputStream) { @Override public void handleEndOfStream(OutputStreamObserver outputStreamObserver) { + log.info("Eof received, flushing sortedBuffer: {}", latestWm.toEpochMilli()); flushBuffer(outputStreamObserver); }