forked from numaproj/numaflow-java
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathServer.java
More file actions
125 lines (111 loc) · 4.59 KB
/
Server.java
File metadata and controls
125 lines (111 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package io.numaproj.numaflow.mapstreamer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
/**
* Server is the gRPC server for executing map operation.
*/
@Slf4j
public class Server {
private final GRPCConfig grpcConfig;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private final GrpcServerWrapper server;
/**
* constructor to create sink gRPC server.
*
* @param mapStreamer to process the message
*/
public Server(MapStreamer mapStreamer) {
this(mapStreamer, GRPCConfig.defaultGrpcConfig());
}
/**
* constructor to create sink gRPC server with gRPC config.
*
* @param grpcConfig to configure the max message size for grpc
* @param mapStreamer to process the message
*/
public Server(MapStreamer mapStreamer, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
this.grpcConfig = grpcConfig;
this.server = new GrpcServerWrapper(this.grpcConfig, new Service(mapStreamer, this.shutdownSignal));
}
@VisibleForTesting
protected Server(GRPCConfig grpcConfig, MapStreamer service, ServerInterceptor interceptor, String serverName) {
this.grpcConfig = grpcConfig;
this.shutdownSignal = new CompletableFuture<>();
this.server = new GrpcServerWrapper(
interceptor,
serverName,
new Service(service, this.shutdownSignal));
}
/**
* Start serving requests.
*
* @throws Exception if server fails to start
*/
public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
this.serverInfoAccessor,
this.grpcConfig.getSocketPath(),
this.grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));
this.server.start();
log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath());
if (!this.grpcConfig.isLocal()) {
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down map streamer gRPC server since JVM is shutting down");
try {
this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));
}
// if there are any exceptions, shutdown the server gracefully.
this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down map streamer gRPC server because of an exception - " + e.getMessage());
try {
this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
}
}
});
}
/**
* Blocks until the server has terminated. If the server is already terminated, this method
* will return immediately. If the server is not yet terminated, this method will block the
* calling thread until the server has terminated.
*
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("map stream server is waiting for termination");
this.server.awaitTermination();
log.info("map stream server has terminated");
}
/**
* Stop serving requests and shutdown resources. Await termination on the main thread since the
* grpc library uses daemon threads.
*
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
this.server.gracefullyShutdown();
}
}