Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.bson.BsonString;
import org.bson.ByteBuf;
import org.bson.FieldNameValidator;
import org.bson.io.BsonOutput;

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
Expand All @@ -55,6 +56,8 @@
import static com.mongodb.connection.ServerType.STANDALONE;
import static com.mongodb.internal.connection.BsonWriterHelper.appendElementsToDocument;
import static com.mongodb.internal.connection.BsonWriterHelper.backpatchLength;
import static com.mongodb.internal.connection.BsonWriterHelper.createBsonBinaryWriter;
import static com.mongodb.internal.connection.BsonWriterHelper.encodeUsingRegistry;
import static com.mongodb.internal.connection.BsonWriterHelper.writeDocumentsOfDualMessageSequences;
import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createList;
Expand All @@ -77,16 +80,20 @@ public final class CommandMessage extends RequestMessage {
*/
private static final byte PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE = 1;

private static final int UNINITIALIZED_POSITION = -1;

private final BsonDocument command;
private final FieldNameValidator commandFieldNameValidator;
private final ReadPreference readPreference;
private final boolean exhaustAllowed;
private final MessageSequences sequences;
private final boolean responseExpected;
private final String database;
private int firstDocumentPosition = UNINITIALIZED_POSITION;

/**
* {@code null} iff either {@link #sequences} is not of the {@link DualMessageSequences} type,
* or it is of that type, but it has not been {@linkplain #encodeMessageBodyWithMetadata(ByteBufferBsonOutput, OperationContext) encoded}.
* or it is of that type, but it has not been {@linkplain #encodeMessageBody(ByteBufferBsonOutput, OperationContext) encoded}.
*/
@Nullable
private Boolean dualMessageSequencesRequireResponse;
Expand Down Expand Up @@ -145,7 +152,7 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
try {
CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers);
try {
byteBuf.position(getEncodingMetadata().getFirstDocumentPosition());
byteBuf.position(firstDocumentPosition);
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);

// If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
Expand Down Expand Up @@ -223,9 +230,8 @@ boolean isResponseExpected() {
}

@Override
protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
int commandStartPosition = useOpMsg() ? writeOpMsg(bsonOutput, operationContext) : writeOpQuery(bsonOutput);
return new EncodingMetadata(commandStartPosition);
protected void encodeMessageBody(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
this.firstDocumentPosition = useOpMsg() ? writeOpMsg(bsonOutput, operationContext) : writeOpQuery(bsonOutput);
}

@SuppressWarnings("try")
Expand All @@ -237,7 +243,7 @@ private int writeOpMsg(final ByteBufferBsonOutput bsonOutput, final OperationCon
int commandStartPosition = bsonOutput.getPosition();
List<BsonElement> extraElements = getExtraElements(operationContext);

int commandDocumentSizeInBytes = writeDocument(command, bsonOutput, commandFieldNameValidator);
int commandDocumentSizeInBytes = writeCommand(bsonOutput);
if (sequences instanceof SplittablePayload) {
appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
SplittablePayload payload = (SplittablePayload) sequences;
Expand Down Expand Up @@ -288,7 +294,7 @@ private int writeOpQuery(final ByteBufferBsonOutput bsonOutput) {
elements = new ArrayList<>(3);
addServerApiElements(elements);
}
writeDocument(command, bsonOutput, commandFieldNameValidator);
writeCommand(bsonOutput);
appendElementsToDocument(bsonOutput, commandStartPosition, elements);
return commandStartPosition;
}
Expand Down Expand Up @@ -416,6 +422,13 @@ public String getDatabase() {
return database;
}

private int writeCommand(final BsonOutput bsonOutput) {
BsonBinaryWriter writer = createBsonBinaryWriter(bsonOutput, commandFieldNameValidator, getSettings());
int documentStart = bsonOutput.getPosition();
encodeUsingRegistry(writer, command);
return bsonOutput.getPosition() - documentStart;
}

@FunctionalInterface
private interface FinishOpMsgSectionWithPayloadType1 extends AutoCloseable {
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CompressedMessage extends RequestMessage {
}

@Override
protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
protected void encodeMessageBody(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
bsonOutput.writeInt32(wrappedOpcode.getValue());
bsonOutput.writeInt32(getWrappedMessageSize(wrappedMessageBuffers) - MESSAGE_HEADER_LENGTH);
bsonOutput.writeByte(compressor.getId());
Expand All @@ -45,8 +45,6 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOut
.position(getFirstWrappedMessageBuffer(wrappedMessageBuffers).position() + MESSAGE_HEADER_LENGTH);

compressor.compress(wrappedMessageBuffers, bsonOutput);

return new EncodingMetadata(0);
}

private static int getWrappedMessageSize(final List<ByteBuf> wrappedMessageBuffers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@

package com.mongodb.internal.connection;

import org.bson.BsonBinaryWriter;
import org.bson.BsonDocument;
import org.bson.FieldNameValidator;
import org.bson.io.BsonOutput;

import java.util.concurrent.atomic.AtomicInteger;

import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.connection.BsonWriterHelper.backpatchLength;
import static com.mongodb.internal.connection.BsonWriterHelper.createBsonBinaryWriter;
import static com.mongodb.internal.connection.BsonWriterHelper.encodeUsingRegistry;

/**
* Abstract base class for all MongoDB Wire Protocol request messages.
Expand All @@ -40,19 +35,7 @@ abstract class RequestMessage {
private final MessageSettings settings;
private final int id;
private final OpCode opCode;
private EncodingMetadata encodingMetadata;

static class EncodingMetadata {
private final int firstDocumentPosition;

EncodingMetadata(final int firstDocumentPosition) {
this.firstDocumentPosition = firstDocumentPosition;
}

public int getFirstDocumentPosition() {
return firstDocumentPosition;
}
}
/**
* Gets the next available unique message identifier.
*
Expand Down Expand Up @@ -109,18 +92,8 @@ public void encode(final ByteBufferBsonOutput bsonOutput, final OperationContext
notNull("operationContext", operationContext);
int messageStartPosition = bsonOutput.getPosition();
writeMessagePrologue(bsonOutput);
EncodingMetadata encodingMetadata = encodeMessageBodyWithMetadata(bsonOutput, operationContext);
encodeMessageBody(bsonOutput, operationContext);
backpatchLength(messageStartPosition, bsonOutput);
this.encodingMetadata = encodingMetadata;
}

/**
* Gets the encoding metadata from the last attempt to encode this message.
*
* @return Get metadata from the last attempt to encode this message. Returns null if there has not yet been an attempt.
*/
public EncodingMetadata getEncodingMetadata() {
return encodingMetadata;
}

/**
Expand All @@ -138,16 +111,8 @@ protected void writeMessagePrologue(final BsonOutput bsonOutput) {
/**
* Encode the message body to the given output.
*
* @param bsonOutput the output
* @param bsonOutput the output
* @param operationContext the session context
* @return the encoding metadata
*/
protected abstract EncodingMetadata encodeMessageBodyWithMetadata(ByteBufferBsonOutput bsonOutput, OperationContext operationContext);

protected int writeDocument(final BsonDocument document, final BsonOutput bsonOutput, final FieldNameValidator validator) {
BsonBinaryWriter writer = createBsonBinaryWriter(bsonOutput, validator, getSettings());
int documentStart = bsonOutput.getPosition();
encodeUsingRegistry(writer, document);
return bsonOutput.getPosition() - documentStart;
}
protected abstract void encodeMessageBody(ByteBufferBsonOutput bsonOutput, OperationContext operationContext);
}