Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OutputStreamObserverImpl implements OutputStreamObserver {
private final ActorRef outputActor;
Expand Down Expand Up @@ -58,7 +57,7 @@ private AccumulatorOuterClass.AccumulatorResponse buildResponse(Message message)
.setId(message.getId())
.build())
.addAllTags(
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build();
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -150,9 +151,9 @@ private void buildAndStreamResponse(
== null ? ByteString.EMPTY : ByteString.copyFrom(
res.getValue()))
.addAllKeys(res.getKeys()
== null ? new ArrayList<>() : List.of(res.getKeys()))
== null ? new ArrayList<>() : Arrays.asList(res.getKeys()))
.addAllTags(res.getTags()
== null ? new ArrayList<>() : List.of(res.getTags()))
== null ? new ArrayList<>() : Arrays.asList(res.getTags()))
.build()
);
});
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/numaproj/numaflow/mapper/MapperActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;

/**
* Mapper actor that processes the map request. It invokes the mapper to process the request and
Expand Down Expand Up @@ -86,9 +86,9 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build());
});
return responseBuilder.setId(ID).build();
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -160,7 +161,7 @@ private MapOuterClass.MapRequest createRequest(
String requestId) {
return MapOuterClass.MapRequest.newBuilder().setRequest(
MapOuterClass.MapRequest.Request.newBuilder()
.addAllKeys(keys == null ? new ArrayList<>() : List.of(keys))
.addAllKeys(keys == null ? new ArrayList<>() : Arrays.asList(keys))
.setValue(data.getValue()
== null ? ByteString.EMPTY : ByteString.copyFrom(data.getValue()))
.setEventTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import lombok.AllArgsConstructor;

import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;

/**
* Implementation of the OutputObserver interface.
Expand Down Expand Up @@ -36,9 +36,9 @@ public void send(Message message) {
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build()).build();
supervisorActor.tell(response, ActorRef.noSender());
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Reduce actor invokes the reducer and returns the result.
Expand Down Expand Up @@ -70,7 +69,7 @@ private ActorResponse buildActorResponse(Message message) {
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build());
}
Expand All @@ -89,7 +88,7 @@ private ActorResponse buildEOFActorResponse() {
// set a dummy result with the keys.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.addAllKeys(List.of(this.keys))
.addAllKeys(Arrays.asList(this.keys))
.build());
return new ActorResponse(responseBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
Expand Down Expand Up @@ -42,7 +41,7 @@ private ActorResponse buildResponse(Message message) {
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import lombok.AllArgsConstructor;

import java.util.List;
import java.util.Arrays;

/**
* Reduce streamer actor invokes user defined functions to handle reduce requests.
Expand Down Expand Up @@ -71,7 +71,7 @@ private ActorResponse buildEOFResponse() {
// set a dummy result with the keys.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.addAllKeys(List.of(this.keys))
.addAllKeys(Arrays.asList(this.keys))
.build());
return new ActorResponse(responseBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* OutputStreamObserverImpl transforms a message to an ActorResponse.
Expand Down Expand Up @@ -42,7 +41,7 @@ private ActorResponse buildResponse(Message message, Sessionreduce.KeyedWindow k
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build());
return ActorResponse.builder()
.response(responseBuilder.build())
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -157,7 +158,7 @@
.newBuilder()
.addAllKeys(
datum.getKeys()
== null ? new ArrayList<>() : List.of(datum.getKeys()))
== null ? new ArrayList<>() : Arrays.asList(datum.getKeys()))

Check warning on line 161 in src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java#L161

Added line #L161 was not covered by tests
.setValue(datum.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
datum.getValue()))
.setId(datum.getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import lombok.AllArgsConstructor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
* OutputObserverImpl is the implementation of the OutputObserver interface.
Expand All @@ -29,7 +29,7 @@ private SourceOuterClass.ReadResponse buildResponse(Message message) {
.newBuilder()
.setResult(SourceOuterClass.ReadResponse.Result.newBuilder()
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.setPayload(
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -158,7 +159,7 @@ private Sourcetransformer.SourceTransformRequest createRequest(
String requestId) {
return Sourcetransformer.SourceTransformRequest.newBuilder().setRequest(
Sourcetransformer.SourceTransformRequest.Request.newBuilder()
.addAllKeys(keys == null ? new ArrayList<>() : List.of(keys))
.addAllKeys(keys == null ? new ArrayList<>() : Arrays.asList(keys))
.setValue(data.getValue()
== null ? ByteString.EMPTY : ByteString.copyFrom(data.getValue()))
.setEventTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;

/**
* TransformerActor is an actor that processes the SourceTransformRequest.
Expand Down Expand Up @@ -114,9 +114,9 @@ private Sourcetransformer.SourceTransformResponse buildResponse(
.getEpochSecond())
.setNanos(message.getEventTime().getNano()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : List.of(message.getKeys()))
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build());
});
return responseBuilder.setId(ID).build();
Expand Down
Loading