Skip to content

Commit e2cb11e

Browse files
authored
Merge pull request #1416 from nats-io/lock-around-outgoing
Lock around access to pending message and byte counts
2 parents 0f6b3ea + 80c9985 commit e2cb11e

File tree

2 files changed

+29
-5
lines changed

2 files changed

+29
-5
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class NatsConnection implements Connection {
5454
private boolean disconnecting; // you can only disconnect in one thread
5555
private boolean closing; // respect a close call regardless
5656
private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
57-
final ReentrantLock closeSocketLock;
57+
final ReentrantLock closeSocketLock; // this is not private so it can be tested
5858

5959
private Status status;
6060
private final ReentrantLock statusLock;
@@ -2550,14 +2550,26 @@ private void ensureNotClosing() throws IOException {
25502550
*/
25512551
@Override
25522552
public long outgoingPendingMessageCount() {
2553-
return writer.outgoingPendingMessageCount();
2553+
closeSocketLock.lock();
2554+
try {
2555+
return writer == null ? -1 : writer.outgoingPendingMessageCount();
2556+
}
2557+
finally {
2558+
closeSocketLock.unlock();
2559+
}
25542560
}
25552561

25562562
/**
25572563
* {@inheritDoc}
25582564
*/
25592565
@Override
25602566
public long outgoingPendingBytes() {
2561-
return writer.outgoingPendingBytes();
2567+
closeSocketLock.lock();
2568+
try {
2569+
return writer == null ? -1 : writer.outgoingPendingBytes();
2570+
}
2571+
finally {
2572+
closeSocketLock.unlock();
2573+
}
25622574
}
25632575
}

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,22 @@ void flushBuffer() {
256256
}
257257

258258
long outgoingPendingMessageCount() {
259-
return outgoing.length();
259+
writerLock.lock();
260+
try {
261+
return outgoing == null ? -1 : outgoing.length();
262+
}
263+
finally {
264+
writerLock.unlock();
265+
}
260266
}
261267

262268
long outgoingPendingBytes() {
263-
return outgoing.sizeInBytes();
269+
writerLock.lock();
270+
try {
271+
return outgoing == null ? -1 : outgoing.length();
272+
}
273+
finally {
274+
writerLock.unlock();
275+
}
264276
}
265277
}

0 commit comments

Comments
 (0)