Skip to content

Conversation

@scottf
Copy link
Contributor

@scottf scottf commented Oct 3, 2025

The original intention here was to know when the re-subscribe messages, queued just before this the flush, finished publishing. This would indicate that the reconnect mode had completed and the reconnect mode could be cleared. The problem was that with very large number of subscriptions, the flush could fail, triggering a reconnect loop.

This has been replaced with a marker in the reconnect queue, set via writer.enterWaitingForEndReconnectMode(). The writer will then automatically switch the mode itself when it reaches the marker.

Modes

There are now 3 modes:

enum Mode {
   Normal, Reconnect, WaitingForEndReconnect
}

Normal

  • accumulate messages from the normal queue for writing across the wire
  • add all messages to the normal queue.

Reconnect

  • accumulate internal messages (Connect, Ping, [re]subscribe) from the reconnect queue, bypassing the user messages in the normal queue
  • add internal messages to the reconnect queue for accumulation/bypass. user messages still go to the normal queue.

WaitingForEndReconnect

  • accumulate internal messages from the reconnect queue until the marker is found. This allows all the previously queued internal/reconnect messages to be processed
  • add all messages to the normal queue since all the reconnect specific messages have been queued and are processing.

Queuing an "internal" message

When queuing an "internal" message, the idea is that if we are in reconnect mode that message has to go through the reconnect queue since we are holding user messages in the normal queue. Connect, Ping and [re] subscribe messages are "internal". Ping can also be sent during normal connection, and even though internal, when in normal connected mode, it just goes in the normal queue.

@scottf scottf requested a review from MauriceVanVeen October 3, 2025 12:20
Copy link
Member

@MauriceVanVeen MauriceVanVeen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

// A simple == is used to check if any message in the queue is this message.
// /\ /\ /\ /\ which is why it is now a static. It's just a marker anyway.
protected static final NatsMessage POISON_PILL = new NatsMessage(POISON, null, EMPTY_BODY);
protected static final NatsMessage POISON_PILL = new NatsMessage("_poison", null, EMPTY_BODY);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was in this file looking at the marker pill and ended up doing some refactoring. No functionality was changed and I clarified the comment.

} finally {
editLock.unlock();
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows clearing of a queue.

}
catch (Exception exp) {
this.processException(exp);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that the original intention here was to know when the re-subscribe messages, queued just before this block, finished publishing, meaning the reconnect mode had completed and the reconnect mode could be cleared. The problem was that with very large number of subscriptions, the flush could fail, triggering a reconnect loop.
This has been replaced with a marker in the reconnect queue, set via writer.enterWaitingForEndReconnectMode() which the writer will then automatically switch the mode itself when it reaches the marker.


// When the flush returns, we are done sending internal messages,
// so we can switch to the non-reconnect queue
this.writer.setReconnectMode(false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced by writer.enterWaitingForEndReconnectMode

try {
long time = NatsSystemClock.nanoTime();
writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
writer.queue(new ProtocolMessage(PING_PROTO));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed after a review of the internal queuing and determined it was kinda pointless. If they make an rtt call while normally connected, this message just goes into the normal queue anyway. If they made the rtt call during reconnect, it would have gone into the internal queue, but so what, it's time is going to be skewed anyway.
This whole RTT may need to be put into a "bypass" queue of some sort so on normal connect, it goes directly to the front of the line.

}
else {
msg = this.outgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, outgoingTimeout);
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not normal mode is both the others, we need to get all the messages from the reconnect queue until the end mark is found, and then we can switch to the normal mode.

reconnectMode.set(tf);
void enterReconnectMode() {
reconnectOutgoing.clear();
mode.set(Mode.Reconnect);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. clear the queue. Should probably already be empty, but just in case.
  2. set the mode


void enterWaitingForEndReconnectMode() {
reconnectOutgoing.push(END_RECONNECT);
mode.set(Mode.WaitingForEndReconnect);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. mark the reconnect queue
  2. set the mode

} else {
this.outgoing.push(msg, true);
if (mode.get() == Mode.Reconnect) {
reconnectOutgoing.push(msg);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If mode is Reconnect, messages go in that queue.

reconnectOutgoing.push(msg);
}
else {
normalOutgoing.push(msg, true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If mode is normal put in the normal queue
If mode is waiting for end marker, this means that all the reconnect specific messages have been queued, but we still need to read from the reconnect queue, but write to the normal queue

@scottf scottf merged commit 9ee63d4 into main Oct 3, 2025
4 checks passed
@scottf scottf deleted the track-end-reconnect branch October 3, 2025 14:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants