diff --git a/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java b/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java index 24811de6..7c730507 100644 --- a/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/accumulator/OutputStreamObserverImpl.java @@ -10,7 +10,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; class OutputStreamObserverImpl implements OutputStreamObserver { private final ActorRef outputActor; @@ -58,7 +57,7 @@ private AccumulatorOuterClass.AccumulatorResponse buildResponse(Message message) .setId(message.getId()) .build()) .addAllTags( - message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) + message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .build(); } diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index fbd103be..4be3b812 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -16,6 +16,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -150,9 +151,9 @@ private void buildAndStreamResponse( == null ? ByteString.EMPTY : ByteString.copyFrom( res.getValue())) .addAllKeys(res.getKeys() - == null ? new ArrayList<>() : List.of(res.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(res.getKeys())) .addAllTags(res.getTags() - == null ? new ArrayList<>() : List.of(res.getTags())) + == null ? new ArrayList<>() : Arrays.asList(res.getTags())) .build() ); }); diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java index 316c35a6..a6a56078 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java @@ -9,7 +9,7 @@ import java.time.Instant; import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; /** * Mapper actor that processes the map request. It invokes the mapper to process the request and @@ -86,9 +86,9 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String .setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( message.getValue())) .addAllKeys(message.getKeys() - == null ? new ArrayList<>() : List.of(message.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags(message.getTags() - == null ? new ArrayList<>() : List.of(message.getTags())) + == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .build()); }); return responseBuilder.setId(ID).build(); diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java index e2612dc3..9567568f 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java @@ -13,6 +13,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -160,7 +161,7 @@ private MapOuterClass.MapRequest createRequest( String requestId) { return MapOuterClass.MapRequest.newBuilder().setRequest( MapOuterClass.MapRequest.Request.newBuilder() - .addAllKeys(keys == null ? new ArrayList<>() : List.of(keys)) + .addAllKeys(keys == null ? new ArrayList<>() : Arrays.asList(keys)) .setValue(data.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(data.getValue())) .setEventTime( diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java b/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java index af641419..0a008b48 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java @@ -6,7 +6,7 @@ import lombok.AllArgsConstructor; import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; /** * Implementation of the OutputObserver interface. @@ -36,9 +36,9 @@ public void send(Message message) { message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( message.getValue())) .addAllKeys(message.getKeys() - == null ? new ArrayList<>() : List.of(message.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags(message.getTags() - == null ? new ArrayList<>() : List.of(message.getTags())) + == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .build()).build(); supervisorActor.tell(response, ActorRef.noSender()); } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java index 327354a0..d5ca690f 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java @@ -11,7 +11,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.List; /** * Reduce actor invokes the reducer and returns the result. @@ -70,7 +69,7 @@ private ActorResponse buildActorResponse(Message message) { .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags( - message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) + message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .build()); return new ActorResponse(responseBuilder.build()); } @@ -89,7 +88,7 @@ private ActorResponse buildEOFActorResponse() { // set a dummy result with the keys. responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result .newBuilder() - .addAllKeys(List.of(this.keys)) + .addAllKeys(Arrays.asList(this.keys)) .build()); return new ActorResponse(responseBuilder.build()); } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java index 688fecaf..c28eca49 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java @@ -11,7 +11,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.List; @AllArgsConstructor class OutputStreamObserverImpl implements OutputStreamObserver { @@ -42,7 +41,7 @@ private ActorResponse buildResponse(Message message) { .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags( - message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) + message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .build()); return new ActorResponse(responseBuilder.build()); } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java index e1704433..4e5c15f3 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java @@ -11,7 +11,7 @@ import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; import lombok.AllArgsConstructor; -import java.util.List; +import java.util.Arrays; /** * Reduce streamer actor invokes user defined functions to handle reduce requests. @@ -71,7 +71,7 @@ private ActorResponse buildEOFResponse() { // set a dummy result with the keys. responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result .newBuilder() - .addAllKeys(List.of(this.keys)) + .addAllKeys(Arrays.asList(this.keys)) .build()); return new ActorResponse(responseBuilder.build()); } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java b/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java index 95f2307a..421ea073 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java @@ -10,7 +10,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.List; /** * OutputStreamObserverImpl transforms a message to an ActorResponse. @@ -42,7 +41,7 @@ private ActorResponse buildResponse(Message message, Sessionreduce.KeyedWindow k .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags( - message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) + message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .build()); return ActorResponse.builder() .response(responseBuilder.build()) diff --git a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java index f6708d4a..efa8a9d4 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -157,7 +158,7 @@ public void onCompleted() { .newBuilder() .addAllKeys( datum.getKeys() - == null ? new ArrayList<>() : List.of(datum.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(datum.getKeys())) .setValue(datum.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( datum.getValue())) .setId(datum.getId()) diff --git a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java index dbc004a0..14a23543 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java @@ -7,8 +7,8 @@ import lombok.AllArgsConstructor; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; -import java.util.List; /** * OutputObserverImpl is the implementation of the OutputObserver interface. @@ -29,7 +29,7 @@ private SourceOuterClass.ReadResponse buildResponse(Message message) { .newBuilder() .setResult(SourceOuterClass.ReadResponse.Result.newBuilder() .addAllKeys(message.getKeys() - == null ? new ArrayList<>() : List.of(message.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .setPayload( message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( message.getValue())) diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java index 9ab1cc44..bc600a94 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java @@ -13,6 +13,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -158,7 +159,7 @@ private Sourcetransformer.SourceTransformRequest createRequest( String requestId) { return Sourcetransformer.SourceTransformRequest.newBuilder().setRequest( Sourcetransformer.SourceTransformRequest.Request.newBuilder() - .addAllKeys(keys == null ? new ArrayList<>() : List.of(keys)) + .addAllKeys(keys == null ? new ArrayList<>() : Arrays.asList(keys)) .setValue(data.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(data.getValue())) .setEventTime( diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java index b2e744cd..08f67a06 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java @@ -9,7 +9,7 @@ import java.time.Instant; import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; /** * TransformerActor is an actor that processes the SourceTransformRequest. @@ -114,9 +114,9 @@ private Sourcetransformer.SourceTransformResponse buildResponse( .getEpochSecond()) .setNanos(message.getEventTime().getNano())) .addAllKeys(message.getKeys() - == null ? new ArrayList<>() : List.of(message.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags(message.getTags() - == null ? new ArrayList<>() : List.of(message.getTags())) + == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .build()); }); return responseBuilder.setId(ID).build();