Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import javax.net.ssl.SSLException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class ChannelDuplexer extends Duplexer {
Expand All @@ -36,6 +37,7 @@ public class ChannelDuplexer extends Duplexer {
private @Nullable Channel channel;


@io.netty.channel.ChannelHandler.Sharable
public class ChannelHandler extends ChannelInboundHandlerAdapter {
private CompletableFuture<Void> channelActivePromise;

Expand All @@ -44,26 +46,25 @@ public ChannelHandler() {
}

@Override
public void channelActive(@NotNull ChannelHandlerContext ctx) throws Exception {
public void channelActive(@NotNull ChannelHandlerContext ctx) {
isConnected = true;
super.channelActive(ctx);
channelActivePromise.complete(null);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if(evt.equals("RESET")) {
channelActivePromise = new CompletableFuture<>();
}
}

@Override
public void channelInactive(@NotNull ChannelHandlerContext ctx) throws Exception {
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
isConnected = false;
}

@Override
public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) throws Exception {
public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) {
synchronized (messageEnqueueReference) {
if(readPromises.isEmpty()) {
messageQueue.add((Receivable)msg);
Expand All @@ -77,7 +78,7 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg)
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("Channel failed", cause);
}

Expand Down Expand Up @@ -197,6 +198,7 @@ public CompletionStage<Void> disconnect() {
return send(new Terminate())
.thenCompose(v -> ChannelCompletableFuture.completeFrom(this.channel.disconnect()));
}

return ChannelCompletableFuture.completeFrom(this.channel.disconnect());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,11 @@ private CompletionStage<Void> connectInternal() {
private CompletionStage<Void> retryableConnect() {
try {
return exceptionallyCompose(this.openConnection(), err -> {
if(err instanceof ConnectionFailedTemporarilyException) {
if(err.getCause() instanceof ConnectionFailedTemporarilyException) {
if(getConfig().getConnectionRetryMode() == ConnectionRetryMode.NEVER_RETRY) {
return CompletableFuture.failedFuture(new ConnectionFailedException(err));
}

if(this.connectionAttempts < getConfig().getMaxConnectionRetries()) {
this.connectionAttempts++;

Expand All @@ -935,6 +939,7 @@ private CompletionStage<Void> retryableConnect() {
} else {
logger.error("Failed to establish a connection after {} attempts", this.connectionAttempts, err);
this.connectionAttempts = 0;
return CompletableFuture.failedFuture(new ConnectionFailedException(this.connectionAttempts, err));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.edgedb.driver.clients;

import com.edgedb.driver.*;
import com.edgedb.driver.async.ChannelCompletableFuture;
import com.edgedb.driver.binary.PacketSerializer;
import com.edgedb.driver.binary.duplexers.ChannelDuplexer;
import com.edgedb.driver.*;
import com.edgedb.driver.exceptions.ConnectionFailedTemporarilyException;
import com.edgedb.driver.util.SslUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down Expand Up @@ -35,21 +37,21 @@ public class EdgeDBTCPClient extends EdgeDBBinaryClient implements TransactableC
private static final long CONNECTION_TIMEOUT = 5000;

private static final Logger logger = LoggerFactory.getLogger(EdgeDBTCPClient.class);
private static final NioEventLoopGroup nettyTcpGroup = new NioEventLoopGroup();
private static final EventExecutorGroup duplexerGroup = new DefaultEventExecutorGroup(8);
private static final NioEventLoopGroup NETTY_TCP_GROUP = new NioEventLoopGroup();
private static final EventExecutorGroup DUPLEXER_GROUP = new DefaultEventExecutorGroup(8);

private final Bootstrap bootstrap;
private final ChannelDuplexer duplexer;
private final Bootstrap bootstrap;
private TransactionState transactionState;

public EdgeDBTCPClient(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) {
super(connection, config, poolHandle);

this.duplexer = new ChannelDuplexer(this);
setDuplexer(this.duplexer);

this.bootstrap = new Bootstrap()
.group(nettyTcpGroup)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.group(NETTY_TCP_GROUP)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
Expand All @@ -66,7 +68,7 @@ protected void initChannel(@NotNull SocketChannel ch) throws Exception {
"edgedb-binary"
));

SslUtils.applyTrustManager(connection, builder);
SslUtils.applyTrustManager(getConnectionArguments(), builder);

pipeline.addLast("ssl", builder.build().newHandler(ch.alloc()));

Expand All @@ -76,13 +78,12 @@ protected void initChannel(@NotNull SocketChannel ch) throws Exception {
PacketSerializer.createEncoder()
);

pipeline.addLast(duplexerGroup, duplexer.channelHandler);
pipeline.addLast(DUPLEXER_GROUP, duplexer.channelHandler);

duplexer.init(ch);
}
});
}

@Override
protected void setTransactionState(TransactionState state) {
this.transactionState = state;
Expand All @@ -92,18 +93,23 @@ protected void setTransactionState(TransactionState state) {
protected CompletionStage<Void> openConnection() {
final var connection = getConnectionArguments();


try {
return exceptionallyCompose(ChannelCompletableFuture.completeFrom(
bootstrap.connect(
return exceptionallyCompose(
ChannelCompletableFuture.completeFrom(
bootstrap.connect(
connection.getHostname(),
connection.getPort()
)), e -> {
if(e instanceof CompletionException && e.getCause() instanceof ConnectException) {
return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e));
)
),
e -> {
if(e instanceof CompletionException && e.getCause() instanceof ConnectException) {
return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e));
}

return CompletableFuture.failedFuture(e);
}

return CompletableFuture.failedFuture( e);
})
)
.orTimeout(getConfig().getConnectionTimeoutValue(), getConfig().getConnectionTimeoutUnit());
}
catch (Exception err) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.edgedb.driver.exceptions;

/**
* Represents a connection failure that cannot be retried.
*/
public class ConnectionFailedException extends EdgeDBException {
/**
* Constructs a new {@linkplain ConnectionFailedException}.
* @param attempts The number of attempts the binding made to connect.
* @param cause The inner cause of this exception.
*/
public ConnectionFailedException(int attempts, Throwable cause) {
super("The connection failed to be established after " + attempts + " attempt(s)", cause, false, false);
}

/**
* Constructs a new {@linkplain ConnectionFailedException}.
* @param cause The inner cause of this exception.
*/
public ConnectionFailedException(Throwable cause) {
super("The connection failed to be established", cause, false, false);
}
}