diff --git a/src/driver/src/main/java/com/edgedb/driver/binary/duplexers/ChannelDuplexer.java b/src/driver/src/main/java/com/edgedb/driver/binary/duplexers/ChannelDuplexer.java index d025c21b..f9d2bda2 100644 --- a/src/driver/src/main/java/com/edgedb/driver/binary/duplexers/ChannelDuplexer.java +++ b/src/driver/src/main/java/com/edgedb/driver/binary/duplexers/ChannelDuplexer.java @@ -16,7 +16,8 @@ import javax.net.ssl.SSLException; import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; public class ChannelDuplexer extends Duplexer { @@ -36,6 +37,7 @@ public class ChannelDuplexer extends Duplexer { private @Nullable Channel channel; + @io.netty.channel.ChannelHandler.Sharable public class ChannelHandler extends ChannelInboundHandlerAdapter { private CompletableFuture channelActivePromise; @@ -44,26 +46,25 @@ public ChannelHandler() { } @Override - public void channelActive(@NotNull ChannelHandlerContext ctx) throws Exception { + public void channelActive(@NotNull ChannelHandlerContext ctx) { isConnected = true; - super.channelActive(ctx); channelActivePromise.complete(null); } @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if(evt.equals("RESET")) { channelActivePromise = new CompletableFuture<>(); } } @Override - public void channelInactive(@NotNull ChannelHandlerContext ctx) throws Exception { + public void channelInactive(@NotNull ChannelHandlerContext ctx) { isConnected = false; } @Override - public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) throws Exception { + public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) { synchronized (messageEnqueueReference) { if(readPromises.isEmpty()) { messageQueue.add((Receivable)msg); @@ -77,7 +78,7 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error("Channel failed", cause); } @@ -197,6 +198,7 @@ public CompletionStage disconnect() { return send(new Terminate()) .thenCompose(v -> ChannelCompletableFuture.completeFrom(this.channel.disconnect())); } + return ChannelCompletableFuture.completeFrom(this.channel.disconnect()); } } diff --git a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java index e5172da0..ca5cc505 100644 --- a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java +++ b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBBinaryClient.java @@ -924,7 +924,11 @@ private CompletionStage connectInternal() { private CompletionStage retryableConnect() { try { return exceptionallyCompose(this.openConnection(), err -> { - if(err instanceof ConnectionFailedTemporarilyException) { + if(err.getCause() instanceof ConnectionFailedTemporarilyException) { + if(getConfig().getConnectionRetryMode() == ConnectionRetryMode.NEVER_RETRY) { + return CompletableFuture.failedFuture(new ConnectionFailedException(err)); + } + if(this.connectionAttempts < getConfig().getMaxConnectionRetries()) { this.connectionAttempts++; @@ -935,6 +939,7 @@ private CompletionStage retryableConnect() { } else { logger.error("Failed to establish a connection after {} attempts", this.connectionAttempts, err); this.connectionAttempts = 0; + return CompletableFuture.failedFuture(new ConnectionFailedException(this.connectionAttempts, err)); } } diff --git a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java index 9cdda86a..4dbab069 100644 --- a/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java +++ b/src/driver/src/main/java/com/edgedb/driver/clients/EdgeDBTCPClient.java @@ -1,13 +1,15 @@ package com.edgedb.driver.clients; +import com.edgedb.driver.*; import com.edgedb.driver.async.ChannelCompletableFuture; import com.edgedb.driver.binary.PacketSerializer; import com.edgedb.driver.binary.duplexers.ChannelDuplexer; -import com.edgedb.driver.*; import com.edgedb.driver.exceptions.ConnectionFailedTemporarilyException; import com.edgedb.driver.util.SslUtils; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -35,21 +37,21 @@ public class EdgeDBTCPClient extends EdgeDBBinaryClient implements TransactableC private static final long CONNECTION_TIMEOUT = 5000; private static final Logger logger = LoggerFactory.getLogger(EdgeDBTCPClient.class); - private static final NioEventLoopGroup nettyTcpGroup = new NioEventLoopGroup(); - private static final EventExecutorGroup duplexerGroup = new DefaultEventExecutorGroup(8); + private static final NioEventLoopGroup NETTY_TCP_GROUP = new NioEventLoopGroup(); + private static final EventExecutorGroup DUPLEXER_GROUP = new DefaultEventExecutorGroup(8); - private final Bootstrap bootstrap; private final ChannelDuplexer duplexer; + private final Bootstrap bootstrap; private TransactionState transactionState; public EdgeDBTCPClient(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle) { super(connection, config, poolHandle); - this.duplexer = new ChannelDuplexer(this); setDuplexer(this.duplexer); this.bootstrap = new Bootstrap() - .group(nettyTcpGroup) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .group(NETTY_TCP_GROUP) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override @@ -66,7 +68,7 @@ protected void initChannel(@NotNull SocketChannel ch) throws Exception { "edgedb-binary" )); - SslUtils.applyTrustManager(connection, builder); + SslUtils.applyTrustManager(getConnectionArguments(), builder); pipeline.addLast("ssl", builder.build().newHandler(ch.alloc())); @@ -76,13 +78,12 @@ protected void initChannel(@NotNull SocketChannel ch) throws Exception { PacketSerializer.createEncoder() ); - pipeline.addLast(duplexerGroup, duplexer.channelHandler); + pipeline.addLast(DUPLEXER_GROUP, duplexer.channelHandler); duplexer.init(ch); } }); } - @Override protected void setTransactionState(TransactionState state) { this.transactionState = state; @@ -92,18 +93,23 @@ protected void setTransactionState(TransactionState state) { protected CompletionStage openConnection() { final var connection = getConnectionArguments(); + try { - return exceptionallyCompose(ChannelCompletableFuture.completeFrom( - bootstrap.connect( + return exceptionallyCompose( + ChannelCompletableFuture.completeFrom( + bootstrap.connect( connection.getHostname(), connection.getPort() - )), e -> { - if(e instanceof CompletionException && e.getCause() instanceof ConnectException) { - return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e)); + ) + ), + e -> { + if(e instanceof CompletionException && e.getCause() instanceof ConnectException) { + return CompletableFuture.failedFuture(new ConnectionFailedTemporarilyException(e)); + } + + return CompletableFuture.failedFuture(e); } - - return CompletableFuture.failedFuture( e); - }) + ) .orTimeout(getConfig().getConnectionTimeoutValue(), getConfig().getConnectionTimeoutUnit()); } catch (Exception err) { diff --git a/src/driver/src/main/java/com/edgedb/driver/exceptions/ConnectionFailedException.java b/src/driver/src/main/java/com/edgedb/driver/exceptions/ConnectionFailedException.java new file mode 100644 index 00000000..8cdf0bbe --- /dev/null +++ b/src/driver/src/main/java/com/edgedb/driver/exceptions/ConnectionFailedException.java @@ -0,0 +1,23 @@ +package com.edgedb.driver.exceptions; + +/** + * Represents a connection failure that cannot be retried. + */ +public class ConnectionFailedException extends EdgeDBException { + /** + * Constructs a new {@linkplain ConnectionFailedException}. + * @param attempts The number of attempts the binding made to connect. + * @param cause The inner cause of this exception. + */ + public ConnectionFailedException(int attempts, Throwable cause) { + super("The connection failed to be established after " + attempts + " attempt(s)", cause, false, false); + } + + /** + * Constructs a new {@linkplain ConnectionFailedException}. + * @param cause The inner cause of this exception. + */ + public ConnectionFailedException(Throwable cause) { + super("The connection failed to be established", cause, false, false); + } +}