Skip to content

Commit 8ccf643

Browse files
authored
HBASE-24750 : All ExecutorService should use guava ThreadFactoryBuilder (#2214)
Closes #2196 Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Signed-off-by: Ted Yu <tyu@apache.org> Signed-off-by: niuyulin <nyl353@163.com>
1 parent 96ea136 commit 8ccf643

39 files changed

Lines changed: 176 additions & 191 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
5252
import org.apache.hadoop.hbase.util.Threads;
5353
import org.apache.hadoop.security.UserGroupInformation;
54+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
5455
import org.apache.yetus.audience.InterfaceAudience;
5556
import org.slf4j.Logger;
5657
import org.slf4j.LoggerFactory;
@@ -73,7 +74,9 @@ class AsyncConnectionImpl implements AsyncConnection {
7374

7475
@VisibleForTesting
7576
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
76-
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
77+
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d")
78+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
79+
TimeUnit.MILLISECONDS);
7780

7881
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
7982

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hbase.util.Addressing;
3838
import org.apache.hadoop.hbase.util.ExceptionUtil;
3939
import org.apache.hadoop.hbase.util.Threads;
40+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4041
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
4142
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
4243
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -178,8 +179,9 @@ public boolean isDeadServer(ServerName sn) {
178179
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
179180
class MulticastListener implements Listener {
180181
private DatagramChannel channel;
181-
private final EventLoopGroup group = new NioEventLoopGroup(
182-
1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
182+
private final EventLoopGroup group = new NioEventLoopGroup(1,
183+
new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
184+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
183185

184186
public MulticastListener() {
185187
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.hadoop.hbase.util.Threads;
8080
import org.apache.hadoop.ipc.RemoteException;
8181
import org.apache.hadoop.security.UserGroupInformation;
82+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
8283
import org.apache.yetus.audience.InterfaceAudience;
8384
import org.apache.zookeeper.KeeperException;
8485
import org.slf4j.Logger;
@@ -490,13 +491,10 @@ private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String
490491
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
491492
coreThreads = maxThreads;
492493
}
493-
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
494-
coreThreads,
495-
maxThreads,
496-
keepAliveTime,
497-
TimeUnit.SECONDS,
498-
workQueue,
499-
Threads.newDaemonThreadFactory(toString() + nameHint));
494+
ThreadPoolExecutor tpe =
495+
new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
496+
new ThreadFactoryBuilder().setNameFormat(toString() + nameHint + "-pool-%d")
497+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
500498
tpe.allowCoreThreadTimeOut(true);
501499
return tpe;
502500
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.hadoop.hbase.TableName;
3535
import org.apache.hadoop.hbase.filter.Filter;
3636
import org.apache.hadoop.hbase.io.TimeRange;
37+
import org.apache.hadoop.hbase.util.Threads;
38+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
3739
import org.apache.yetus.audience.InterfaceAudience;
3840
import org.apache.yetus.audience.InterfaceStability;
3941
import org.slf4j.Logger;
@@ -56,7 +58,6 @@
5658
import org.apache.hadoop.hbase.util.Bytes;
5759
import org.apache.hadoop.hbase.util.Pair;
5860
import org.apache.hadoop.hbase.util.ReflectionUtils;
59-
import org.apache.hadoop.hbase.util.Threads;
6061

6162
import java.io.IOException;
6263
import java.io.InterruptedIOException;
@@ -138,8 +139,10 @@ public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
138139
// if it is necessary and will grow unbounded. This could be bad but in HCM
139140
// we only create as many Runnables as there are region servers. It means
140141
// it also scales when new region servers are added.
141-
ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
142-
TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable"));
142+
ThreadPoolExecutor pool =
143+
new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS,
144+
new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d")
145+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
143146
pool.allowCoreThreadTimeOut(true);
144147
return pool;
145148
}

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.hbase.util.Threads;
4545
import org.apache.hadoop.io.compress.CompressionCodec;
4646
import org.apache.hadoop.ipc.RemoteException;
47+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4748
import org.apache.yetus.audience.InterfaceAudience;
4849
import org.slf4j.Logger;
4950
import org.slf4j.LoggerFactory;
@@ -91,10 +92,14 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
9192
public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
9293

9394
protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
94-
Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
95+
new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d")
96+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
97+
TimeUnit.MILLISECONDS);
9598

9699
private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
97-
.newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
100+
.newScheduledThreadPool(1,
101+
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d")
102+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
98103

99104
protected boolean running = true; // if client runs
100105

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
3636
import org.apache.hadoop.hbase.util.Threads;
3737
import org.apache.hadoop.security.UserGroupInformation;
38+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
3839
import org.apache.yetus.audience.InterfaceAudience;
3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
@@ -76,8 +77,9 @@ class NettyRpcConnection extends RpcConnection {
7677

7778
private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
7879

79-
private static final ScheduledExecutorService RELOGIN_EXECUTOR =
80-
Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
80+
private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
81+
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
82+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
8183

8284
private final NettyRpcClient rpcClient;
8385

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.apache.hadoop.hbase.testclassification.LargeTests;
6969
import org.apache.hadoop.hbase.util.Bytes;
7070
import org.apache.hadoop.hbase.util.Threads;
71+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
7172
import org.junit.Assert;
7273
import org.junit.Before;
7374
import org.junit.ClassRule;
@@ -138,7 +139,9 @@ public void beforeEach() {
138139

139140
static class CountingThreadFactory implements ThreadFactory {
140141
final AtomicInteger nbThreads;
141-
ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
142+
ThreadFactory realFactory =
143+
new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d")
144+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build();
142145
@Override
143146
public Thread newThread(Runnable r) {
144147
nbThreads.incrementAndGet();

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.hadoop.hbase.util.Threads;
6060
import org.apache.hadoop.util.Tool;
6161
import org.apache.hadoop.util.ToolRunner;
62+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
6263
import org.junit.Before;
6364
import org.junit.ClassRule;
6465
import org.junit.Ignore;
@@ -803,7 +804,8 @@ public int run(String[] arg0) throws Exception {
803804
// Have them all share the same connection so they all share the same instance of
804805
// ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
805806
final ExecutorService pool = Executors.newCachedThreadPool(
806-
Threads.newDaemonThreadFactory("p"));
807+
new ThreadFactoryBuilder().setNameFormat("p-pool-%d")
808+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
807809
// Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
808810
// Share a connection so I can keep counts in the 'server' on concurrency.
809811
final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);

hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java

Lines changed: 6 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,9 @@
4545
@InterfaceAudience.Private
4646
public class Threads {
4747
private static final Logger LOG = LoggerFactory.getLogger(Threads.class);
48-
private static final AtomicInteger poolNumber = new AtomicInteger(1);
4948

5049
public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
51-
new UncaughtExceptionHandler() {
52-
@Override
53-
public void uncaughtException(Thread t, Throwable e) {
54-
LOG.warn("Thread:" + t + " exited with Exception:"
55-
+ StringUtils.stringifyException(e));
56-
}
57-
};
50+
(t, e) -> LOG.warn("Thread:{} exited with Exception:{}", t, StringUtils.stringifyException(e));
5851

5952
/**
6053
* Utility method that sets name, daemon status and starts passed thread.
@@ -186,89 +179,24 @@ public static void sleepWithoutInterrupt(final long msToWait) {
186179
* @return threadPoolExecutor the cachedThreadPool with a bounded number
187180
* as the maximum thread size in the pool.
188181
*/
189-
public static ThreadPoolExecutor getBoundedCachedThreadPool(
190-
int maxCachedThread, long timeout, TimeUnit unit,
191-
ThreadFactory threadFactory) {
182+
public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout,
183+
TimeUnit unit, ThreadFactory threadFactory) {
192184
ThreadPoolExecutor boundedCachedThreadPool =
193-
new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
194-
unit, new LinkedBlockingQueue<>(), threadFactory);
185+
new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout, unit,
186+
new LinkedBlockingQueue<>(), threadFactory);
195187
// allow the core pool threads timeout and terminate
196188
boundedCachedThreadPool.allowCoreThreadTimeOut(true);
197189
return boundedCachedThreadPool;
198190
}
199191

200-
201-
/**
202-
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
203-
* with a common prefix.
204-
* @param prefix The prefix of every created Thread's name
205-
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
206-
*/
207-
public static ThreadFactory getNamedThreadFactory(final String prefix) {
208-
SecurityManager s = System.getSecurityManager();
209-
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
210-
.getThreadGroup();
211-
212-
return new ThreadFactory() {
213-
final AtomicInteger threadNumber = new AtomicInteger(1);
214-
private final int poolNumber = Threads.poolNumber.getAndIncrement();
215-
final ThreadGroup group = threadGroup;
216-
217-
@Override
218-
public Thread newThread(Runnable r) {
219-
final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
220-
return new Thread(group, r, name);
221-
}
222-
};
223-
}
224-
225-
/**
226-
* Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
227-
* without setting the exception handler.
228-
*/
229-
public static ThreadFactory newDaemonThreadFactory(final String prefix) {
230-
return newDaemonThreadFactory(prefix, null);
231-
}
232-
233-
/**
234-
* Get a named {@link ThreadFactory} that just builds daemon threads.
235-
* @param prefix name prefix for all threads created from the factory
236-
* @param handler unhandles exception handler to set for all threads
237-
* @return a thread factory that creates named, daemon threads with
238-
* the supplied exception handler and normal priority
239-
*/
240-
public static ThreadFactory newDaemonThreadFactory(final String prefix,
241-
final UncaughtExceptionHandler handler) {
242-
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
243-
return new ThreadFactory() {
244-
@Override
245-
public Thread newThread(Runnable r) {
246-
Thread t = namedFactory.newThread(r);
247-
if (handler != null) {
248-
t.setUncaughtExceptionHandler(handler);
249-
} else {
250-
t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
251-
}
252-
if (!t.isDaemon()) {
253-
t.setDaemon(true);
254-
}
255-
if (t.getPriority() != Thread.NORM_PRIORITY) {
256-
t.setPriority(Thread.NORM_PRIORITY);
257-
}
258-
return t;
259-
}
260-
261-
};
262-
}
263-
264192
/** Sets an UncaughtExceptionHandler for the thread which logs the
265193
* Exception stack if the thread dies.
266194
*/
267195
public static void setLoggingUncaughtExceptionHandler(Thread t) {
268196
t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
269197
}
270198

271-
private static interface PrintThreadInfoHelper {
199+
private interface PrintThreadInfoHelper {
272200

273201
void printThreadInfo(PrintStream stream, String title);
274202

hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hbase.util.Threads;
3838
import org.apache.hadoop.util.Tool;
3939
import org.apache.hadoop.util.ToolRunner;
40+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4041
import org.apache.yetus.audience.InterfaceAudience;
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
@@ -130,7 +131,8 @@ public int run(String[] args) throws Exception {
130131
TableName tableName = TableName.valueOf(args[0]);
131132
int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
132133
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
133-
Threads.newDaemonThreadFactory("AsyncClientExample"));
134+
new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d")
135+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
134136
// We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
135137
// need a thread pool and may have a better performance if you use it correctly as it can save
136138
// some context switches. But if you use RawAsyncTable incorrectly, you may have a very bad

0 commit comments

Comments
 (0)