Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class SumFactory extends ReducerFactory<SumFunction> {

public static void main(String[] args) throws Exception {
log.info("sum udf was invoked");
log.info("Starting sum udf server");
Server server = new Server(new SumFactory());

// Start the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void processMessage(
log.info("error while parsing integer - {}", e.getMessage());
}
if (sum >= 100) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes(), keys));
sum = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
import lombok.extern.slf4j.Slf4j;
import scala.PartialFunction;
import scala.collection.Iterable;
Expand All @@ -25,31 +27,26 @@
@Slf4j
class ReduceSupervisorActor extends AbstractActor {
private final ReducerFactory<? extends Reducer> reducerFactory;
private final Metadata md;
private final ActorRef shutdownActor;
private final StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
private final Map<String, ActorRef> actorsMap = new HashMap<>();

public ReduceSupervisorActor(
ReducerFactory<? extends Reducer> reducerFactory,
Metadata md,
ActorRef shutdownActor,
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
this.reducerFactory = reducerFactory;
this.md = md;
this.shutdownActor = shutdownActor;
this.responseObserver = responseObserver;
}

public static Props props(
ReducerFactory<? extends Reducer> reducerFactory,
Metadata md,
ActorRef shutdownActor,
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
return Props.create(
ReduceSupervisorActor.class,
reducerFactory,
md,
shutdownActor,
responseObserver);
}
Expand Down Expand Up @@ -92,8 +89,18 @@ public Receive createReceive() {
private void invokeActors(ActorRequest actorRequest) {
String[] keys = actorRequest.getKeySet();
String uniqueId = actorRequest.getUniqueIdentifier();
ReduceOuterClass.Window window = actorRequest.getRequest().getOperation().getWindows(0);
if (!actorsMap.containsKey(uniqueId)) {
Reducer reduceHandler = reducerFactory.createReducer();
// create metadata
IntervalWindow iw = new IntervalWindowImpl(
Instant.ofEpochSecond(
window.getStart().getSeconds(),
window.getStart().getNanos()),
Instant.ofEpochSecond(
window.getEnd().getSeconds(),
window.getEnd().getNanos()));
Metadata md = new MetadataImpl(iw);
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));
actorsMap.put(uniqueId, actorRef);
Expand Down
13 changes: 0 additions & 13 deletions src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,6 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
responseObserver);
}

// get window start and end time from gPRC metadata
String winSt = GrpcServerUtils.WINDOW_START_TIME.get();
String winEt = GrpcServerUtils.WINDOW_END_TIME.get();

// convert the start and end time to Instant
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));
Instant endTime = Instant.ofEpochMilli(Long.parseLong(winEt));

// create metadata
IntervalWindow iw = new IntervalWindowImpl(startTime, endTime);
Metadata md = new MetadataImpl(iw);

CompletableFuture<Void> failureFuture = new CompletableFuture<>();

// create a shutdown actor that listens to exceptions.
Expand All @@ -84,7 +72,6 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
ActorRef supervisorActor = reduceActorSystem
.actorOf(ReduceSupervisorActor.props(
reducerFactory,
md,
shutdownActorRef,
responseObserver));

Expand Down
10 changes: 10 additions & 0 deletions src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.reducer;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -128,6 +129,15 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
.addKeys("reduce-key")
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
.newBuilder()
.addWindows(
ReduceOuterClass.Window
.newBuilder()
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
.build()
))
.build();
inputStreamObserver.onNext(reduceRequest);
}
Expand Down
29 changes: 19 additions & 10 deletions src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.reducer;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -91,16 +92,11 @@ public void tearDown() throws Exception {
public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequestsGetAggregatedToOneResponse() {
String reduceKey = "reduce-key";

Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");

// create an output stream observer
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();

StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
.newStub(inProcessChannel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.reduceFn(outputStreamObserver);

for (int i = 1; i <= 10; i++) {
Expand All @@ -110,6 +106,15 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.addAllKeys(List.of(reduceKey))
.build())
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
.newBuilder()
.addWindows(
ReduceOuterClass.Window
.newBuilder()
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
.build()
))
.build();
inputStreamObserver.onNext(request);
}
Expand Down Expand Up @@ -143,16 +148,11 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
String reduceKey = "reduce-key";
int keyCount = 10;

Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");

// create an output stream observer
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();

StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
.newStub(inProcessChannel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.reduceFn(outputStreamObserver);

// send messages with keyCount different keys
Expand All @@ -164,6 +164,15 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
.addAllKeys(List.of(reduceKey + j))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
.newBuilder()
.addWindows(
ReduceOuterClass.Window
.newBuilder()
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
.build()
))
.build();
inputStreamObserver.onNext(request);
}
Expand Down
25 changes: 12 additions & 13 deletions src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import akka.actor.AllDeadLetters;
import akka.actor.DeadLetter;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
import org.junit.Test;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;

import static org.junit.Assert.assertEquals;
Expand All @@ -33,14 +31,10 @@ public void testFailure() {
.actorOf(ReduceShutdownActor
.props(completableFuture));

Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestExceptionFactory(),
md,
shutdownActor,
new ReduceOutputStreamObserver()));

Expand All @@ -49,14 +43,23 @@ public void testFailure() {
.addKeys("reduce-test")
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
.build())
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
.newBuilder()
.addWindows(
ReduceOuterClass.Window
.newBuilder()
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
.build()
))
.build());
supervisorActor.tell(reduceRequest, ActorRef.noSender());

try {
completableFuture.get();
fail("Expected the future to complete with exception");
} catch (Exception e) {
assertEquals(e.getMessage(), "java.lang.RuntimeException: UDF Failure");
assertEquals("java.lang.RuntimeException: UDF Failure", e.getMessage());
}
}

Expand All @@ -71,14 +74,10 @@ public void testDeadLetterHandling() {

actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class);

Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestExceptionFactory(),
md,
shutdownActor,
new ReduceOutputStreamObserver()));

Expand All @@ -89,7 +88,7 @@ public void testDeadLetterHandling() {
completableFuture.get();
fail("Expected the future to complete with exception");
} catch (Exception e) {
assertEquals(e.getMessage(), "java.lang.Throwable: dead letters");
assertEquals("java.lang.Throwable: dead letters", e.getMessage());
}
}

Expand Down
42 changes: 26 additions & 16 deletions src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
import org.junit.Test;

import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,14 +27,11 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
.actorOf(ReduceShutdownActor
.props(completableFuture));

Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();

ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver));
.props(new TestReducerFactory(), shutdownActor, outputStreamObserver));

for (int i = 1; i <= 10; i++) {
ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest
Expand All @@ -47,6 +42,15 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
.addAllKeys(Arrays.asList("key-1", "key-2"))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
.newBuilder()
.addWindows(
ReduceOuterClass.Window
.newBuilder()
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
.build()
))
.build());
supervisorActor.tell(reduceRequest, ActorRef.noSender());
}
Expand All @@ -57,11 +61,12 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the observer should receive 2 messages, one is the aggregated result, the other is the EOF response.
assertEquals(2, result.size());
assertEquals("10", result
.get(0)
.getResult()
.getValue()
.toStringUtf8());
assertEquals(
"10", result
.get(0)
.getResult()
.getValue()
.toStringUtf8());
assertTrue(result
.get(1)
.getEOF());
Expand All @@ -80,15 +85,11 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
.actorOf(ReduceShutdownActor
.props(completableFuture));

Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestReducerFactory(),
md,
shutdownActor,
outputStreamObserver)
);
Expand All @@ -102,6 +103,15 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
.addAllKeys(Arrays.asList("shared-key", "unique-key-" + i))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
.newBuilder()
.addWindows(
ReduceOuterClass.Window
.newBuilder()
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
.build()
))
.build());
supervisorActor.tell(reduceRequest, ActorRef.noSender());
}
Expand Down
Loading