Skip to content

Commit 04541cc

Browse files
authored
Merge pull request #1432 from nats-io/socket-danger
Options to set underlying socket configuration of SO_SNDBUF and SO_RCVBUF
2 parents 56735d6 + b846d5c commit 04541cc

2 files changed

Lines changed: 84 additions & 4 deletions

File tree

src/main/java/io/nats/client/Options.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,18 @@ public class Options {
302302
* {@link Builder#socketSoLinger(int) socketSoLinger}.
303303
*/
304304
public static final String PROP_SOCKET_SO_LINGER = PFX + "socket.so.linger";
305+
/**
306+
* Property used to configure a builder from a Properties object. {@value}, see
307+
* {@link Builder#receiveBufferSize(int) receiveBufferSize}.
308+
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
309+
*/
310+
public static final String PROP_SOCKET_RECEIVE_BUFFER_SIZE = PFX + "socket.receive.buffer.size";
311+
/**
312+
* Property used to configure a builder from a Properties object. {@value}, see
313+
* {@link Builder#sendBufferSize(int) sendBufferSize}.
314+
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
315+
*/
316+
public static final String PROP_SOCKET_SEND_BUFFER_SIZE = PFX + "socket.send.buffer.size";
305317
/**
306318
* Property used to configure a builder from a Properties object. {@value}, see
307319
* {@link Builder#reconnectBufferSize(long) reconnectBufferSize}.
@@ -654,6 +666,8 @@ public class Options {
654666
private final int socketReadTimeoutMillis;
655667
private final Duration socketWriteTimeout;
656668
private final int socketSoLinger;
669+
private final int receiveBufferSize;
670+
private final int sendBufferSize;
657671
private final Duration pingInterval;
658672
private final Duration requestCleanupInterval;
659673
private final int maxPingsOut;
@@ -797,6 +811,8 @@ public static class Builder {
797811
private int socketReadTimeoutMillis = 0;
798812
private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT;
799813
private int socketSoLinger = -1;
814+
private int receiveBufferSize = -1;
815+
private int sendBufferSize = -1;
800816
private Duration pingInterval = DEFAULT_PING_INTERVAL;
801817
private Duration requestCleanupInterval = DEFAULT_REQUEST_CLEANUP_INTERVAL;
802818
private int maxPingsOut = DEFAULT_MAX_PINGS_OUT;
@@ -942,6 +958,8 @@ public Builder properties(Properties props) {
942958
intProperty(props, PROP_SOCKET_READ_TIMEOUT_MS, -1, i -> this.socketReadTimeoutMillis = i);
943959
durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d);
944960
intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i);
961+
intProperty(props, PROP_SOCKET_RECEIVE_BUFFER_SIZE, -1, i -> this.receiveBufferSize = i);
962+
intProperty(props, PROP_SOCKET_SEND_BUFFER_SIZE, -1, i -> this.sendBufferSize = i);
945963

946964
intGtEqZeroProperty(props, PROP_MAX_CONTROL_LINE, DEFAULT_MAX_CONTROL_LINE, i -> this.maxControlLine = i);
947965
durationProperty(props, PROP_PING_INTERVAL, DEFAULT_PING_INTERVAL, d -> this.pingInterval = d);
@@ -1424,6 +1442,30 @@ public Builder socketSoLinger(int socketSoLinger) {
14241442
return this;
14251443
}
14261444

1445+
/**
1446+
* Set the value of the socket SO_RCVBUF property in bytes
1447+
* The SO_RCVBUF option is used by the platform's networking code as a hint for the size to set the underlying network I/O buffers.
1448+
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
1449+
* @param receiveBufferSize the size in bytes
1450+
* @return the Builder for chaining
1451+
*/
1452+
public Builder receiveBufferSize(int receiveBufferSize) {
1453+
this.receiveBufferSize = receiveBufferSize;
1454+
return this;
1455+
}
1456+
1457+
/**
1458+
* Set the value of the socket SO_SNDBUF property in bytes
1459+
* The SO_SNDBUF option is used by the platform's networking code as a hint for the size to set the underlying network I/O buffers.
1460+
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
1461+
* @param sendBufferSize the size in bytes
1462+
* @return the Builder for chaining
1463+
*/
1464+
public Builder sendBufferSize(int sendBufferSize) {
1465+
this.sendBufferSize = sendBufferSize;
1466+
return this;
1467+
}
1468+
14271469
/**
14281470
* Set the interval between attempts to pings the server. These pings are automated,
14291471
* and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library
@@ -1964,10 +2006,18 @@ else if (useDefaultTls) {
19642006
throw new IllegalArgumentException("Socket Write Timeout cannot be less than " + MINIMUM_SOCKET_WRITE_TIMEOUT_NANOS + " nanoseconds.");
19652007
}
19662008

1967-
if (socketSoLinger < 0) {
2009+
if (socketSoLinger < 1) {
19682010
socketSoLinger = -1;
19692011
}
19702012

2013+
if (receiveBufferSize < 1) {
2014+
receiveBufferSize = -1;
2015+
}
2016+
2017+
if (sendBufferSize < 1) {
2018+
sendBufferSize = -1;
2019+
}
2020+
19712021
if (errorListener == null) {
19722022
errorListener = new ErrorListenerLoggerImpl();
19732023
}
@@ -2016,6 +2066,8 @@ public Builder(Options o) {
20162066
this.socketReadTimeoutMillis = o.socketReadTimeoutMillis;
20172067
this.socketWriteTimeout = o.socketWriteTimeout;
20182068
this.socketSoLinger = o.socketSoLinger;
2069+
this.receiveBufferSize = o.receiveBufferSize;
2070+
this.sendBufferSize = o.sendBufferSize;
20192071
this.pingInterval = o.pingInterval;
20202072
this.requestCleanupInterval = o.requestCleanupInterval;
20212073
this.maxPingsOut = o.maxPingsOut;
@@ -2086,6 +2138,8 @@ private Options(Builder b) {
20862138
this.socketReadTimeoutMillis = b.socketReadTimeoutMillis;
20872139
this.socketWriteTimeout = b.socketWriteTimeout;
20882140
this.socketSoLinger = b.socketSoLinger;
2141+
this.receiveBufferSize = b.receiveBufferSize;
2142+
this.sendBufferSize = b.sendBufferSize;
20892143
this.pingInterval = b.pingInterval;
20902144
this.requestCleanupInterval = b.requestCleanupInterval;
20912145
this.maxPingsOut = b.maxPingsOut;
@@ -2484,6 +2538,20 @@ public int getSocketSoLinger() {
24842538
return socketSoLinger;
24852539
}
24862540

2541+
/**
2542+
* @return the number of bytes to set the for the SO_RCVBUF property on the socket
2543+
*/
2544+
public int getReceiveBufferSize() {
2545+
return receiveBufferSize;
2546+
}
2547+
2548+
/**
2549+
* @return the number of bytes to set the for the SO_SNDBUF property on the socket
2550+
*/
2551+
public int getSendBufferSize() {
2552+
return sendBufferSize;
2553+
}
2554+
24872555
/**
24882556
* @return the pingInterval, see {@link Builder#pingInterval(Duration) pingInterval()} in the builder doc
24892557
*/

src/main/java/io/nats/client/impl/SocketDataPort.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@ public class SocketDataPort implements DataPort {
4848
protected Socket socket;
4949
protected boolean isSecure = false;
5050
protected int soLinger;
51+
protected int receiveBufferSize;
52+
protected int sendBufferSize;
5153

5254
protected InputStream in;
5355
protected OutputStream out;
5456

5557
@Override
5658
public void afterConstruct(Options options) {
5759
soLinger = options.getSocketSoLinger();
60+
receiveBufferSize = options.getReceiveBufferSize();
61+
sendBufferSize = options.getSendBufferSize();
5862
}
5963

6064
@Override
@@ -82,12 +86,20 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti
8286
socket = createSocket(options);
8387
socket.connect(new InetSocketAddress(host, port), (int) timeout);
8488
}
89+
if (options.getSocketReadTimeoutMillis() > 0) {
90+
socket.setSoTimeout(options.getSocketReadTimeoutMillis());
91+
}
8592

86-
if (soLinger > -1) {
93+
if (soLinger > 0) {
8794
socket.setSoLinger(true, soLinger);
8895
}
89-
if (options.getSocketReadTimeoutMillis() > 0) {
90-
socket.setSoTimeout(options.getSocketReadTimeoutMillis());
96+
97+
if (receiveBufferSize > 0) {
98+
socket.setReceiveBufferSize(receiveBufferSize);
99+
}
100+
101+
if (sendBufferSize > 0) {
102+
socket.setSendBufferSize(sendBufferSize);
91103
}
92104

93105
if (isWebsocketScheme(nuri.getScheme())) {

0 commit comments

Comments
 (0)