Skip to content

Commit b8883e4

Browse files
authored
Merge pull request #1454 from nats-io/pull-status-warnings-1194
Demonstrate Pull Status Warning Handling
2 parents 375e993 + d7bf2fe commit b8883e4

File tree

4 files changed

+54
-5
lines changed

4 files changed

+54
-5
lines changed

src/main/java/io/nats/client/impl/PullMessageManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@ protected ManageResult manageStatus(Message msg) {
186186
|| statMsg.equals(LEADERSHIP_CHANGE)
187187
|| statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
188188
{
189+
if (raiseStatusWarnings) {
190+
conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
191+
}
189192
return STATUS_TERMINUS;
190193
}
191194
break;

src/main/java/io/nats/client/support/Token.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import static io.nats.client.support.NatsConstants.*;
2020
import static io.nats.client.support.NatsJetStreamConstants.*;
2121
import static io.nats.client.support.Status.*;
22-
import static java.nio.charset.StandardCharsets.US_ASCII;
22+
import static java.nio.charset.StandardCharsets.UTF_8;
2323

2424
public class Token {
2525
private final byte[] serialized;
@@ -117,7 +117,7 @@ public String getValueOrNull() {
117117
}
118118

119119
private String valueAsString() {
120-
return new String(serialized, start, valueLength, US_ASCII).trim();
120+
return new String(serialized, start, valueLength, UTF_8).trim();
121121
}
122122

123123
@NonNull
@@ -274,7 +274,6 @@ private boolean valueStartsWithExceededMax() {
274274
return true;
275275
}
276276

277-
278277
private boolean endsMatch(byte @NonNull [] checkBytes, int compareStartIndex) {
279278
if (valueLength != checkBytes.length) {
280279
return false;

src/test/java/io/nats/client/impl/JetStreamConsumerTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,4 +410,51 @@ private static void validateMultipleSubjectFilterSub(JetStreamSubscription sub,
410410
assertEquals(5, count2);
411411
}
412412

413+
@Test
414+
public void testRaiseStatusWarnings1194() throws Exception {
415+
ListenerForTesting listenerForTesting = new ListenerForTesting(true, true);
416+
runInJsServer(listenerForTesting, nc -> {
417+
// Setup
418+
JetStreamManagement jsm = nc.jetStreamManagement();
419+
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
420+
StreamContext streamContext = nc.getStreamContext(tsc.stream);
421+
422+
// Setting maxBatch=1, so we shouldn't allow fetching more messages at once.
423+
ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder().filterSubject(tsc.subject()).maxBatch(1).build();
424+
ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(consumerConfig);
425+
426+
int count = 0;
427+
428+
// Fetching a batch of 100 messages is not allowed, so we rightfully don't get any messages and wait for timeout.
429+
// But we don't get informed about the status message.
430+
FetchConsumeOptions fco = FetchConsumeOptions.builder()
431+
.maxMessages(100)
432+
.expiresIn(1000)
433+
.build();
434+
try (FetchConsumer fetchConsumer = consumerContext.fetch(fco)) {
435+
Message msg;
436+
while ((msg = fetchConsumer.nextMessage()) != null) {
437+
msg.ack();
438+
count++;
439+
}
440+
}
441+
assertEquals(0, count);
442+
assertEquals(0, listenerForTesting.getPullStatusWarnings().size());
443+
444+
fco = FetchConsumeOptions.builder()
445+
.maxMessages(100)
446+
.expiresIn(1000)
447+
.raiseStatusWarnings()
448+
.build();
449+
try (FetchConsumer fetchConsumer = consumerContext.fetch(fco)) {
450+
Message msg;
451+
while ((msg = fetchConsumer.nextMessage()) != null) {
452+
msg.ack();
453+
count++;
454+
}
455+
}
456+
assertEquals(0, count);
457+
assertEquals(1, listenerForTesting.getPullStatusWarnings().size());
458+
});
459+
}
413460
}

src/test/java/io/nats/client/impl/MessageManagerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ private void testPullBqpAndManage(NatsJetStreamSubscription sub, ListenerForTest
185185
sleep(100);
186186

187187
List<ListenerForTesting.StatusEvent> list = listener.getPullStatusWarnings();
188-
int[] codes = new int[]{NOT_FOUND_CODE, REQUEST_TIMEOUT_CODE, CONFLICT_CODE, CONFLICT_CODE, CONFLICT_CODE, CONFLICT_CODE};
189-
assertEquals(6, list.size());
188+
int[] codes = new int[]{NOT_FOUND_CODE, REQUEST_TIMEOUT_CODE, CONFLICT_CODE, CONFLICT_CODE, CONFLICT_CODE, CONFLICT_CODE, CONFLICT_CODE, CONFLICT_CODE};
189+
assertEquals(8, list.size());
190190
for (int x = 0; x < list.size(); x++) {
191191
ListenerForTesting.StatusEvent se = list.get(x);
192192
assertSame(sub.getSID(), se.sid);

0 commit comments

Comments
 (0)