Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/main/java/io/nats/client/impl/ProtocolMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,15 @@ class ProtocolMessage extends NatsPublishableMessage {
}

ProtocolMessage(byte[] protocol) {
super(false);
protocolBab = new ByteArrayBuilder(protocol);
sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data
this.filterOnStop = true;
this(new ByteArrayBuilder(protocol), true);
}

ProtocolMessage(byte[] protocol, boolean filterOnStop) {
this(new ByteArrayBuilder(protocol), filterOnStop);
}

ProtocolMessage(ProtocolMessage pm) {
super(false);
protocolBab = pm.protocolBab;
sizeInBytes = controlLineLength = pm.sizeInBytes;
filterOnStop = pm.filterOnStop;
this(pm.protocolBab, pm.filterOnStop);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,35 @@
package io.nats.client.impl;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ForceReconnectQueueCheckDataPort extends SocketDataPort {
public static String WRITE_CHECK;
private static byte[] WRITE_CHECK;
private static int WC_LEN;
public static long DELAY;

public static void setCheck(String check) {
WRITE_CHECK = check.getBytes(StandardCharsets.ISO_8859_1);
WC_LEN = check.length();
}

@Override
public void write(byte[] src, int toWrite) throws IOException {
String s = new String(src, 0, Math.min(7, toWrite));
if (s.startsWith(WRITE_CHECK)) {
try {
Thread.sleep(DELAY);
if (src.length >= WC_LEN) {
boolean check = true;
for (int x = 0; x < WC_LEN; x++) {
if (src[x] != WRITE_CHECK[x]) {
check = false;
break;
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
if (check) {
try {
Thread.sleep(DELAY);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
super.write(src, toWrite);
Expand Down
12 changes: 10 additions & 2 deletions src/test/java/io/nats/client/impl/ListenerForTesting.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package io.nats.client.impl;

import io.nats.client.*;
import io.nats.client.support.DateTimeUtils;
import io.nats.client.support.Status;

import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -291,8 +293,14 @@ else if (consumer instanceof NatsDispatcher) {
}
}

public static final DateTimeFormatter SIMPLE_TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

public static String simpleTime() {
return SIMPLE_TIME_FORMATTER.format(DateTimeUtils.gmtNow());
}

private void report(String func, Object message) {
System.out.println("[" + System.currentTimeMillis() + " ListenerForTesting." + func + "] " + message);
System.out.println("[" + simpleTime() + " ListenerForTesting." + func + "] " + message);
}

private final ReentrantLock listLock = new ReentrantLock();
Expand Down Expand Up @@ -597,4 +605,4 @@ public String toString() {
private static String extractSid(JetStreamSubscription sub) {
return ((NatsJetStreamSubscription)sub).getSID();
}
}
}
12 changes: 5 additions & 7 deletions src/test/java/io/nats/client/impl/ReconnectTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.nats.client.*;
import io.nats.client.ConnectionListener.Events;
import io.nats.client.api.ServerInfo;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;

Expand Down Expand Up @@ -806,7 +805,6 @@ public boolean includeAllServers() {
}

@Test
@Disabled("TODO FIGURE THIS OUT")
public void testForceReconnectQueueBehaviorCheck() throws Exception {
runInJsCluster((nc0, nc1, nc2) -> {
if (atLeast2_9_0(nc0)) {
Expand All @@ -818,19 +816,19 @@ public void testForceReconnectQueueBehaviorCheck() throws Exception {
ForceReconnectQueueCheckDataPort.DELAY = 75;

String subject = subject();
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, 0);

subject = subject();
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, flushWait);

subject = subject();
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, 0);

subject = subject();
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
ForceReconnectQueueCheckDataPort.setCheck("PUB " + subject);
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, flushWait);
}
});
Expand Down Expand Up @@ -972,7 +970,7 @@ public void testSocketDataPortTimeout() throws Exception {
try {
nc.publish(subject, ("" + pubId.incrementAndGet()).getBytes());
if (pubId.get() == 10) {
SocketDataPortBlockSimulator.SIMULATE_SOCKET_BLOCK.set(60000);
SocketDataPortBlockSimulator.simulateBlock();
}
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti
});
}

public static AtomicLong SIMULATE_SOCKET_BLOCK = new AtomicLong();
private static final AtomicLong SIMULATE_SOCKET_BLOCK = new AtomicLong();

public static void simulateBlock() {
SIMULATE_SOCKET_BLOCK.set(60000);
}

AtomicLong blocking = new AtomicLong();
public void write(byte[] src, int toWrite) throws IOException {
try {
Expand All @@ -87,6 +92,7 @@ public void write(byte[] src, int toWrite) throws IOException {
blocking.addAndGet(-100);
}
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
return;
}
}
Expand Down
Loading