Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 31 additions & 41 deletions src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,31 @@

package io.nats.client.impl;

import io.nats.client.ForceReconnectOptions;
import io.nats.client.NatsSystemClock;
import io.nats.client.Options;
import io.nats.client.support.NatsUri;
import io.nats.client.support.ScheduledTask;
import org.jspecify.annotations.NonNull;

import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

@SuppressWarnings("ClassEscapesDefinedScope") // NatsConnection
public class SocketDataPortBlockSimulator extends SocketDataPort {

private long writeTimeoutNanos;
private long delayPeriodMillis;
private Timer writeWatcherTimer;
private WriteWatcherTask writeWatcherTask;
private volatile long writeMustBeDoneBy = Long.MAX_VALUE;
private ScheduledTask writeWatchTask;
private final AtomicLong writeMustBeDoneBy;

class WriteWatcherTask extends TimerTask {
@Override
public void run() {
// if now is after when it was supposed to be done by
if (System.nanoTime() > writeMustBeDoneBy) {
writeWatcherTimer.cancel(); // we don't need to repeat this
connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
try {
out.close();
}
catch (IOException ignore) {}
blocking.set(0);
SIMULATE_SOCKET_BLOCK.set(0);
try {
connection.forceReconnect();
}
catch (InterruptedException | IOException ignore) {}
}
}
public SocketDataPortBlockSimulator() {
this.writeMustBeDoneBy = new AtomicLong(Long.MAX_VALUE);
}

@Override
public void afterConstruct(Options options) {
super.afterConstruct(options);
long writeTimeoutMillis;
if (options.getSocketWriteTimeout() == null) {
writeTimeoutMillis = Options.DEFAULT_SOCKET_WRITE_TIMEOUT.toMillis();
Expand All @@ -68,16 +52,33 @@ public void afterConstruct(Options options) {
@Override
public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long timeoutNanos) throws IOException {
super.connect(conn, nuri, timeoutNanos);
writeWatcherTimer = new Timer();
writeWatcherTask = new WriteWatcherTask();
writeWatcherTimer.schedule(writeWatcherTask, delayPeriodMillis, delayPeriodMillis);
writeWatchTask = new ScheduledTask(conn.getScheduledExecutor(), delayPeriodMillis,
() -> {
// if now is after when it was supposed to be done by
if (NatsSystemClock.nanoTime() > writeMustBeDoneBy.get()) {
writeWatchTask.shutdown(); // we don't need to repeat this, the connection is going to be closed
connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
blocking.set(0);
SIMULATE_SOCKET_BLOCK.set(0);
try {
connection.forceReconnect(ForceReconnectOptions.FORCE_CLOSE_INSTANCE);
}
catch (IOException e) {
// retry maybe?
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
// This task is going to re-run anyway, so no point in throwing
}
}
});
}

public static AtomicLong SIMULATE_SOCKET_BLOCK = new AtomicLong();
AtomicLong blocking = new AtomicLong();
public void write(byte[] src, int toWrite) throws IOException {
try {
writeMustBeDoneBy = System.nanoTime() + writeTimeoutNanos;
writeMustBeDoneBy.set(NatsSystemClock.nanoTime() + writeTimeoutNanos);
blocking.set(SIMULATE_SOCKET_BLOCK.get());
while (blocking.get() > 0) {
try {
Expand All @@ -92,23 +93,12 @@ public void write(byte[] src, int toWrite) throws IOException {
out.write(src, 0, toWrite);
}
finally {
writeMustBeDoneBy = Long.MAX_VALUE;
writeMustBeDoneBy.set(Long.MAX_VALUE);
}
}

public void close() throws IOException {
try {
writeWatcherTask.cancel();
}
catch (Exception ignore) {
// don't want this to be passed along
}
try {
writeWatcherTimer.cancel();
}
catch (Exception ignore) {
// don't want this to be passed along
}
writeWatchTask.shutdown();
super.close();
}
}
Loading