Skip to content

Commit dbbb75d

Browse files
committed
feat(*): tune single Thread into SingleThreadExecutor
1 parent 61c6177 commit dbbb75d

5 files changed

Lines changed: 106 additions & 38 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package org.tron.common.es;
2+
3+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
6+
import java.util.concurrent.ScheduledExecutorService;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
@Slf4j(topic = "common")
10+
public class ExecutorServiceManager {
11+
12+
public static ExecutorService newSingleThreadExecutor(String name) {
13+
return newSingleThreadExecutor(name, false);
14+
}
15+
16+
public static ExecutorService newSingleThreadExecutor(String name, boolean isDaemon) {
17+
return Executors.newSingleThreadExecutor(
18+
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
19+
}
20+
21+
22+
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) {
23+
return newSingleThreadScheduledExecutor(name, false);
24+
}
25+
26+
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name,
27+
boolean isDaemon) {
28+
return Executors.newSingleThreadScheduledExecutor(
29+
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
30+
}
31+
32+
public static void shutdownAndAwaitTermination(ExecutorService pool, String name) {
33+
if (pool == null) {
34+
return;
35+
}
36+
pool.shutdown(); // Disable new tasks from being submitted
37+
try {
38+
// Wait a while for existing tasks to terminate
39+
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
40+
pool.shutdownNow(); // Cancel currently executing tasks
41+
// Wait a while for tasks to respond to being cancelled
42+
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
43+
logger.warn("Pool {} did not terminate", name);
44+
}
45+
}
46+
} catch (InterruptedException ie) {
47+
// (Re-)Cancel if current thread also interrupted
48+
pool.shutdownNow();
49+
// Preserve interrupt status
50+
Thread.currentThread().interrupt();
51+
}
52+
}
53+
}

consensus/src/main/java/org/tron/consensus/dpos/DposTask.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;
44

55
import com.google.protobuf.ByteString;
6+
import java.util.concurrent.ExecutorService;
67
import lombok.Setter;
78
import lombok.extern.slf4j.Slf4j;
89
import org.joda.time.DateTime;
910
import org.springframework.beans.factory.annotation.Autowired;
1011
import org.springframework.stereotype.Component;
11-
import org.springframework.util.StringUtils;
12+
import org.springframework.util.ObjectUtils;
13+
import org.tron.common.es.ExecutorServiceManager;
1214
import org.tron.common.parameter.CommonParameter;
1315
import org.tron.common.utils.ByteArray;
1416
import org.tron.common.utils.Sha256Hash;
@@ -34,16 +36,18 @@ public class DposTask {
3436
@Setter
3537
private DposService dposService;
3638

37-
private Thread produceThread;
39+
private ExecutorService produceExecutor;
40+
41+
private final String name = "DPosMiner";
3842

3943
private volatile boolean isRunning = true;
4044

4145
public void init() {
4246

43-
if (!dposService.isEnable() || StringUtils.isEmpty(dposService.getMiners())) {
47+
if (!dposService.isEnable() || ObjectUtils.isEmpty(dposService.getMiners())) {
4448
return;
4549
}
46-
50+
produceExecutor = ExecutorServiceManager.newSingleThreadExecutor(name);
4751
Runnable runnable = () -> {
4852
while (isRunning) {
4953
try {
@@ -67,17 +71,15 @@ public void init() {
6771
}
6872
}
6973
};
70-
produceThread = new Thread(runnable, "DPosMiner");
71-
produceThread.start();
74+
produceExecutor.submit(runnable);
7275
logger.info("DPoS task started.");
7376
}
7477

7578
public void stop() {
79+
logger.info("DPoS task shutdown...");
7680
isRunning = false;
77-
if (produceThread != null) {
78-
produceThread.interrupt();
79-
}
80-
logger.info("DPoS task stopped.");
81+
ExecutorServiceManager.shutdownAndAwaitTermination(produceExecutor, name);
82+
logger.info("DPoS task shutdown complete");
8183
}
8284

8385
private State produceBlock() {

framework/src/main/java/org/tron/common/backup/socket/BackupServer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import io.netty.channel.socket.nio.NioDatagramChannel;
88
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
99
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
10+
import java.util.concurrent.ExecutorService;
1011
import java.util.concurrent.TimeUnit;
1112
import lombok.extern.slf4j.Slf4j;
1213
import org.springframework.beans.factory.annotation.Autowired;
1314
import org.springframework.stereotype.Component;
1415
import org.tron.common.backup.BackupManager;
16+
import org.tron.common.es.ExecutorServiceManager;
1517
import org.tron.common.parameter.CommonParameter;
1618
import org.tron.p2p.stats.TrafficStats;
1719

@@ -29,20 +31,24 @@ public class BackupServer {
2931

3032
private volatile boolean shutdown = false;
3133

34+
private final String name = "BackupServer";
35+
private ExecutorService executor;
36+
3237
@Autowired
3338
public BackupServer(final BackupManager backupManager) {
3439
this.backupManager = backupManager;
3540
}
3641

3742
public void initServer() {
3843
if (port > 0 && commonParameter.getBackupMembers().size() > 0) {
39-
new Thread(() -> {
44+
executor = ExecutorServiceManager.newSingleThreadExecutor(name);
45+
executor.submit(() -> {
4046
try {
4147
start();
4248
} catch (Exception e) {
4349
logger.error("Start backup server failed, {}", e);
4450
}
45-
}, "BackupServer").start();
51+
});
4652
}
4753
}
4854

@@ -95,5 +101,7 @@ public void close() {
95101
logger.warn("Closing backup server failed.", e);
96102
}
97103
}
104+
ExecutorServiceManager.shutdownAndAwaitTermination(executor, name);
105+
logger.info("Backup server closed.");
98106
}
99107
}

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.tron.api.GrpcAPI.TransactionInfoList;
5050
import org.tron.common.args.GenesisBlock;
5151
import org.tron.common.bloom.Bloom;
52+
import org.tron.common.es.ExecutorServiceManager;
5253
import org.tron.common.logsfilter.EventPluginLoader;
5354
import org.tron.common.logsfilter.FilterQuery;
5455
import org.tron.common.logsfilter.capsule.BlockFilterCapsule;
@@ -253,6 +254,13 @@ public class Manager {
253254
private AtomicInteger blockWaitLock = new AtomicInteger(0);
254255
private Object transactionLock = new Object();
255256

257+
private ExecutorService rePushEs;
258+
private static final String rePushEsName = "repush";
259+
private ExecutorService triggerEs;
260+
private static final String triggerEsName = "event-trigger";
261+
private ExecutorService filterEs;
262+
private static final String filterEsName = "filter";
263+
256264
/**
257265
* Cycle thread to rePush Transactions
258266
*/
@@ -429,14 +437,17 @@ public BlockingQueue<TransactionCapsule> getRePushTransactions() {
429437

430438
public void stopRePushThread() {
431439
isRunRePushThread = false;
440+
ExecutorServiceManager.shutdownAndAwaitTermination(rePushEs, rePushEsName);
432441
}
433442

434443
public void stopRePushTriggerThread() {
435444
isRunTriggerCapsuleProcessThread = false;
445+
ExecutorServiceManager.shutdownAndAwaitTermination(triggerEs, triggerEsName);
436446
}
437447

438448
public void stopFilterProcessThread() {
439449
isRunFilterProcessThread = false;
450+
ExecutorServiceManager.shutdownAndAwaitTermination(filterEs, filterEsName);
440451
}
441452

442453
@PostConstruct
@@ -524,21 +535,19 @@ public void init() {
524535
revokingStore.enable();
525536
validateSignService = Executors
526537
.newFixedThreadPool(Args.getInstance().getValidateSignThreadNum());
527-
Thread rePushThread = new Thread(rePushLoop);
528-
rePushThread.setDaemon(true);
529-
rePushThread.start();
538+
rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true);
539+
rePushEs.submit(rePushLoop);
530540
// add contract event listener for subscribing
531541
if (Args.getInstance().isEventSubscribe()) {
532542
startEventSubscribing();
533-
Thread triggerCapsuleProcessThread = new Thread(triggerCapsuleProcessLoop);
534-
triggerCapsuleProcessThread.setDaemon(true);
535-
triggerCapsuleProcessThread.start();
543+
triggerEs = ExecutorServiceManager.newSingleThreadExecutor(triggerEsName, true);
544+
triggerEs.submit(triggerCapsuleProcessLoop);
536545
}
537546

538547
// start json rpc filter process
539548
if (CommonParameter.getInstance().isJsonRpcFilterEnabled()) {
540-
Thread filterProcessThread = new Thread(filterProcessLoop);
541-
filterProcessThread.start();
549+
filterEs = ExecutorServiceManager.newSingleThreadExecutor(filterEsName, true);
550+
filterEs.submit(filterProcessLoop);
542551
}
543552

544553
//initStoreFactory

framework/src/main/java/org/tron/core/net/service/nodepersist/NodePersistService.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
import java.util.ArrayList;
55
import java.util.Comparator;
66
import java.util.List;
7-
import java.util.Objects;
8-
import java.util.Timer;
9-
import java.util.TimerTask;
7+
import java.util.concurrent.ScheduledExecutorService;
8+
import java.util.concurrent.TimeUnit;
109
import lombok.extern.slf4j.Slf4j;
1110
import org.springframework.beans.factory.annotation.Autowired;
1211
import org.springframework.stereotype.Component;
12+
import org.tron.common.es.ExecutorServiceManager;
1313
import org.tron.common.parameter.CommonParameter;
1414
import org.tron.common.utils.ByteArray;
1515
import org.tron.common.utils.JsonUtil;
@@ -27,28 +27,24 @@ public class NodePersistService {
2727
private final boolean isNodePersist = CommonParameter.getInstance().isNodeDiscoveryPersist();
2828
@Autowired
2929
private CommonStore commonStore;
30-
private Timer nodePersistTaskTimer;
30+
31+
private ScheduledExecutorService nodePersistExecutor;
32+
33+
private final String name = "NodePersistTask";
3134

3235
public void init() {
3336
if (isNodePersist) {
34-
nodePersistTaskTimer = new Timer("NodePersistTaskTimer");
35-
nodePersistTaskTimer.scheduleAtFixedRate(new TimerTask() {
36-
@Override
37-
public void run() {
38-
dbWrite();
39-
}
40-
}, DB_COMMIT_RATE, DB_COMMIT_RATE);
37+
nodePersistExecutor = ExecutorServiceManager.newSingleThreadScheduledExecutor(name);
38+
nodePersistExecutor.scheduleAtFixedRate(this::dbWrite, DB_COMMIT_RATE, DB_COMMIT_RATE,
39+
TimeUnit.MILLISECONDS);
4140
}
4241
}
4342

4443
public void close() {
45-
if (Objects.isNull(nodePersistTaskTimer)) {
46-
return;
47-
}
48-
try {
49-
nodePersistTaskTimer.cancel();
50-
} catch (Exception e) {
51-
logger.error("Close nodePersistTaskTimer failed", e);
44+
if (isNodePersist) {
45+
logger.info("Node persist service shutdown...");
46+
ExecutorServiceManager.shutdownAndAwaitTermination(nodePersistExecutor, name);
47+
logger.info("Node persist service shutdown complete");
5248
}
5349
}
5450

0 commit comments

Comments
 (0)