Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,33 @@

import io.netty.channel.ChannelFuture;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

public class ChannelCompletableFuture extends CompletableFuture<Void> {
private static final Logger logger = LoggerFactory.getLogger(ChannelCompletableFuture.class);

public static @NotNull ChannelCompletableFuture completeFrom(@NotNull ChannelFuture future) {
var completableFuture = new ChannelCompletableFuture();
logger.debug("Registering {}", future.hashCode());

future.addListener((v) -> {
if(v.cause() != null) {
logger.debug("Failing from {}", future.hashCode());
completableFuture.completeExceptionally(v.cause());
return;
}

if(v.isCancelled()) {
logger.debug("Cancelling from {}", future.hashCode());
completableFuture.cancel(true);
return;
}

logger.debug("Completing from {}", future.hashCode());

completableFuture.complete(null);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ public ChannelHandler() {
channelActivePromise = new CompletableFuture<>();
}

public void reset() {
logger.debug("Resetting channel handler");
this.channelActivePromise = new CompletableFuture<Void>();
}

@Override
public void channelActive(@NotNull ChannelHandlerContext ctx) {
logger.debug("Channel active");
isConnected = true;
channelActivePromise.complete(null);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, @NotNull Object evt) {
if(evt.equals("RESET")) {
channelActivePromise = new CompletableFuture<>();
} else if (evt.equals("TIMEOUT")) {
logger.debug("event fired {}", evt);
if (evt.equals("TIMEOUT")) {
var exc = new TimeoutException("A message read process passed the configured message timeout");
for(var promise : readPromises) {
promise.completeExceptionally(exc);
Expand All @@ -83,26 +88,28 @@ public void userEventTriggered(ChannelHandlerContext ctx, @NotNull Object evt) {
@Override
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
isConnected = false;
logger.debug("Channel inactive");
}

@Override
public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) {
var protocolMessage = (Receivable)msg;
logger.debug("Read fired, entering message lock, message type {}", protocolMessage.getMessageType());

if(
protocolMessage instanceof ErrorResponse && (
((ErrorResponse)protocolMessage).errorCode == ErrorCode.IDLE_SESSION_TIMEOUT_ERROR ||
((ErrorResponse)protocolMessage).errorCode == ErrorCode.IDLE_TRANSACTION_TIMEOUT_ERROR
)
) {
logger.debug("Got idle disconnect message, marking as closed");
isConnected = false;
return;
}

int completeCount = 0;

try {
logger.debug("Read fired, entering message lock, message type {}", protocolMessage.getMessageType());
if(!messageEnqueueLock.tryLock(client.getConfig().getMessageTimeoutValue(), client.getConfig().getMessageTimeoutUnit())) {
ctx.fireUserEventTriggered("TIMEOUT");
return;
Expand Down Expand Up @@ -248,7 +255,10 @@ private CompletionStage<Void> send0(AtomicInteger attempts, Sendable packet, @Nu

attempts.incrementAndGet();

return client.connect().thenCompose(n -> send0(attempts, packet, packets));
return client.reconnect().thenCompose(n -> {
logger.debug("Reconnect complete, retrying send");
return send0(attempts, packet, packets);
});
}

return send1(packet, packets);
Expand Down Expand Up @@ -333,7 +343,7 @@ public void init(Channel channel) {
@Override
public void reset() {
if(this.channel != null) {
channel.pipeline().fireUserEventTriggered("RESET");
this.channelHandler.reset();
}
}

Expand All @@ -348,11 +358,14 @@ public CompletionStage<Void> disconnect() {
return CompletableFuture.completedFuture(null);
}

if(this.channel.isOpen()) {
if(this.isConnected) {
logger.debug("Sending terminate for disconnect");
return send(new Terminate())
.thenCompose(v -> ChannelCompletableFuture.completeFrom(this.channel.close()));
.thenCompose(v -> ChannelCompletableFuture.completeFrom(this.channel.disconnect()));
}

return ChannelCompletableFuture.completeFrom(this.channel.close());
logger.debug("Closing channel without terminating");

return ChannelCompletableFuture.completeFrom(this.channel.disconnect());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.edgedb.driver.state.Config;
import com.edgedb.driver.state.Session;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
Expand All @@ -15,6 +17,7 @@
import java.util.function.Function;

public abstract class BaseEdgeDBClient implements StatefulClient, EdgeDBQueryable, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(BaseEdgeDBClient.class);
private final @NotNull AsyncEvent<BaseEdgeDBClient> onReady;
private final EdgeDBConnection connection;
private final EdgeDBClientConfig config;
Expand Down Expand Up @@ -89,7 +92,10 @@ public EdgeDBClientConfig getConfig() {
public abstract CompletionStage<Void> disconnect();

public CompletionStage<Void> reconnect() {
return disconnect().thenCompose((v) -> connect());
return disconnect().thenCompose((v) -> {
logger.debug("Executing connection attempt from reconnect");
return connect();
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,11 @@ public CompletionStage<Void> connect() {
return CompletableFuture
.runAsync(() -> {
try {
this.connectionSemaphore.acquire();
logger.debug("Acquiring connection lock...");
if(!this.connectionSemaphore.tryAcquire(getConfig().getConnectionTimeoutValue(), getConfig().getConnectionTimeoutUnit())) {
logger.debug("Failed to acquire connection lock after timeout");
throw new CompletionException(new ConnectionFailedException("Connection failed to be established because of a already existing attempt"));
}
} catch (InterruptedException e) {
throw new CompletionException(e);
}
Expand All @@ -874,6 +878,7 @@ public CompletionStage<Void> connect() {
}

private CompletionStage<Void> doClientHandshake() {
logger.debug("Reading for handshake");
return getDuplexer().readNext()
.thenCompose(packet -> {
logger.debug("Processing handshake step with packet: {}", packet == null ? "NULL" : packet.getMessageType());
Expand All @@ -900,10 +905,13 @@ public CompletionStage<Void> disconnect() {
}

private CompletionStage<Void> connectInternal() {
logger.debug("Beginning to run connection logic");
if(getDuplexer().isConnected()) {
logger.debug("Already connected, ignoring connection attempt");
return CompletableFuture.completedFuture(null);
}

logger.debug("Resetting ready state");
getDuplexer().reset();
this.readyPromise = new CompletableFuture<>();

Expand All @@ -926,12 +934,17 @@ private CompletionStage<Void> connectInternal() {
connectionParams,
new ProtocolExtension[0]
))
.thenApply(v -> {
logger.debug("Sending handshake");
return v;
})
.thenCompose(getDuplexer()::send);
}

private CompletionStage<Void> retryableConnect() {
try {
return exceptionallyCompose(this.openConnection(), err -> {
logger.debug("Connection attempt failed", err);
if(err instanceof EdgeDBException && ((EdgeDBException)err).shouldReconnect) {
if(getConfig().getConnectionRetryMode() == ConnectionRetryMode.NEVER_RETRY) {
return CompletableFuture.failedFuture(new ConnectionFailedException(err));
Expand All @@ -955,6 +968,7 @@ private CompletionStage<Void> retryableConnect() {
});
}
catch (Exception x) {
logger.debug("Connection failed", x);
return CompletableFuture.failedFuture(x);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,23 @@ protected void setTransactionState(TransactionState state) {
protected CompletionStage<Void> openConnection() {
final var connection = getConnectionArguments();


try {
logger.debug("Opening connection from bootstrap");
return exceptionallyCompose(
ChannelCompletableFuture.completeFrom(
bootstrap.connect(
connection.getHostname(),
connection.getPort()
)
),
e -> {
if(e instanceof CompletionException && e.getCause() instanceof ConnectException) {
return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e));
}
), e -> {
logger.debug("Connection failed", e);

return CompletableFuture.failedFuture(e);
if(e instanceof CompletionException && e.getCause() instanceof ConnectException) {
return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e));
}
)

return CompletableFuture.failedFuture(e);
})
.orTimeout(getConfig().getConnectionTimeoutValue(), getConfig().getConnectionTimeoutUnit());
}
catch (Exception err) {
Expand Down