Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/java-examples/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- Log level: OFF, ERROR, WARN, INFO, DEBUG, TRACE, ALL -->

<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<root level="ALL">
<appender-ref ref="CONSOLE" />
</root>
</configuration>
56 changes: 35 additions & 21 deletions src/driver/src/main/java/com/edgedb/driver/binary/PacketReader.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package com.edgedb.driver.binary;

import com.edgedb.driver.binary.codecs.Codec;
import com.edgedb.driver.binary.codecs.CodecContext;
import com.edgedb.driver.binary.packets.shared.Annotation;
import com.edgedb.driver.binary.packets.shared.KeyValue;
import com.edgedb.driver.exceptions.EdgeDBException;
import com.edgedb.driver.binary.protocol.common.Annotation;
import com.edgedb.driver.binary.protocol.common.KeyValue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.joou.UByte;
import org.joou.UInteger;
import org.joou.ULong;
import org.joou.UShort;

import javax.naming.OperationNotSupportedException;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
Expand All @@ -25,7 +22,21 @@
import static org.joou.Unsigned.*;

public class PacketReader {
private final @NotNull ByteBuf buffer;
public static final class ScopedReader extends PacketReader implements AutoCloseable {
public final boolean isNoData;

public ScopedReader(@Nullable ByteBuf buffer) {
super(buffer == null ? Unpooled.EMPTY_BUFFER : buffer);
isNoData = buffer == null;
}

@Override
public void close() {
buffer.release();
}
}

protected final @NotNull ByteBuf buffer;
private static final @NotNull Map<Class<?>, Function<PacketReader, ? extends Number>> numberReaderMap;

private final int initPos;
Expand Down Expand Up @@ -80,20 +91,6 @@ public boolean isEmpty() {
return this.buffer.readableBytes() == 0;
}

public <T> @Nullable T deserializeByteArray(@NotNull Codec<T> codec, CodecContext context) throws EdgeDBException, OperationNotSupportedException {
var buff = readByteArray();

if(buff == null) {
return null;
}

try {
return codec.deserialize(new PacketReader(buff), context);
} finally {
buff.release();
}
}

public byte[] consumeByteArray() {
var arr = new byte[this.buffer.readableBytes()];
this.buffer.readBytes(arr);
Expand Down Expand Up @@ -171,6 +168,23 @@ public short readInt16() {
return arr;
}

/**
* Reads the {@code length} number of bytes and creates a new {@linkplain ScopedReader} wrapping the bytes.
* @param length The number of bytes to read.
* @return A scoped reader whose close method releases the read bytes.
*/
public ScopedReader scopedSlice(int length) {
return new ScopedReader(readBytes(length));
}

/**
* Calls {@code readByteArray()} and creates a new {@linkplain ScopedReader} wrapping the bytes.
* @return A scoped reader whose close method releases the read bytes.
*/
public ScopedReader scopedSlice() {
return new ScopedReader(readByteArray());
}

public @Nullable ByteBuf readByteArray() {
var len = readInt32();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.edgedb.driver.binary;

import com.edgedb.driver.binary.packets.ServerMessageType;
import com.edgedb.driver.binary.packets.receivable.*;
import com.edgedb.driver.binary.packets.sendables.Sendable;
import com.edgedb.driver.binary.protocol.ServerMessageType;
import com.edgedb.driver.binary.protocol.Receivable;
import com.edgedb.driver.binary.protocol.Sendable;
import com.edgedb.driver.clients.EdgeDBBinaryClient;
import com.edgedb.driver.exceptions.ConnectionFailedException;
import com.edgedb.driver.exceptions.EdgeDBException;
import com.edgedb.driver.util.HexUtils;
Expand All @@ -24,33 +25,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PacketSerializer {
private static final Logger logger = LoggerFactory.getLogger(PacketSerializer.class);
private static final @NotNull Map<ServerMessageType, Function<PacketReader, Receivable>> deserializerMap;
private static final Map<Class<?>, Map<Number, Enum<?>>> binaryEnumMap = new HashMap<>();

static {
deserializerMap = new HashMap<>();

deserializerMap.put(ServerMessageType.AUTHENTICATION, AuthenticationStatus::new);
deserializerMap.put(ServerMessageType.COMMAND_COMPLETE, CommandComplete::new);
deserializerMap.put(ServerMessageType.COMMAND_DATA_DESCRIPTION, CommandDataDescription::new);
deserializerMap.put(ServerMessageType.DATA, Data::new);
deserializerMap.put(ServerMessageType.DUMP_BLOCK, DumpBlock::new);
deserializerMap.put(ServerMessageType.DUMP_HEADER, DumpHeader::new);
deserializerMap.put(ServerMessageType.ERROR_RESPONSE, ErrorResponse::new);
deserializerMap.put(ServerMessageType.LOG_MESSAGE, LogMessage::new);
deserializerMap.put(ServerMessageType.PARAMETER_STATUS, ParameterStatus::new);
deserializerMap.put(ServerMessageType.READY_FOR_COMMAND, ReadyForCommand::new);
deserializerMap.put(ServerMessageType.RESTORE_READY, RestoreReady::new);
deserializerMap.put(ServerMessageType.SERVER_HANDSHAKE, ServerHandshake::new);
deserializerMap.put(ServerMessageType.SERVER_KEY_DATA, ServerKeyData::new);
deserializerMap.put(ServerMessageType.STATE_DATA_DESCRIPTION, StateDataDescription::new);
}

public static <T extends Enum<?> & BinaryEnum<U>, U extends Number> void registerBinaryEnum(Class<T> cls, T @NotNull [] values) {
binaryEnumMap.put(cls, Arrays.stream(values).collect(Collectors.toMap(BinaryEnum::getValue, v -> v)));
}
Expand All @@ -64,7 +44,7 @@ public static <T extends Enum<T> & BinaryEnum<U>, U extends Number> T getEnumVal
return (T)binaryEnumMap.get(enumCls).get(raw);
}

public static @NotNull MessageToMessageDecoder<ByteBuf> createDecoder() {
public static @NotNull MessageToMessageDecoder<ByteBuf> createDecoder(EdgeDBBinaryClient client) {
return new MessageToMessageDecoder<>() {
private final Map<Channel, PacketContract> contracts = new HashMap<>();

Expand Down Expand Up @@ -96,7 +76,7 @@ protected void decode(@NotNull ChannelHandlerContext ctx, @NotNull ByteBuf msg,

// can we read this packet?
if (msg.readableBytes() >= length) {
var packet = PacketSerializer.deserialize(type, length, msg.readSlice((int) length));
var packet = PacketSerializer.deserialize(client, type, length, msg.readSlice((int) length));

if(packet == null) {
logger.error("Got null result for packet type {}", type);
Expand Down Expand Up @@ -175,7 +155,7 @@ public boolean tryComplete(@NotNull ByteBuf other) {

if (data.readableBytes() >= length) {
// read
packet = PacketSerializer.deserialize(messageType, length, data, false);
packet = PacketSerializer.deserialize(client, messageType, length, data, false);

return true;
}
Expand Down Expand Up @@ -218,42 +198,36 @@ protected void encode(@NotNull ChannelHandlerContext ctx, @NotNull Sendable msg,
}

public static @Nullable Receivable deserialize(
ServerMessageType messageType, long length, @NotNull ByteBuf buffer
EdgeDBBinaryClient client, ServerMessageType messageType, long length, @NotNull ByteBuf buffer
) {
var reader = new PacketReader(buffer);
return deserializeSingle(messageType, length, reader, true);
return deserializeSingle(client, messageType, length, reader, true);
}

public static @Nullable Receivable deserialize(
ServerMessageType messageType, long length, @NotNull ByteBuf buffer, boolean verifyEmpty
EdgeDBBinaryClient client, ServerMessageType messageType, long length, @NotNull ByteBuf buffer, boolean verifyEmpty
) {
var reader = new PacketReader(buffer);
return deserializeSingle(messageType, length, reader, verifyEmpty);
return deserializeSingle(client, messageType, length, reader, verifyEmpty);
}

public static @Nullable Receivable deserializeSingle(PacketReader reader) {
public static @Nullable Receivable deserializeSingle(EdgeDBBinaryClient client, PacketReader reader) {
var messageType = reader.readEnum(ServerMessageType.class, Byte.TYPE);
var length = reader.readUInt32().longValue();

return deserializeSingle(messageType, length, reader, false);
return deserializeSingle(client, messageType, length, reader, false);
}

public static @Nullable Receivable deserializeSingle(
ServerMessageType type, long length, @NotNull PacketReader reader,
EdgeDBBinaryClient client, ServerMessageType type, long length, @NotNull PacketReader reader,
boolean verifyEmpty
) {
if(!deserializerMap.containsKey(type)) {
logger.error("Unknown packet type {}", type);
reader.skip(length);
return null;
}

try {
return deserializerMap.get(type).apply(reader);
return client.getProtocolProvider().readPacket(type, (int)length, reader);
}
catch (Exception x) {
logger.error("Failed to deserialize packet", x);
throw x;
return null;
}
finally {
// ensure we read the entire packet
Expand All @@ -263,9 +237,15 @@ protected void encode(@NotNull ChannelHandlerContext ctx, @NotNull Sendable msg,
}
}

public static HttpResponse.BodyHandler<List<Receivable>> PACKET_BODY_HANDLER = new PacketBodyHandler();

public static HttpResponse.BodyHandler<List<Receivable>> createHandler(EdgeDBBinaryClient client) {
return new PacketBodyHandler(client);
}
private static class PacketBodyHandler implements HttpResponse.BodyHandler<List<Receivable>> {
private final EdgeDBBinaryClient client;
public PacketBodyHandler(EdgeDBBinaryClient client) {
this.client = client;
}

@Override
public HttpResponse.BodySubscriber<List<Receivable>> apply(HttpResponse.ResponseInfo responseInfo) {
// ensure success
Expand All @@ -276,7 +256,7 @@ public HttpResponse.BodySubscriber<List<Receivable>> apply(HttpResponse.Response
: new PacketBodySubscriber(responseInfo.statusCode());
}

private static class PacketBodySubscriber implements HttpResponse.BodySubscriber<List<Receivable>> {
private class PacketBodySubscriber implements HttpResponse.BodySubscriber<List<Receivable>> {
private final @Nullable List<@NotNull ByteBuf> buffers;
private final CompletableFuture<List<Receivable>> promise;

Expand Down Expand Up @@ -334,7 +314,7 @@ public void onComplete() {
var data = new ArrayList<Receivable>();

while(completeBuffer.readableBytes() > 0) {
var packet = deserializeSingle(reader);
var packet = deserializeSingle(client, reader);

if(packet == null && completeBuffer.readableBytes() > 0) {
promise.completeExceptionally(
Expand Down
Loading