Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void shutdown() {

@Override
public void run() {
if (alive.get()) {
if (alive.get() && !Thread.interrupted()) {
long sinceLast = System.currentTimeMillis() - lastMsgReceived.get();
if (alive.get() && sinceLast > alarmPeriodSetting) {
handleHeartbeatError();
Expand Down
18 changes: 6 additions & 12 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -2178,20 +2178,17 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
// Wait for the timeout or the pending count to go to 0
executor.submit(() -> {
try {
Instant now = Instant.now();

while (timeout == null || timeout.equals(Duration.ZERO)
|| Duration.between(start, now).compareTo(timeout) < 0) {
long stop = (timeout == null || timeout.equals(Duration.ZERO))
? Long.MAX_VALUE
: System.nanoTime() + timeout.toNanos();
while (System.nanoTime() < stop && !Thread.interrupted())
{
consumers.removeIf(NatsConsumer::isDrained);

if (consumers.isEmpty()) {
break;
}

//noinspection BusyWait
Thread.sleep(1); // Sleep 1 milli

now = Instant.now();
}

// Stop publishing
Expand All @@ -2201,16 +2198,13 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
if (timeout == null || timeout.equals(Duration.ZERO)) {
this.flush(Duration.ZERO);
} else {
now = Instant.now();

Instant now = Instant.now();
Duration passed = Duration.between(start, now);
Duration newTimeout = timeout.minus(passed);

if (newTimeout.toNanos() > 0) {
this.flush(newTimeout);
}
}

this.close(false, false); // close the connection after the last flush
tracker.complete(consumers.isEmpty());
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void run() {
this.gotCR = false;
this.opPos = 0;

while (this.running.get()) {
while (running.get() && !Thread.interrupted()) {
this.bufferPosition = 0;
int bytesRead = dataPort.read(this.buffer, 0, this.buffer.length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void run() {
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
StatisticsCollector stats = this.connection.getNatsStatistics();

while (this.running.get()) {
while (running.get() && !Thread.interrupted()) {
NatsMessage msg;
if (this.reconnectMode.get()) {
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout);
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/io/nats/client/impl/NatsConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,15 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedExce
// draining
connection.getExecutor().submit(() -> {
try {
Instant now = Instant.now();

while (timeout == null || timeout.equals(Duration.ZERO)
|| Duration.between(start, now).compareTo(timeout) < 0) {
long stop = (timeout == null || timeout.equals(Duration.ZERO))
? Long.MAX_VALUE
: System.nanoTime() + timeout.toNanos();
while (System.nanoTime() < stop && !Thread.interrupted()) {
if (this.isDrained()) {
break;
}

//noinspection BusyWait
Thread.sleep(1); // Sleep 1 milli

now = Instant.now();
}

this.cleanUpAfterDrain();
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/nats/client/impl/NatsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,8 @@ boolean breakRunLoop() {

public void run() {
try {
while (this.running.get()) { // start

while (running.get() && !Thread.interrupted()) {
NatsMessage msg = this.incoming.pop(this.waitForMessage);

if (msg != null) {
NatsSubscription sub = msg.getNatsSubscription();
if (sub != null && sub.isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ class NatsDispatcherWithExecutor extends NatsDispatcher {
@Override
public void run() {
try {
while (this.running.get()) { // start

while (running.get() && !Thread.interrupted()) {
NatsMessage msg = this.incoming.pop(this.waitForMessage);

if (msg != null) {
NatsSubscription sub = msg.getNatsSubscription();
if (sub != null && sub.isActive()) {
Expand Down