Skip to content

Commit 04e11f8

Browse files
committed
simplify thread pool shutdown
1 parent ecb622f commit 04e11f8

24 files changed

Lines changed: 190 additions & 37 deletions

File tree

chainbase/src/main/java/org/tron/common/storage/metric/DbStatService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void register(DbSourceInter<byte[]> db) {
3030

3131
public void shutdown() {
3232
if (Metrics.enabled()) {
33-
ExecutorServiceManager.shutdownAndAwaitTermination(statExecutor, esName);
33+
ExecutorServiceManager.shutdownAndAwaitTermination(esName);
3434
}
3535
}
3636
}

chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,8 @@ public synchronized void disable() {
272272

273273
@Override
274274
public void shutdown() {
275-
ExecutorServiceManager.shutdownAndAwaitTermination(pruneCheckpointThread, pruneName);
276-
flushServices.forEach((key, value) -> ExecutorServiceManager.shutdownAndAwaitTermination(value,
275+
ExecutorServiceManager.shutdownAndAwaitTermination(pruneName);
276+
flushServices.keySet().forEach(key -> ExecutorServiceManager.shutdownAndAwaitTermination(
277277
"flush-service-" + key));
278278
logger.info("******** Begin to pop revokingDb. ********");
279279
logger.info("******** Before revokingDb size: {}.", size);

common/src/main/java/org/tron/common/es/ExecutorServiceManager.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,69 @@
11
package org.tron.common.es;
22

33
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4+
import java.util.Map;
45
import java.util.concurrent.BlockingQueue;
6+
import java.util.concurrent.ConcurrentHashMap;
57
import java.util.concurrent.ExecutorService;
68
import java.util.concurrent.Executors;
79
import java.util.concurrent.ScheduledExecutorService;
810
import java.util.concurrent.ThreadPoolExecutor;
911
import java.util.concurrent.TimeUnit;
12+
import javax.annotation.PreDestroy;
1013
import lombok.extern.slf4j.Slf4j;
14+
import org.springframework.stereotype.Component;
1115

1216
@Slf4j(topic = "common-executor")
17+
@Component
1318
public class ExecutorServiceManager {
1419

20+
static final Map<String, ExecutorService> TO_BE_RELEASED = new ConcurrentHashMap<>();
21+
22+
private static <T extends ExecutorService> T addToBeReleased(String name, T es) {
23+
if (TO_BE_RELEASED.putIfAbsent(name, es) != null) {
24+
throw new DuplicateExecutorServiceException(name);
25+
}
26+
return es;
27+
}
28+
29+
@PreDestroy
30+
public static synchronized void release() {
31+
TO_BE_RELEASED.forEach((name, executorService) -> shutdownAndAwaitTermination(name));
32+
}
33+
34+
public static class DuplicateExecutorServiceException extends RuntimeException {
35+
public DuplicateExecutorServiceException(String name) {
36+
super("Duplicate executor service name: " + name);
37+
}
38+
}
39+
1540
public static ExecutorService newSingleThreadExecutor(String name) {
1641
return newSingleThreadExecutor(name, false);
1742
}
1843

1944
public static ExecutorService newSingleThreadExecutor(String name, boolean isDaemon) {
20-
return Executors.newSingleThreadExecutor(
21-
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
45+
return addToBeReleased(name, Executors.newSingleThreadExecutor(
46+
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build()));
2247
}
2348

24-
2549
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) {
2650
return newSingleThreadScheduledExecutor(name, false);
2751
}
2852

2953
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name,
3054
boolean isDaemon) {
31-
return Executors.newSingleThreadScheduledExecutor(
32-
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
55+
return addToBeReleased(name, Executors.newSingleThreadScheduledExecutor(
56+
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build()));
3357
}
3458

3559
public static ExecutorService newFixedThreadPool(String name, int fixThreads) {
3660
return newFixedThreadPool(name, fixThreads, false);
3761
}
3862

3963
public static ExecutorService newFixedThreadPool(String name, int fixThreads, boolean isDaemon) {
40-
return Executors.newFixedThreadPool(fixThreads,
41-
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(isDaemon).build());
64+
return addToBeReleased(name, Executors.newFixedThreadPool(fixThreads,
65+
new ThreadFactoryBuilder().setNameFormat(name + (fixThreads == 1 ? "" : "-%d"))
66+
.setDaemon(isDaemon).build()));
4267
}
4368

4469
public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
@@ -53,11 +78,13 @@ public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximu
5378
long keepAliveTime, TimeUnit unit,
5479
BlockingQueue<Runnable> workQueue,
5580
String name, boolean isDaemon) {
56-
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
57-
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(isDaemon).build());
81+
return addToBeReleased(name,
82+
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
83+
new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(isDaemon).build()));
5884
}
5985

60-
public static void shutdownAndAwaitTermination(ExecutorService pool, String name) {
86+
public static void shutdownAndAwaitTermination(String name) {
87+
ExecutorService pool = TO_BE_RELEASED.remove(name);
6188
if (pool == null) {
6289
return;
6390
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.tron.common.annotation;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
/**
9+
* JUnit 4 equivalent of JUnit 5's {@code org.junit.jupiter.api.Order}
10+
*/
11+
@Retention(RetentionPolicy.RUNTIME)
12+
@Target({ ElementType.METHOD })
13+
public @interface Order {
14+
/**
15+
* Default order value for elements not explicitly annotated with {@code @Order}.
16+
*
17+
* @see Order#value
18+
*/
19+
int DEFAULT = 0;
20+
21+
/**
22+
* The order value for the annotated element.
23+
* <p>Elements are ordered based on priority where a lower value has greater
24+
* priority than a higher value. For example, {@link Integer#MAX_VALUE} has
25+
* the lowest priority.
26+
*
27+
* @see #DEFAULT
28+
*/
29+
int value();
30+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.tron.common.annotation;
2+
3+
import org.junit.runner.Description;
4+
import org.junit.runner.manipulation.Ordering;
5+
import org.junit.runner.manipulation.Sorter;
6+
7+
import java.util.Comparator;
8+
import java.util.Optional;
9+
10+
/**
11+
* Order test methods by their {@link Order} annotation. The lower value has the highest priority.
12+
* The tests that are not annotated get the default value {@link Order#DEFAULT}.
13+
*/
14+
public class OrderAnnotation extends Sorter implements Ordering.Factory {
15+
public OrderAnnotation() {
16+
super(COMPARATOR);
17+
}
18+
19+
@Override
20+
public Ordering create(Context context) {
21+
return this;
22+
}
23+
24+
private static final Comparator<Description> COMPARATOR = Comparator.comparingInt(
25+
description -> Optional.ofNullable(description.getAnnotation(Order.class))
26+
.map(Order::value)
27+
.orElse(Order.DEFAULT));
28+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package org.tron.common.es;
2+
3+
import java.util.concurrent.LinkedBlockingQueue;
4+
import java.util.concurrent.TimeUnit;
5+
import org.junit.AfterClass;
6+
import org.junit.Assert;
7+
import org.junit.Rule;
8+
import org.junit.Test;
9+
import org.junit.rules.ExpectedException;
10+
import org.junit.runner.OrderWith;
11+
import org.tron.common.annotation.Order;
12+
import org.tron.common.annotation.OrderAnnotation;
13+
14+
@OrderWith(OrderAnnotation.class)
15+
public class ExecutorServiceManagerTest {
16+
17+
@Rule
18+
public ExpectedException exceptionRule = ExpectedException.none();
19+
20+
@Test
21+
@Order(1)
22+
public void newSingleThreadExecutor() {
23+
ExecutorServiceManager.newSingleThreadExecutor("test1");
24+
ExecutorServiceManager.newSingleThreadExecutor("test2");
25+
ExecutorServiceManager.shutdownAndAwaitTermination("test1");
26+
Assert.assertEquals(1, ExecutorServiceManager.TO_BE_RELEASED.size());
27+
exceptionRule.expect(ExecutorServiceManager.DuplicateExecutorServiceException.class);
28+
ExecutorServiceManager.newSingleThreadExecutor("test2", true);
29+
}
30+
31+
@Test
32+
@Order(2)
33+
public void newFixedThreadPool() {
34+
ExecutorServiceManager.newFixedThreadPool("test1", 1);
35+
ExecutorServiceManager.newFixedThreadPool("test3", 2, true);
36+
ExecutorServiceManager.shutdownAndAwaitTermination("test1");
37+
Assert.assertEquals(2, ExecutorServiceManager.TO_BE_RELEASED.size());
38+
}
39+
40+
@Test
41+
@Order(3)
42+
public void newSingleThreadScheduledExecutor() {
43+
ExecutorServiceManager.newSingleThreadScheduledExecutor("test1");
44+
ExecutorServiceManager.newSingleThreadScheduledExecutor("test4", true);
45+
Assert.assertEquals(4, ExecutorServiceManager.TO_BE_RELEASED.size());
46+
ExecutorServiceManager.shutdownAndAwaitTermination("test1");
47+
Assert.assertEquals(3, ExecutorServiceManager.TO_BE_RELEASED.size());
48+
}
49+
50+
@Test
51+
@Order(4)
52+
public void newThreadPoolExecutor() {
53+
ExecutorServiceManager.newThreadPoolExecutor( 1, 1, 1,
54+
TimeUnit.SECONDS,new LinkedBlockingQueue<>(), "test1");
55+
}
56+
57+
@Test
58+
@Order(5)
59+
public void shutdownAndAwaitTermination() {
60+
ExecutorServiceManager.shutdownAndAwaitTermination( "test-not-exist");
61+
Assert.assertEquals(4, ExecutorServiceManager.TO_BE_RELEASED.size());
62+
}
63+
64+
@AfterClass
65+
public static void release() {
66+
ExecutorServiceManager.release();
67+
}
68+
}

consensus/src/main/java/org/tron/consensus/dpos/DposTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void init() {
7777

7878
public void stop() {
7979
isRunning = false;
80-
ExecutorServiceManager.shutdownAndAwaitTermination(produceExecutor, name);
80+
ExecutorServiceManager.shutdownAndAwaitTermination(name);
8181
}
8282

8383
private State produceBlock() {

consensus/src/main/java/org/tron/consensus/pbft/PbftManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public boolean verifyMsg(PbftBaseMessage msg) {
114114

115115
@Override
116116
public void close() {
117-
ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName);
117+
ExecutorServiceManager.shutdownAndAwaitTermination(esName);
118118
}
119119

120120
}

framework/src/main/java/org/tron/common/backup/BackupManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void handleEvent(UdpEvent udpEvent) {
148148
}
149149

150150
public void stop() {
151-
ExecutorServiceManager.shutdownAndAwaitTermination(executorService, esName);
151+
ExecutorServiceManager.shutdownAndAwaitTermination(esName);
152152
}
153153

154154
@Override

framework/src/main/java/org/tron/common/backup/socket/BackupServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void initChannel(NioDatagramChannel ch)
9595
public void close() {
9696
logger.info("Closing backup server...");
9797
shutdown = true;
98-
ExecutorServiceManager.shutdownAndAwaitTermination(executor, name);
98+
ExecutorServiceManager.shutdownAndAwaitTermination(name);
9999
backupManager.stop();
100100
if (channel != null) {
101101
try {

0 commit comments

Comments
 (0)