Skip to content

Commit c610d57

Browse files
authored
message size calculation improvements (#820)
1 parent 8e35e0f commit c610d57

21 files changed

+317
-284
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ repositories {
4040
dependencies {
4141
implementation 'net.i2p.crypto:eddsa:0.3.0'
4242
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
43-
testImplementation 'io.nats:jnats-server-runner:1.0.9'
43+
testImplementation 'io.nats:jnats-server-runner:1.0.14'
4444
}
4545

4646
sourceSets {

dependencies.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ This file lists the dependencies used in this repository.
1212

1313
| Dependency | License |
1414
|-------------------------------------------------|-----------------------------------------|
15-
| io.nats:jnats-server-runner:1.0.9 | Apache 2.0 License |
15+
| io.nats:jnats-server-runner:1.0.14 | Apache 2.0 License |
1616
| org.apiguardian:apiguardian-api:1.1.0 | Apache 2.0 License |
1717
| org.junit.jupiter:junit-jupiter:5.9.0 | Eclipse Public License v2.0 |
1818
| org.junit:junit-bom:5.9.0 | Eclipse Public License v2.0 |
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright 2015-2022 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.client.impl;
15+
16+
public class IncomingMessage extends NatsMessage {
17+
IncomingMessage() {}
18+
19+
IncomingMessage(byte[] data) {
20+
super(data);
21+
}
22+
23+
@Override
24+
byte[] getProtocolBytes() {
25+
throw new IllegalStateException("getProtocolBytes not supported for this type of message.");
26+
}
27+
28+
@Override
29+
int getControlLineLength() {
30+
throw new IllegalStateException("getControlLineLength not supported for this type of message.");
31+
}
32+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2015-2022 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.client.impl;
15+
16+
import io.nats.client.support.IncomingHeadersProcessor;
17+
import io.nats.client.support.Status;
18+
19+
import static io.nats.client.support.NatsJetStreamConstants.JS_ACK_SUBJECT_PREFIX;
20+
21+
// ----------------------------------------------------------------------------------------------------
22+
// Incoming Message Factory - internal use only
23+
// ----------------------------------------------------------------------------------------------------
24+
class IncomingMessageFactory {
25+
private final String sid;
26+
private final String subject;
27+
private final String replyTo;
28+
private final int protocolLineLength;
29+
private final boolean utf8mode;
30+
31+
private byte[] data;
32+
private Headers headers;
33+
private Status status;
34+
private int headerLen;
35+
36+
// Create an incoming message for a subscriber
37+
// Doesn't check control line size, since the server sent us the message
38+
IncomingMessageFactory(String sid, String subject, String replyTo, int protocolLength, boolean utf8mode) {
39+
this.sid = sid;
40+
this.subject = subject;
41+
this.replyTo = replyTo;
42+
this.protocolLineLength = protocolLength;
43+
this.utf8mode = utf8mode;
44+
}
45+
46+
void setHeaders(IncomingHeadersProcessor ihp) {
47+
headers = ihp.getHeaders();
48+
status = ihp.getStatus();
49+
headerLen = ihp.getSerializedLength();
50+
}
51+
52+
void setData(byte[] data) {
53+
this.data = data;
54+
}
55+
56+
NatsMessage getMessage() {
57+
NatsMessage message;
58+
if (status != null) {
59+
message = new StatusMessage(status);
60+
}
61+
else if (replyTo != null && replyTo.startsWith(JS_ACK_SUBJECT_PREFIX)) {
62+
message = new NatsJetStreamMessage(data);
63+
}
64+
else {
65+
message = new IncomingMessage(data);
66+
}
67+
message.sid = sid;
68+
message.subject = subject;
69+
message.replyTo = replyTo;
70+
message.headers = headers;
71+
message.headerLen = headerLen;
72+
message.utf8mode = utf8mode;
73+
message.sizeInBytes = protocolLineLength + headerLen + message.dataLen + 4; // Two CRLFs
74+
return message;
75+
}
76+
}

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.nats.client.*;
1717
import io.nats.client.ConnectionListener.Events;
1818
import io.nats.client.api.ServerInfo;
19-
import io.nats.client.impl.NatsMessage.ProtocolMessage;
2019
import io.nats.client.support.ByteArrayBuilder;
2120
import io.nats.client.support.NatsRequestCompletableFuture;
2221
import io.nats.client.support.Validator;
@@ -1286,18 +1285,27 @@ CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
12861285
pongQueue.add(pongFuture);
12871286

12881287
if (treatAsInternal) {
1289-
queueInternalOutgoing(new ProtocolMessage(OP_PING_BYTES));
1288+
queueInternalOutgoing(new ProtocolMessage(PING_PROTO));
12901289
} else {
1291-
queueOutgoing(new ProtocolMessage(OP_PING_BYTES));
1290+
queueOutgoing(new ProtocolMessage(PING_PROTO));
12921291
}
12931292

12941293
this.needPing.set(true);
12951294
this.statistics.incrementPingCount();
12961295
return pongFuture;
12971296
}
12981297

1298+
// This is a minor speed / memory enhancement.
1299+
// We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state
1300+
// But it is safe to share the data bytes and the size since those fields are just being read
1301+
// This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size
1302+
// reducing allocation of data for something that is often created and used
1303+
// These static instances are the once that are used for copying, sendPing and sendPong
1304+
private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
1305+
private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);
1306+
12991307
void sendPong() {
1300-
queueInternalOutgoing(new ProtocolMessage(OP_PONG_BYTES));
1308+
queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
13011309
}
13021310

13031311
// Called by the reader

src/main/java/io/nats/client/impl/NatsConnectionReader.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
package io.nats.client.impl;
1515

16-
import io.nats.client.impl.NatsMessage.InternalMessageFactory;
1716
import io.nats.client.support.IncomingHeadersProcessor;
1817

1918
import java.io.IOException;
@@ -46,21 +45,21 @@ enum Mode {
4645
private boolean gotCR;
4746

4847
private String op;
49-
private char[] opArray;
48+
private final char[] opArray;
5049
private int opPos;
5150

52-
private char[] msgLineChars;
51+
private final char[] msgLineChars;
5352
private int msgLinePosition;
5453

5554
private Mode mode;
5655

57-
private InternalMessageFactory incoming;
56+
private IncomingMessageFactory incoming;
5857
private byte[] msgHeaders;
5958
private byte[] msgData;
6059
private int msgHeadersPosition;
6160
private int msgDataPosition;
6261

63-
private byte[] buffer;
62+
private final byte[] buffer;
6463
private int bufferPosition;
6564

6665
private Future<Boolean> stopped;
@@ -418,7 +417,7 @@ static String opFor(char[] chars, int length) {
418417
}
419418
}
420419

421-
private static int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000};
420+
private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000};
422421

423422
public static int parseLength(String s) throws NumberFormatException {
424423
int length = s.length();
@@ -433,7 +432,7 @@ public static int parseLength(String s) throws NumberFormatException {
433432
int d = (c - '0');
434433

435434
if (d>9) {
436-
throw new NumberFormatException("Invalid char in message length \'" + c + "\'");
435+
throw new NumberFormatException("Invalid char in message length '" + c + "'");
437436
}
438437

439438
retVal += d * TENS[length - i - 1];
@@ -476,7 +475,7 @@ void parseProtocolMessage() throws IOException {
476475

477476
int incomingLength = parseLength(lengthChars);
478477

479-
this.incoming = new InternalMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode);
478+
this.incoming = new IncomingMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode);
480479
this.mode = Mode.GATHER_DATA;
481480
this.msgData = new byte[incomingLength];
482481
this.msgDataPosition = 0;
@@ -518,7 +517,7 @@ void parseProtocolMessage() throws IOException {
518517
throw new IllegalStateException("Bad HMSG control line, missing required fields");
519518
}
520519

521-
this.incoming = new InternalMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode);
520+
this.incoming = new IncomingMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode);
522521
this.msgHeaders = new byte[hdrLen];
523522
this.msgData = new byte[totLen - hdrLen];
524523
this.mode = Mode.GATHER_HEADERS;
@@ -532,7 +531,7 @@ void parseProtocolMessage() throws IOException {
532531
this.mode = Mode.GATHER_OP;
533532
break;
534533
case OP_ERR:
535-
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("\'", "");
534+
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", "");
536535
this.connection.processError(errorText);
537536
this.op = UNKNOWN_OP;
538537
this.mode = Mode.GATHER_OP;

src/main/java/io/nats/client/impl/NatsJetStreamMessage.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package io.nats.client.impl;
1515

1616
import io.nats.client.Connection;
17-
import io.nats.client.impl.NatsMessage.InternalMessage;
1817

1918
import java.time.Duration;
2019
import java.util.concurrent.TimeoutException;
@@ -23,11 +22,13 @@
2322
import static io.nats.client.support.NatsConstants.NANOS_PER_MILLI;
2423
import static io.nats.client.support.Validator.validateDurationRequired;
2524

26-
class NatsJetStreamMessage extends InternalMessage {
25+
class NatsJetStreamMessage extends IncomingMessage {
2726

2827
private NatsJetStreamMetaData jsMetaData = null;
2928

30-
NatsJetStreamMessage() {}
29+
NatsJetStreamMessage(byte[] data) {
30+
super(data);
31+
}
3132

3233
/**
3334
* {@inheritDoc}

0 commit comments

Comments
 (0)