Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/impl/OrderedMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected void startup(NatsJetStreamSubscription sub) {
@Override
protected ManageResult manage(Message msg) {
if (!msg.getSID().equals(targetSid.get())) {
return STATUS_HANDLED; // wrong sid is throwaway from previous consumer that errored
return STATUS_HANDLED; // wrong sid. message is a throwaway from previous consumer that errored
}

if (msg.isJetStream()) {
Expand Down Expand Up @@ -90,14 +90,14 @@ private void handleErrorCondition() {
}
catch (Exception ignore) {}

// 2. re-subscribe. This means kill the sub then make a new one
// 2. re-subscribe. This means killing the sub then making a new one.
// New sub needs a new deliverSubject
String newDeliverSubject = sub.connection.createInbox();
sub.reSubscribe(newDeliverSubject);
targetSid.set(sub.getSID());

// 3. make a new consumer using the same deliver subject but
// with a new starting point
// 3. make a new consumer using the same "deliver" subject
// but with a new starting point
ConsumerConfiguration userCC = js.consumerConfigurationForOrdered(originalCc, lastStreamSeq, newDeliverSubject, actualConsumerName, null);
ConsumerInfo ci = js._createConsumer(stream, userCC, ConsumerCreateRequest.Action.Create); // this can fail when a server is down.
sub.setConsumerName(ci.getName());
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/io/nats/client/impl/PullMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,17 @@ protected ManageResult manageStatus(Message msg) {
case CONFLICT_CODE:
// sometimes just a warning
String statMsg = status.getMessage();
if (statMsg.startsWith("Exceeded Max")
|| statMsg.equals(SERVER_SHUTDOWN)
|| statMsg.equals(LEADERSHIP_CHANGE)
) {
if (statMsg.startsWith(EXCEEDED_MAX_PREFIX) || statMsg.equals(SERVER_SHUTDOWN))
{
if (raiseStatusWarnings) {
conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
}
return STATUS_HANDLED;
}

if (statMsg.equals(BATCH_COMPLETED) ||
statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
if (statMsg.equals(BATCH_COMPLETED)
|| statMsg.equals(LEADERSHIP_CHANGE)
|| statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved LEADERSHIP_CHANGE to STATUS_TERMINUS (for simplified pull consuming, terminus translates to the pull is done, will need a new one)

{
return STATUS_TERMINUS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected void startup(NatsJetStreamSubscription sub) {
@Override
protected ManageResult manage(Message msg) {
if (!msg.getSID().equals(targetSid.get())) {
return STATUS_HANDLED; // wrong sid is throwaway from previous consumer that errored
return STATUS_HANDLED; // wrong sid. message is a throwaway from previous consumer that errored
}

if (msg.isJetStream()) {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class Status {
public static String CONSUMER_IS_PUSH_BASED = "Consumer is push based"; // 409

public static String MESSAGE_SIZE_EXCEEDS_MAX_BYTES = "Message Size Exceeds MaxBytes"; // 409
public static String EXCEEDED_MAX_PREFIX = "Exceeded Max";
public static String EXCEEDED_MAX_WAITING = "Exceeded MaxWaiting"; // 409
public static String EXCEEDED_MAX_REQUEST_BATCH = "Exceeded MaxRequestBatch"; // 409
public static String EXCEEDED_MAX_REQUEST_EXPIRES = "Exceeded MaxRequestExpires"; // 409
Expand Down
Loading