diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java index 82a0bf9cfb641..0b4d94122ee2f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java @@ -53,6 +53,8 @@ public class KubernetesLeaderElector { @VisibleForTesting public static final String LEADER_ANNOTATION_KEY = "control-plane.alpha.kubernetes.io/leader"; + private final Object lock = new Object(); + private final ExecutorService executorService = Executors.newSingleThreadExecutor( new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")); @@ -92,11 +94,20 @@ public KubernetesLeaderElector( } public void run() { - executorService.submit(internalLeaderElector::run); + synchronized (lock) { + if (executorService.isShutdown()) { + LOG.debug( + "Ignoring KubernetesLeaderElector.run call because the leader elector has already been shut down."); + } else { + executorService.execute(internalLeaderElector::run); + } + } } public void stop() { - executorService.shutdownNow(); + synchronized (lock) { + executorService.shutdownNow(); + } } public static boolean hasLeadership(KubernetesConfigMap configMap, String lockIdentity) {