diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index e894ec915..3f93f1e74 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -302,6 +302,18 @@ public class Options { * {@link Builder#socketSoLinger(int) socketSoLinger}. */ public static final String PROP_SOCKET_SO_LINGER = PFX + "socket.so.linger"; + /** + * Property used to configure a builder from a Properties object. {@value}, see + * {@link Builder#receiveBufferSize(int) receiveBufferSize}. + * MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK + */ + public static final String PROP_SOCKET_RECEIVE_BUFFER_SIZE = PFX + "socket.receive.buffer.size"; + /** + * Property used to configure a builder from a Properties object. {@value}, see + * {@link Builder#sendBufferSize(int) sendBufferSize}. + * MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK + */ + public static final String PROP_SOCKET_SEND_BUFFER_SIZE = PFX + "socket.send.buffer.size"; /** * Property used to configure a builder from a Properties object. {@value}, see * {@link Builder#reconnectBufferSize(long) reconnectBufferSize}. @@ -654,6 +666,8 @@ public class Options { private final int socketReadTimeoutMillis; private final Duration socketWriteTimeout; private final int socketSoLinger; + private final int receiveBufferSize; + private final int sendBufferSize; private final Duration pingInterval; private final Duration requestCleanupInterval; private final int maxPingsOut; @@ -797,6 +811,8 @@ public static class Builder { private int socketReadTimeoutMillis = 0; private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT; private int socketSoLinger = -1; + private int receiveBufferSize = -1; + private int sendBufferSize = -1; private Duration pingInterval = DEFAULT_PING_INTERVAL; private Duration requestCleanupInterval = DEFAULT_REQUEST_CLEANUP_INTERVAL; private int maxPingsOut = DEFAULT_MAX_PINGS_OUT; @@ -942,6 +958,8 @@ public Builder properties(Properties props) { intProperty(props, PROP_SOCKET_READ_TIMEOUT_MS, -1, i -> this.socketReadTimeoutMillis = i); durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d); intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i); + intProperty(props, PROP_SOCKET_RECEIVE_BUFFER_SIZE, -1, i -> this.receiveBufferSize = i); + intProperty(props, PROP_SOCKET_SEND_BUFFER_SIZE, -1, i -> this.sendBufferSize = i); intGtEqZeroProperty(props, PROP_MAX_CONTROL_LINE, DEFAULT_MAX_CONTROL_LINE, i -> this.maxControlLine = i); durationProperty(props, PROP_PING_INTERVAL, DEFAULT_PING_INTERVAL, d -> this.pingInterval = d); @@ -1424,6 +1442,30 @@ public Builder socketSoLinger(int socketSoLinger) { return this; } + /** + * Set the value of the socket SO_RCVBUF property in bytes + * The SO_RCVBUF option is used by the platform's networking code as a hint for the size to set the underlying network I/O buffers. + * MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK + * @param receiveBufferSize the size in bytes + * @return the Builder for chaining + */ + public Builder receiveBufferSize(int receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + return this; + } + + /** + * Set the value of the socket SO_SNDBUF property in bytes + * The SO_SNDBUF option is used by the platform's networking code as a hint for the size to set the underlying network I/O buffers. + * MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK + * @param sendBufferSize the size in bytes + * @return the Builder for chaining + */ + public Builder sendBufferSize(int sendBufferSize) { + this.sendBufferSize = sendBufferSize; + return this; + } + /** * Set the interval between attempts to pings the server. These pings are automated, * and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library @@ -1964,10 +2006,18 @@ else if (useDefaultTls) { throw new IllegalArgumentException("Socket Write Timeout cannot be less than " + MINIMUM_SOCKET_WRITE_TIMEOUT_NANOS + " nanoseconds."); } - if (socketSoLinger < 0) { + if (socketSoLinger < 1) { socketSoLinger = -1; } + if (receiveBufferSize < 1) { + receiveBufferSize = -1; + } + + if (sendBufferSize < 1) { + sendBufferSize = -1; + } + if (errorListener == null) { errorListener = new ErrorListenerLoggerImpl(); } @@ -2016,6 +2066,8 @@ public Builder(Options o) { this.socketReadTimeoutMillis = o.socketReadTimeoutMillis; this.socketWriteTimeout = o.socketWriteTimeout; this.socketSoLinger = o.socketSoLinger; + this.receiveBufferSize = o.receiveBufferSize; + this.sendBufferSize = o.sendBufferSize; this.pingInterval = o.pingInterval; this.requestCleanupInterval = o.requestCleanupInterval; this.maxPingsOut = o.maxPingsOut; @@ -2086,6 +2138,8 @@ private Options(Builder b) { this.socketReadTimeoutMillis = b.socketReadTimeoutMillis; this.socketWriteTimeout = b.socketWriteTimeout; this.socketSoLinger = b.socketSoLinger; + this.receiveBufferSize = b.receiveBufferSize; + this.sendBufferSize = b.sendBufferSize; this.pingInterval = b.pingInterval; this.requestCleanupInterval = b.requestCleanupInterval; this.maxPingsOut = b.maxPingsOut; @@ -2484,6 +2538,20 @@ public int getSocketSoLinger() { return socketSoLinger; } + /** + * @return the number of bytes to set the for the SO_RCVBUF property on the socket + */ + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + /** + * @return the number of bytes to set the for the SO_SNDBUF property on the socket + */ + public int getSendBufferSize() { + return sendBufferSize; + } + /** * @return the pingInterval, see {@link Builder#pingInterval(Duration) pingInterval()} in the builder doc */ diff --git a/src/main/java/io/nats/client/impl/SocketDataPort.java b/src/main/java/io/nats/client/impl/SocketDataPort.java index 2bc79fcc7..4b3499d60 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPort.java +++ b/src/main/java/io/nats/client/impl/SocketDataPort.java @@ -48,6 +48,8 @@ public class SocketDataPort implements DataPort { protected Socket socket; protected boolean isSecure = false; protected int soLinger; + protected int receiveBufferSize; + protected int sendBufferSize; protected InputStream in; protected OutputStream out; @@ -55,6 +57,8 @@ public class SocketDataPort implements DataPort { @Override public void afterConstruct(Options options) { soLinger = options.getSocketSoLinger(); + receiveBufferSize = options.getReceiveBufferSize(); + sendBufferSize = options.getSendBufferSize(); } @Override @@ -82,12 +86,20 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti socket = createSocket(options); socket.connect(new InetSocketAddress(host, port), (int) timeout); } + if (options.getSocketReadTimeoutMillis() > 0) { + socket.setSoTimeout(options.getSocketReadTimeoutMillis()); + } - if (soLinger > -1) { + if (soLinger > 0) { socket.setSoLinger(true, soLinger); } - if (options.getSocketReadTimeoutMillis() > 0) { - socket.setSoTimeout(options.getSocketReadTimeoutMillis()); + + if (receiveBufferSize > 0) { + socket.setReceiveBufferSize(receiveBufferSize); + } + + if (sendBufferSize > 0) { + socket.setSendBufferSize(sendBufferSize); } if (isWebsocketScheme(nuri.getScheme())) {