diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java index 042bf9a2..265b7c18 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java @@ -111,6 +111,10 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java index 29fcdc96..98f43ec0 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java @@ -104,6 +104,10 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index 78d9f63a..443322fc 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -108,6 +108,10 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java index f48efec1..f3825a9b 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java @@ -111,6 +111,10 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java index b66d8d21..b0953695 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java @@ -111,6 +111,10 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index 78ac6f65..23b20f6a 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -108,6 +108,9 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index 58eae576..5c78dfd8 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -108,6 +108,10 @@ public void stop() throws InterruptedException { this.service.shutDown(); if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Server.java b/src/main/java/io/numaproj/numaflow/sourcer/Server.java index 5fbdbbff..855fa6e4 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Server.java @@ -108,6 +108,10 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java index 93f84b23..aa02edba 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java @@ -108,6 +108,10 @@ public void awaitTermination() throws InterruptedException { public void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + // force shutdown if not terminated + if (!server.isTerminated()) { + server.shutdownNow(); + } } }