Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ test {
showStandardStreams = true
}
if (isLocal) {
failFast = true
retry {
failOnPassedAfterRetry = false
maxFailures = 2
maxRetries = 2
}
}
else {
retry {
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/io/nats/client/NatsSystemClock.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,29 @@

package io.nats.client;

public class NatsSystemClock {
public final class NatsSystemClock {
private static NatsSystemClockProvider PROVIDER = new NatsSystemClockProvider() {};

/**
* Set the provider. Null will reset to system default
* @param provider the provider
*/
public static void setProvider(final NatsSystemClockProvider provider) {
PROVIDER = provider == null ? new NatsSystemClockProvider() {} : provider;
}

/**
* Get the current milliseconds from the provider
* @return the milliseconds
*/
public static long currentTimeMillis() {
return PROVIDER.currentTimeMillis();
}

/**
* Get the current nano time from the provider
* @return the nano time
*/
public static long nanoTime() {
return PROVIDER.nanoTime();
}
Expand Down
40 changes: 39 additions & 1 deletion src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ public class Options {
* {@link Builder#executor(ExecutorService) executor}.
*/
public static final String PROP_EXECUTOR_SERVICE_CLASS = "executor.service.class";
/**
* Property used to set class name for the Executor Service (executor) class
* {@link Builder#executor(ExecutorService) executor}.
*/
public static final String PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS = "scheduled.executor.service.class";
/**
* Property used to set class name for the Connect Thread Factory
* {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory}.
Expand Down Expand Up @@ -660,14 +665,15 @@ public class Options {
private final ErrorListener errorListener;
private final TimeTraceLogger timeTraceLogger;
private final ConnectionListener connectionListener;
private ReadListener readListener;
private final ReadListener readListener;
private final StatisticsCollector statisticsCollector;
private final String dataPortType;

private final boolean trackAdvancedStats;
private final boolean traceConnection;

private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
private final ThreadFactory connectThreadFactory;
private final ThreadFactory callbackThreadFactory;
private final ServerPool serverPool;
Expand Down Expand Up @@ -808,6 +814,7 @@ public static class Builder {
private StatisticsCollector statisticsCollector = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private ThreadFactory connectThreadFactory;
private ThreadFactory callbackThreadFactory;
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
Expand Down Expand Up @@ -939,6 +946,7 @@ public Builder properties(Properties props) {
classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o);
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
return this;
Expand Down Expand Up @@ -1630,6 +1638,19 @@ public Builder executor(ExecutorService executor) {
return this;
}

/**
* Set the {@link ScheduledExecutorService ScheduledExecutorService} used to run scheduled task like
* heartbeat timers
* The default is a ScheduledThreadPoolExecutor that does not
* execute delayed tasks after shutdown and removes tasks on cancel;
* @param scheduledExecutor The ScheduledExecutorService to use for timer tasks
* @return the Builder for chaining
*/
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
return this;
}

/**
* Sets custom thread factory for the executor service
*
Expand Down Expand Up @@ -1910,6 +1931,17 @@ else if (useDefaultTls) {
new DefaultThreadFactory(threadPrefix));
}

if (this.scheduledExecutor == null) {
String threadPrefix = nullOrEmpty(this.connectionName) ? DEFAULT_THREAD_NAME_PREFIX : this.connectionName;
// the core pool size of 3 is chosen considering where we know the scheduler is used.
// 1. Ping timer, 2. cleanup timer, 3. SocketDataPortWithWriteTimeout
// Pull message managers also use a scheduler, but we don't even know if this will be consuming
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(3, new DefaultThreadFactory(threadPrefix));
stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
stpe.setRemoveOnCancelPolicy(true);
this.scheduledExecutor = stpe;
}

if (socketReadTimeoutMillis > 0) {
long srtMin = pingInterval.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT;
if (socketReadTimeoutMillis < srtMin) {
Expand Down Expand Up @@ -2014,6 +2046,7 @@ public Builder(Options o) {
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
this.scheduledExecutor = o.scheduledExecutor;
this.callbackThreadFactory = o.callbackThreadFactory;
this.connectThreadFactory = o.connectThreadFactory;
this.httpRequestInterceptors = o.httpRequestInterceptors;
Expand Down Expand Up @@ -2082,6 +2115,7 @@ private Options(Builder b) {
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
this.scheduledExecutor = b.scheduledExecutor;
this.callbackThreadFactory = b.callbackThreadFactory;
this.connectThreadFactory = b.connectThreadFactory;
this.httpRequestInterceptors = b.httpRequestInterceptors;
Expand All @@ -2107,6 +2141,10 @@ public ExecutorService getExecutor() {
return this.executor;
}

public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}

/**
* @return the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
*/
Expand Down
74 changes: 20 additions & 54 deletions src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
import io.nats.client.PullRequestOptions;
import io.nats.client.SubscribeOptions;
import io.nats.client.support.NatsConstants;
import io.nats.client.support.ScheduledTask;

import java.time.Duration;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -46,8 +45,8 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
protected boolean hb;
protected long idleHeartbeatSetting;
protected long alarmPeriodSettingNanos;
protected MmTimerTask heartbeatTimerTask;
protected Timer heartbeatTimer;
protected ScheduledTask heartbeatTask;
protected final AtomicLong currentAlarmPeriodNanos;

protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
stateChangeLock = new ReentrantLock();
Expand All @@ -62,6 +61,7 @@ protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncM
idleHeartbeatSetting = 0;
alarmPeriodSettingNanos = 0;
lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());
currentAlarmPeriodNanos = new AtomicLong();
}

protected boolean isSyncMode() { return syncMode; }
Expand Down Expand Up @@ -134,62 +134,29 @@ protected void updateLastMessageReceived() {
lastMsgReceivedNanoTime.set(NatsSystemClock.nanoTime());
}

class MmTimerTask extends TimerTask {
long alarmPeriodNanos;
final AtomicBoolean alive;

public MmTimerTask(long alarmPeriodNanos) {
this.alarmPeriodNanos = alarmPeriodNanos;
alive = new AtomicBoolean(true);
}

public void reuse() {
alive.getAndSet(true);
}

public void shutdown() {
alive.getAndSet(false);
}

@Override
public void run() {
if (alive.get() && !Thread.interrupted()) {
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
if (alive.get() && sinceLast > alarmPeriodNanos) {
handleHeartbeatError();
}
}
}

@Override
public String toString() {
long sinceLastMillis = (NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get()) / NatsConstants.NANOS_PER_MILLI;
return "MmTimerTask{" +
", alarmPeriod=" + (alarmPeriodNanos / NatsConstants.NANOS_PER_MILLI) +
"ms, alive=" + alive.get() +
", sinceLast=" + sinceLastMillis + "ms}";
}
}

protected void initOrResetHeartbeatTimer() {
stateChangeLock.lock();
try {
if (heartbeatTimer != null) {
if (heartbeatTask != null) {
// Same settings, just reuse the existing timer
if (heartbeatTimerTask.alarmPeriodNanos == alarmPeriodSettingNanos) {
heartbeatTimerTask.reuse();
if (currentAlarmPeriodNanos.get() == alarmPeriodSettingNanos) {
updateLastMessageReceived();
return;
}

// Replace timer since settings have changed
shutdownHeartbeatTimer();
}

// replacement or new comes here
heartbeatTimer = new Timer();
heartbeatTimerTask = new MmTimerTask(alarmPeriodSettingNanos);
long alarmPeriodSettingMillis = alarmPeriodSettingNanos / NatsConstants.NANOS_PER_MILLI;
heartbeatTimer.schedule(heartbeatTimerTask, alarmPeriodSettingMillis, alarmPeriodSettingMillis);
this.currentAlarmPeriodNanos.set(alarmPeriodSettingNanos);
heartbeatTask = new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos, TimeUnit.NANOSECONDS,
() -> {
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
if (sinceLast > currentAlarmPeriodNanos.get()) {
handleHeartbeatError();
}
});
updateLastMessageReceived();
}
finally {
Expand All @@ -200,12 +167,11 @@ protected void initOrResetHeartbeatTimer() {
protected void shutdownHeartbeatTimer() {
stateChangeLock.lock();
try {
if (heartbeatTimer != null) {
heartbeatTimerTask.shutdown();
heartbeatTimerTask = null;
heartbeatTimer.cancel();
heartbeatTimer = null;
if (heartbeatTask != null) {
heartbeatTask.shutdown();
heartbeatTask = null;
}
currentAlarmPeriodNanos.set(0);
}
finally {
stateChangeLock.unlock();
Expand Down
52 changes: 26 additions & 26 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
import io.nats.client.*;
import io.nats.client.ConnectionListener.Events;
import io.nats.client.api.ServerInfo;
import io.nats.client.support.ByteArrayBuilder;
import io.nats.client.support.NatsRequestCompletableFuture;
import io.nats.client.support.NatsUri;
import io.nats.client.support.Validator;
import io.nats.client.support.*;

import java.io.IOException;
import java.net.InetAddress;
Expand Down Expand Up @@ -82,7 +79,8 @@ class NatsConnection implements Connection {
private final String mainInbox;
private final AtomicReference<NatsDispatcher> inboxDispatcher;
private final ReentrantLock inboxDispatcherLock;
private Timer timer;
private ScheduledTask pingTask;
private ScheduledTask cleanupTask;

private final AtomicBoolean needPing;

Expand All @@ -98,6 +96,7 @@ class NatsConnection implements Connection {
private final ExecutorService callbackRunner;
private final ExecutorService executor;
private final ExecutorService connectExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final boolean advancedTracking;

private final ServerPool serverPool;
Expand Down Expand Up @@ -159,6 +158,7 @@ class NatsConnection implements Connection {
this.executor = options.getExecutor();
this.callbackRunner = options.getCallbackExecutor();
this.connectExecutor = options.getConnectExecutor();
this.scheduledExecutor = options.getScheduledExecutor();

timeTraceLogger.trace("creating reader and writer");
this.reader = new NatsConnectionReader(this);
Expand Down Expand Up @@ -595,35 +595,27 @@ void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
}

if (this.timer == null) {
if (pingTask == null) {
timeCheck(end, "starting ping and cleanup timers");
this.timer = new Timer("Nats Connection Timer");

long pingMillis = this.options.getPingInterval().toMillis();

if (pingMillis > 0) {
this.timer.schedule(new TimerTask() {
public void run() {
if (isConnected()) {
try {
softPing(); // The timer always uses the standard queue
}
catch (Exception e) {
// it's running in a thread, there is no point throwing here
}
pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
if (isConnected()) {
try {
softPing(); // The timer always uses the standard queue
}
catch (Exception e) {
// it's running in a thread, there is no point throwing here
}
}
}, pingMillis, pingMillis);
});
}

long cleanMillis = this.options.getRequestCleanupInterval().toMillis();

if (cleanMillis > 0) {
this.timer.schedule(new TimerTask() {
public void run() {
cleanResponses(false);
}
}, cleanMillis, cleanMillis);
cleanupTask = new ScheduledTask(scheduledExecutor, cleanMillis, () -> cleanResponses(false));
}
}

Expand Down Expand Up @@ -826,9 +818,13 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep
this.dispatchers.clear();
this.subscribers.clear();

if (timer != null) {
timer.cancel();
timer = null;
if (pingTask != null) {
pingTask.shutdown();
pingTask = null;
}
if (cleanupTask != null) {
cleanupTask.shutdown();
cleanupTask = null;
}

cleanResponses(true);
Expand Down Expand Up @@ -1966,6 +1962,10 @@ ExecutorService getExecutor() {
return executor;
}

ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}

void updateStatus(Status newStatus) {
Status oldStatus = this.status;

Expand Down
Loading
Loading