diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java index d6c74386..f7cfd959 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java @@ -8,7 +8,7 @@ public class SumFactory extends ReducerFactory { public static void main(String[] args) throws Exception { - log.info("sum udf was invoked"); + log.info("Starting sum udf server"); Server server = new Server(new SumFactory()); // Start the server diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java index 60f4972b..44dd2a56 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java @@ -29,7 +29,7 @@ public void processMessage( log.info("error while parsing integer - {}", e.getMessage()); } if (sum >= 100) { - outputStreamObserver.send(new Message(String.valueOf(sum).getBytes())); + outputStreamObserver.send(new Message(String.valueOf(sum).getBytes(), keys)); sum = 0; } } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java index 3fadae42..78a2cb10 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java @@ -10,6 +10,8 @@ import com.google.common.base.Preconditions; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl; +import io.numaproj.numaflow.reducer.metadata.MetadataImpl; import lombok.extern.slf4j.Slf4j; import scala.PartialFunction; import scala.collection.Iterable; @@ -25,31 +27,26 @@ @Slf4j class ReduceSupervisorActor extends AbstractActor { private final ReducerFactory reducerFactory; - private final Metadata md; private final ActorRef shutdownActor; private final StreamObserver responseObserver; private final Map actorsMap = new HashMap<>(); public ReduceSupervisorActor( ReducerFactory reducerFactory, - Metadata md, ActorRef shutdownActor, StreamObserver responseObserver) { this.reducerFactory = reducerFactory; - this.md = md; this.shutdownActor = shutdownActor; this.responseObserver = responseObserver; } public static Props props( ReducerFactory reducerFactory, - Metadata md, ActorRef shutdownActor, StreamObserver responseObserver) { return Props.create( ReduceSupervisorActor.class, reducerFactory, - md, shutdownActor, responseObserver); } @@ -92,8 +89,18 @@ public Receive createReceive() { private void invokeActors(ActorRequest actorRequest) { String[] keys = actorRequest.getKeySet(); String uniqueId = actorRequest.getUniqueIdentifier(); + ReduceOuterClass.Window window = actorRequest.getRequest().getOperation().getWindows(0); if (!actorsMap.containsKey(uniqueId)) { Reducer reduceHandler = reducerFactory.createReducer(); + // create metadata + IntervalWindow iw = new IntervalWindowImpl( + Instant.ofEpochSecond( + window.getStart().getSeconds(), + window.getStart().getNanos()), + Instant.ofEpochSecond( + window.getEnd().getSeconds(), + window.getEnd().getNanos())); + Metadata md = new MetadataImpl(iw); ActorRef actorRef = getContext() .actorOf(ReduceActor.props(keys, md, reduceHandler)); actorsMap.put(uniqueId, actorRef); diff --git a/src/main/java/io/numaproj/numaflow/reducer/Service.java b/src/main/java/io/numaproj/numaflow/reducer/Service.java index 1a0c0c53..efffe9f4 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Service.java @@ -55,18 +55,6 @@ public StreamObserver reduceFn(final StreamObser responseObserver); } - // get window start and end time from gPRC metadata - String winSt = GrpcServerUtils.WINDOW_START_TIME.get(); - String winEt = GrpcServerUtils.WINDOW_END_TIME.get(); - - // convert the start and end time to Instant - Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt)); - Instant endTime = Instant.ofEpochMilli(Long.parseLong(winEt)); - - // create metadata - IntervalWindow iw = new IntervalWindowImpl(startTime, endTime); - Metadata md = new MetadataImpl(iw); - CompletableFuture failureFuture = new CompletableFuture<>(); // create a shutdown actor that listens to exceptions. @@ -84,7 +72,6 @@ public StreamObserver reduceFn(final StreamObser ActorRef supervisorActor = reduceActorSystem .actorOf(ReduceSupervisorActor.props( reducerFactory, - md, shutdownActorRef, responseObserver)); diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java index 6d4b15e0..d7ffba4c 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.reducer; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.ManagedChannel; @@ -128,6 +129,15 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab .addKeys("reduce-key") .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) + .setOperation(ReduceOuterClass.ReduceRequest.WindowOperation + .newBuilder() + .addWindows( + ReduceOuterClass.Window + .newBuilder() + .setStart(Timestamp.newBuilder().setSeconds(60000).build()) + .setEnd(Timestamp.newBuilder().setSeconds(120000).build()) + .build() + )) .build(); inputStreamObserver.onNext(reduceRequest); } diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java index 880c2f0e..9abeb14a 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.reducer; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.ManagedChannel; @@ -91,16 +92,11 @@ public void tearDown() throws Exception { public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequestsGetAggregatedToOneResponse() { String reduceKey = "reduce-key"; - Metadata metadata = new Metadata(); - metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); - metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000"); - // create an output stream observer ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); StreamObserver inputStreamObserver = ReduceGrpc .newStub(inProcessChannel) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .reduceFn(outputStreamObserver); for (int i = 1; i <= 10; i++) { @@ -110,6 +106,15 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .addAllKeys(List.of(reduceKey)) .build()) + .setOperation(ReduceOuterClass.ReduceRequest.WindowOperation + .newBuilder() + .addWindows( + ReduceOuterClass.Window + .newBuilder() + .setStart(Timestamp.newBuilder().setSeconds(60000).build()) + .setEnd(Timestamp.newBuilder().setSeconds(120000).build()) + .build() + )) .build(); inputStreamObserver.onNext(request); } @@ -143,16 +148,11 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then String reduceKey = "reduce-key"; int keyCount = 10; - Metadata metadata = new Metadata(); - metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); - metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000"); - // create an output stream observer ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); StreamObserver inputStreamObserver = ReduceGrpc .newStub(inProcessChannel) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .reduceFn(outputStreamObserver); // send messages with keyCount different keys @@ -164,6 +164,15 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then .addAllKeys(List.of(reduceKey + j)) .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) + .setOperation(ReduceOuterClass.ReduceRequest.WindowOperation + .newBuilder() + .addWindows( + ReduceOuterClass.Window + .newBuilder() + .setStart(Timestamp.newBuilder().setSeconds(60000).build()) + .setEnd(Timestamp.newBuilder().setSeconds(120000).build()) + .build() + )) .build(); inputStreamObserver.onNext(request); } diff --git a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java index 897872b8..af528e29 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java @@ -5,12 +5,10 @@ import akka.actor.AllDeadLetters; import akka.actor.DeadLetter; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; -import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl; -import io.numaproj.numaflow.reducer.metadata.MetadataImpl; import org.junit.Test; -import java.time.Instant; import java.util.concurrent.CompletableFuture; import static org.junit.Assert.assertEquals; @@ -33,14 +31,10 @@ public void testFailure() { .actorOf(ReduceShutdownActor .props(completableFuture)); - Metadata md = new MetadataImpl( - new IntervalWindowImpl(Instant.now(), Instant.now())); - ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestExceptionFactory(), - md, shutdownActor, new ReduceOutputStreamObserver())); @@ -49,6 +43,15 @@ public void testFailure() { .addKeys("reduce-test") .setValue(ByteString.copyFromUtf8(String.valueOf(1))) .build()) + .setOperation(ReduceOuterClass.ReduceRequest.WindowOperation + .newBuilder() + .addWindows( + ReduceOuterClass.Window + .newBuilder() + .setStart(Timestamp.newBuilder().setSeconds(60000).build()) + .setEnd(Timestamp.newBuilder().setSeconds(60000).build()) + .build() + )) .build()); supervisorActor.tell(reduceRequest, ActorRef.noSender()); @@ -56,7 +59,7 @@ public void testFailure() { completableFuture.get(); fail("Expected the future to complete with exception"); } catch (Exception e) { - assertEquals(e.getMessage(), "java.lang.RuntimeException: UDF Failure"); + assertEquals("java.lang.RuntimeException: UDF Failure", e.getMessage()); } } @@ -71,14 +74,10 @@ public void testDeadLetterHandling() { actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class); - Metadata md = new MetadataImpl( - new IntervalWindowImpl(Instant.now(), Instant.now())); - ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestExceptionFactory(), - md, shutdownActor, new ReduceOutputStreamObserver())); @@ -89,7 +88,7 @@ public void testDeadLetterHandling() { completableFuture.get(); fail("Expected the future to complete with exception"); } catch (Exception e) { - assertEquals(e.getMessage(), "java.lang.Throwable: dead letters"); + assertEquals("java.lang.Throwable: dead letters", e.getMessage()); } } diff --git a/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java index 3cc0b34d..704d4af5 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java @@ -3,12 +3,10 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; -import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl; -import io.numaproj.numaflow.reducer.metadata.MetadataImpl; import org.junit.Test; -import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -29,14 +27,11 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then .actorOf(ReduceShutdownActor .props(completableFuture)); - Metadata md = new MetadataImpl( - new IntervalWindowImpl(Instant.now(), Instant.now())); - ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor - .props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver)); + .props(new TestReducerFactory(), shutdownActor, outputStreamObserver)); for (int i = 1; i <= 10; i++) { ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest @@ -47,6 +42,15 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then .addAllKeys(Arrays.asList("key-1", "key-2")) .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) + .setOperation(ReduceOuterClass.ReduceRequest.WindowOperation + .newBuilder() + .addWindows( + ReduceOuterClass.Window + .newBuilder() + .setStart(Timestamp.newBuilder().setSeconds(60000).build()) + .setEnd(Timestamp.newBuilder().setSeconds(60000).build()) + .build() + )) .build()); supervisorActor.tell(reduceRequest, ActorRef.noSender()); } @@ -57,11 +61,12 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then List result = outputStreamObserver.resultDatum.get(); // the observer should receive 2 messages, one is the aggregated result, the other is the EOF response. assertEquals(2, result.size()); - assertEquals("10", result - .get(0) - .getResult() - .getValue() - .toStringUtf8()); + assertEquals( + "10", result + .get(0) + .getResult() + .getValue() + .toStringUtf8()); assertTrue(result .get(1) .getEOF()); @@ -80,15 +85,11 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas .actorOf(ReduceShutdownActor .props(completableFuture)); - Metadata md = new MetadataImpl( - new IntervalWindowImpl(Instant.now(), Instant.now())); - ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestReducerFactory(), - md, shutdownActor, outputStreamObserver) ); @@ -102,6 +103,15 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas .addAllKeys(Arrays.asList("shared-key", "unique-key-" + i)) .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) + .setOperation(ReduceOuterClass.ReduceRequest.WindowOperation + .newBuilder() + .addWindows( + ReduceOuterClass.Window + .newBuilder() + .setStart(Timestamp.newBuilder().setSeconds(60000).build()) + .setEnd(Timestamp.newBuilder().setSeconds(60000).build()) + .build() + )) .build()); supervisorActor.tell(reduceRequest, ActorRef.noSender()); }