Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ public StreamSorter createAccumulator() {

public static class StreamSorter extends Accumulator {
private Instant latestWm = Instant.ofEpochMilli(-1);
private final TreeSet<Datum> sortedBuffer = new TreeSet<>(Comparator.comparing(Datum::getEventTime));

private final TreeSet<Datum> sortedBuffer = new TreeSet<>(Comparator
.comparing(Datum::getEventTime)
.thenComparing(Datum::getID)); // Assuming Datum has a getUniqueId() method
@Override
public void processMessage(Datum datum, OutputStreamObserver outputStream) {
log.info("Received datum with event time: {}", datum.toString());
Expand All @@ -50,6 +51,7 @@ public void processMessage(Datum datum, OutputStreamObserver outputStream) {

@Override
public void handleEndOfStream(OutputStreamObserver outputStreamObserver) {
log.info("Eof received, flushing sortedBuffer: {}", latestWm.toEpochMilli());
flushBuffer(outputStreamObserver);
}

Expand Down
Loading