diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java index f74d1b90..1b7e551a 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java @@ -72,17 +72,19 @@ public void start() throws Exception { log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath()); - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); + if (!this.grpcConfig.isLocal()) { + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); + } // if there are any exceptions, shutdown the server gracefully. this.shutdownSignal.whenCompleteAsync((v, e) -> { diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java index c5f638b8..7e22414f 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java @@ -70,6 +70,21 @@ public void start() throws Exception { this.grpcConfig.getInfoFilePath(), ContainerType.MAPPER, Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + this.stop(); + // FIXME - this is a workaround to immediately terminate the JVM process + // The correct way to do this is to stop all the actors and wait for them to terminate + System.exit(0); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -79,21 +94,6 @@ public void start() throws Exception { this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - this.stop(); - // FIXME - this is a workaround to immediately terminate the JVM process - // The correct way to do this is to stop all the actors and wait for them to terminate - System.exit(0); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); - // if there are any exceptions, shutdown the server gracefully. this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java index 2f286761..73044cc9 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java @@ -72,17 +72,19 @@ public void start() throws Exception { log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath()); - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down map streamer gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); + if (!this.grpcConfig.isLocal()) { + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down map streamer gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); + } // if there are any exceptions, shutdown the server gracefully. this.shutdownSignal.whenCompleteAsync((v, e) -> { diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index 2aaad86d..c7d8afd5 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -61,6 +61,18 @@ public void start() throws Exception { this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.REDUCER); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -69,18 +81,6 @@ public void start() throws Exception { "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); } /** diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java index a0946df8..a97ee70b 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java @@ -64,6 +64,18 @@ public void start() throws Exception { this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.REDUCE_STREAMER); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -72,18 +84,6 @@ public void start() throws Exception { "server started, listening on {}", this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); - - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); } /** diff --git a/src/main/java/io/numaproj/numaflow/servingstore/Server.java b/src/main/java/io/numaproj/numaflow/servingstore/Server.java index 3b1788a7..54be669c 100644 --- a/src/main/java/io/numaproj/numaflow/servingstore/Server.java +++ b/src/main/java/io/numaproj/numaflow/servingstore/Server.java @@ -76,6 +76,19 @@ public void start() throws Exception { this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.SERVING); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown + // hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -85,19 +98,6 @@ public void start() throws Exception { this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown - // hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); - // if there are any exceptions, shutdown the server gracefully. this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java index eb0a4308..d6599019 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java @@ -64,6 +64,18 @@ public void start() throws Exception { this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.SESSION_REDUCER); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -72,18 +84,6 @@ public void start() throws Exception { "server started, listening on {}", this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); - - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); } /** diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index be4394e2..9163ad54 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -62,6 +62,17 @@ public void start() throws Exception { this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.SIDEINPUT); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -69,17 +80,6 @@ public void start() throws Exception { log.info( "server started, listening on {}", this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); - - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); } /** diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index cafda94f..6805016c 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -58,6 +58,18 @@ public void start() throws Exception { grpcConfig.getSocketPath(), grpcConfig.getInfoFilePath(), ContainerType.SINKER); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down sink gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } server.start(); @@ -67,18 +79,6 @@ public void start() throws Exception { grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down sink gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); - // if there are any exceptions, shutdown the server gracefully. shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Server.java b/src/main/java/io/numaproj/numaflow/sourcer/Server.java index 25978766..dcbd503c 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Server.java @@ -66,6 +66,18 @@ public void start() throws Exception { this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.SOURCER); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down source gRPC server since JVM is shutting down"); + try { + this.stop(); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -75,18 +87,6 @@ public void start() throws Exception { this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down source gRPC server since JVM is shutting down"); - try { - this.stop(); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); - // if there are any exceptions, shutdown the server gracefully. this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java index 43731c17..7baa3927 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java @@ -66,6 +66,21 @@ public void start() throws Exception { this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.SOURCE_TRANSFORMER); + + // register shutdown hook to gracefully shut down the server + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + this.stop(); + // FIXME - this is a workaround to immediately terminate the JVM process + // The correct way to do this is to stop all the actors and wait for them to terminate + System.exit(0); + } catch (InterruptedException e) { + Thread.interrupted(); + e.printStackTrace(System.err); + } + })); } this.server.start(); @@ -75,21 +90,6 @@ public void start() throws Exception { this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); - // register shutdown hook to gracefully shut down the server - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - try { - this.stop(); - // FIXME - this is a workaround to immediately terminate the JVM process - // The correct way to do this is to stop all the actors and wait for them to terminate - System.exit(0); - } catch (InterruptedException e) { - Thread.interrupted(); - e.printStackTrace(System.err); - } - })); - // if there are any exceptions, shutdown the server gracefully. this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) {