Skip to content

Commit 9484d00

Browse files
authored
chore(core): move context initialization out of connection-accepting thread (#5934)
1 parent c06f408 commit 9484d00

33 files changed

Lines changed: 466 additions & 283 deletions

core/src/main/java/io/questdb/PropServerConfiguration.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.questdb.cairo.SqlJitMode;
3535
import io.questdb.cairo.TableUtils;
3636
import io.questdb.cairo.sql.SqlExecutionCircuitBreakerConfiguration;
37+
import io.questdb.cutlass.auth.AuthUtils;
3738
import io.questdb.cutlass.http.HttpContextConfiguration;
3839
import io.questdb.cutlass.http.HttpFullFatServerConfiguration;
3940
import io.questdb.cutlass.http.HttpServerConfiguration;
@@ -139,6 +140,7 @@ public class PropServerConfiguration implements ServerConfiguration {
139140
public static final long COMMIT_INTERVAL_DEFAULT = 2000;
140141
public static final String CONFIG_DIRECTORY = "conf";
141142
public static final String DB_DIRECTORY = "db";
143+
public static final int MIN_TCP_ILP_BUF_SIZE = AuthUtils.CHALLENGE_LEN + 1;
142144
public static final String TMP_DIRECTORY = "tmp";
143145
private static final String ILP_PROTO_SUPPORT_VERSIONS = "[1,2]";
144146
private static final String ILP_PROTO_SUPPORT_VERSIONS_NAME = "line.proto.support.versions";
@@ -223,6 +225,7 @@ public class PropServerConfiguration implements ServerConfiguration {
223225
private final PropHttpConcurrentCacheConfiguration httpMinConcurrentCacheConfiguration = new PropHttpConcurrentCacheConfiguration();
224226
private final PropHttpContextConfiguration httpMinContextConfiguration;
225227
private final boolean httpMinServerEnabled;
228+
private final long httpNetAcceptLoopTimeout;
226229
private final boolean httpNetConnectionHint;
227230
private final String httpPassword;
228231
private final boolean httpPessimisticHealthCheckEnabled;
@@ -535,6 +538,7 @@ public class PropServerConfiguration implements ServerConfiguration {
535538
private short floatDefaultColumnType;
536539
private int httpMinBindIPv4Address;
537540
private int httpMinBindPort;
541+
private long httpMinNetAcceptLoopTimeout;
538542
private boolean httpMinNetConnectionHint;
539543
private int httpMinNetConnectionLimit;
540544
private long httpMinNetConnectionQueueTimeout;
@@ -576,6 +580,7 @@ public class PropServerConfiguration implements ServerConfiguration {
576580
private long lineTcpMaintenanceInterval;
577581
private int lineTcpMaxMeasurementSize;
578582
private long lineTcpMaxRecvBufferSize;
583+
private long lineTcpNetAcceptLoopTimeout;
579584
private int lineTcpNetBindIPv4Address;
580585
private int lineTcpNetBindPort;
581586
private long lineTcpNetConnectionHeartbeatInterval;
@@ -616,6 +621,7 @@ public class PropServerConfiguration implements ServerConfiguration {
616621
private int pgNamedStatementCacheCapacity;
617622
private int pgNamedStatementLimit;
618623
private int pgNamesStatementPoolCapacity;
624+
private long pgNetAcceptLoopTimeout;
619625
private int pgNetBindIPv4Address;
620626
private int pgNetBindPort;
621627
private boolean pgNetConnectionHint;
@@ -942,6 +948,7 @@ public PropServerConfiguration(
942948
httpMinBindPort = p;
943949
});
944950

951+
this.httpMinNetAcceptLoopTimeout = getMillis(properties, env, PropertyKey.HTTP_MIN_NET_ACCEPT_LOOP_TIMEOUT, 500);
945952
this.httpMinNetConnectionLimit = getInt(properties, env, PropertyKey.HTTP_MIN_NET_CONNECTION_LIMIT, 64);
946953

947954
// deprecated
@@ -1062,6 +1069,8 @@ public PropServerConfiguration(
10621069
boolean httpServerCookiesEnabled = getBoolean(properties, env, PropertyKey.HTTP_SERVER_KEEP_ALIVE, true);
10631070
boolean httpReadOnlySecurityContext = getBoolean(properties, env, PropertyKey.HTTP_SECURITY_READONLY, false);
10641071

1072+
this.httpNetAcceptLoopTimeout = getMillis(properties, env, PropertyKey.HTTP_NET_ACCEPT_LOOP_TIMEOUT, 500);
1073+
10651074
// maintain deprecated property name for the time being
10661075
this.httpNetConnectionLimit = getInt(properties, env, PropertyKey.HTTP_NET_ACTIVE_CONNECTION_LIMIT, 256);
10671076
this.httpNetConnectionLimit = getInt(properties, env, PropertyKey.HTTP_NET_CONNECTION_LIMIT, httpNetConnectionLimit);
@@ -1227,6 +1236,8 @@ public PropServerConfiguration(
12271236
pgNetBindPort = p;
12281237
});
12291238

1239+
this.pgNetAcceptLoopTimeout = getMillis(properties, env, PropertyKey.PG_NET_ACCEPT_LOOP_TIMEOUT, 500);
1240+
12301241
// deprecated
12311242
this.pgNetIdleConnectionTimeout = getMillis(properties, env, PropertyKey.PG_NET_IDLE_TIMEOUT, 300_000);
12321243
this.pgNetIdleConnectionTimeout = getMillis(properties, env, PropertyKey.PG_NET_CONNECTION_TIMEOUT, this.pgNetIdleConnectionTimeout);
@@ -1502,7 +1513,7 @@ public PropServerConfiguration(
15021513
this.telemetryDisableCompletely = getBoolean(properties, env, PropertyKey.TELEMETRY_DISABLE_COMPLETELY, false);
15031514
this.telemetryQueueCapacity = Numbers.ceilPow2(getInt(properties, env, PropertyKey.TELEMETRY_QUEUE_CAPACITY, 512));
15041515
this.telemetryHideTables = getBoolean(properties, env, PropertyKey.TELEMETRY_HIDE_TABLES, true);
1505-
this.telemetryDbSizeEstimateTimeout = getLong(properties, env, PropertyKey.TELEMETRY_DB_SIZE_ESTIMATE_TIMEOUT, Timestamps.SECOND_MILLIS);
1516+
this.telemetryDbSizeEstimateTimeout = getMillis(properties, env, PropertyKey.TELEMETRY_DB_SIZE_ESTIMATE_TIMEOUT, Timestamps.SECOND_MILLIS);
15061517
this.o3PartitionPurgeListCapacity = getInt(properties, env, PropertyKey.CAIRO_O3_PARTITION_PURGE_LIST_INITIAL_CAPACITY, 1);
15071518
this.ioURingEnabled = getBoolean(properties, env, PropertyKey.CAIRO_IO_URING_ENABLED, true);
15081519
this.cairoMaxCrashFiles = getInt(properties, env, PropertyKey.CAIRO_MAX_CRASH_FILES, 100);
@@ -1546,6 +1557,8 @@ public PropServerConfiguration(
15461557
lineTcpNetBindPort = p;
15471558
});
15481559

1560+
this.lineTcpNetAcceptLoopTimeout = getMillis(properties, env, PropertyKey.LINE_TCP_NET_ACCEPT_LOOP_TIMEOUT, 500);
1561+
15491562
// deprecated
15501563
this.lineTcpNetConnectionTimeout = getMillis(properties, env, PropertyKey.LINE_TCP_NET_IDLE_TIMEOUT, 0);
15511564
this.lineTcpNetConnectionTimeout = getMillis(properties, env, PropertyKey.LINE_TCP_NET_CONNECTION_TIMEOUT, this.lineTcpNetConnectionTimeout);
@@ -1570,6 +1583,11 @@ public PropServerConfiguration(
15701583
if (lineTcpRecvBufferSize > lineTcpMaxRecvBufferSize) {
15711584
lineTcpMaxRecvBufferSize = lineTcpRecvBufferSize;
15721585
}
1586+
if (lineTcpRecvBufferSize < MIN_TCP_ILP_BUF_SIZE) {
1587+
throw new ServerConfigurationException(
1588+
"TCP ILP buffer size is too small, should be at least " + MIN_TCP_ILP_BUF_SIZE + ", ["
1589+
+ PropertyKey.LINE_TCP_RECV_BUFFER_SIZE.getPropertyPath() + "=" + lineTcpRecvBufferSize + ']');
1590+
}
15731591

15741592
this.lineTcpWriterQueueCapacity = getQueueCapacity(properties, env, PropertyKey.LINE_TCP_WRITER_QUEUE_CAPACITY, 128);
15751593
this.lineTcpWriterWorkerCount = getInt(properties, env, PropertyKey.LINE_TCP_WRITER_WORKER_COUNT, 0);
@@ -3965,6 +3983,12 @@ public int getRows() {
39653983
}
39663984

39673985
public class PropHttpMinServerConfiguration implements HttpServerConfiguration {
3986+
3987+
@Override
3988+
public long getAcceptLoopTimeout() {
3989+
return httpMinNetAcceptLoopTimeout;
3990+
}
3991+
39683992
@Override
39693993
public int getBindIPv4Address() {
39703994
return httpMinBindIPv4Address;
@@ -4153,6 +4177,11 @@ public int workerPoolPriority() {
41534177

41544178
public class PropHttpServerConfiguration implements HttpFullFatServerConfiguration {
41554179

4180+
@Override
4181+
public long getAcceptLoopTimeout() {
4182+
return httpNetAcceptLoopTimeout;
4183+
}
4184+
41564185
@Override
41574186
public int getBindIPv4Address() {
41584187
return httpNetBindIPv4Address;
@@ -4579,6 +4608,12 @@ public boolean haltOnError() {
45794608
}
45804609

45814610
private class PropLineTcpReceiverConfiguration implements LineTcpReceiverConfiguration {
4611+
4612+
@Override
4613+
public long getAcceptLoopTimeout() {
4614+
return lineTcpNetAcceptLoopTimeout;
4615+
}
4616+
45824617
@Override
45834618
public String getAuthDB() {
45844619
return lineTcpAuthDB;
@@ -5077,6 +5112,11 @@ public int getRows() {
50775112

50785113
private class PropPGWireConfiguration implements PGWireConfiguration {
50795114

5115+
@Override
5116+
public long getAcceptLoopTimeout() {
5117+
return pgNetAcceptLoopTimeout;
5118+
}
5119+
50805120
@Override
50815121
public int getBinParamCountCapacity() {
50825122
return pgBinaryParamsCapacity;

core/src/main/java/io/questdb/PropertyKey.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,12 +301,14 @@ public enum PropertyKey implements ConfigPropertyKey {
301301
HTTP_MIN_WORKER_COUNT("http.min.worker.count"),
302302
HTTP_MIN_WORKER_POOL_PRIORITY("http.min.worker.priority"),
303303
HTTP_MIN_NET_CONNECTION_LIMIT("http.min.net.connection.limit"),
304+
HTTP_MIN_NET_ACCEPT_LOOP_TIMEOUT("http.min.net.accept.loop.timeout"),
304305
HTTP_NET_BIND_TO("http.net.bind.to"),
305306
HTTP_CONNECTION_POOL_INITIAL_CAPACITY("http.connection.pool.initial.capacity"),
306307
HTTP_CONNECTION_STRING_POOL_CAPACITY("http.connection.string.pool.capacity"),
307308
HTTP_WORKER_COUNT("http.worker.count"),
308309
HTTP_KEEP_ALIVE_TIMEOUT("http.keep-alive.timeout"),
309310
HTTP_KEEP_ALIVE_MAX("http.keep-alive.max"),
311+
HTTP_NET_ACCEPT_LOOP_TIMEOUT("http.net.accept.loop.timeout"),
310312
HTTP_NET_ACTIVE_CONNECTION_LIMIT("http.net.active.connection.limit"),
311313
HTTP_NET_CONNECTION_LIMIT("http.net.connection.limit"),
312314
HTTP_TEXT_DATE_ADAPTER_POOL_CAPACITY("http.text.date.adapter.pool.capacity"),
@@ -344,6 +346,7 @@ public enum PropertyKey implements ConfigPropertyKey {
344346
LINE_TCP_NET_CONNECTION_LIMIT("line.tcp.net.connection.limit"),
345347
LINE_TCP_NET_CONNECTION_HINT("line.tcp.net.connection.hint"),
346348
LINE_TCP_NET_BIND_TO("line.tcp.net.bind.to"),
349+
LINE_TCP_NET_ACCEPT_LOOP_TIMEOUT("line.tcp.net.accept.loop.timeout"),
347350
LINE_TCP_NET_IDLE_TIMEOUT("line.tcp.net.idle.timeout"),
348351
LINE_TCP_NET_CONNECTION_TIMEOUT("line.tcp.net.connection.timeout"),
349352
LINE_TCP_NET_CONNECTION_HEARTBEAT_INTERVAL("line.tcp.net.connection.heartbeat.interval"),
@@ -393,6 +396,7 @@ public enum PropertyKey implements ConfigPropertyKey {
393396
PG_ENABLED("pg.enabled"),
394397
PG_NET_CONNECTION_HINT("pg.net.connection.hint"),
395398
PG_NET_BIND_TO("pg.net.bind.to"),
399+
PG_NET_ACCEPT_LOOP_TIMEOUT("pg.net.accept.loop.timeout"),
396400
PG_NET_IDLE_TIMEOUT("pg.net.idle.timeout"),
397401
PG_NET_CONNECTION_TIMEOUT("pg.net.connection.timeout"),
398402
PG_NET_CONNECTION_QUEUE_TIMEOUT("pg.net.connection.queue.timeout"),

core/src/main/java/io/questdb/client/Sender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ final class LineSenderBuilder {
433433
private static final long DEFAULT_MAX_RETRY_NANOS = TimeUnit.SECONDS.toNanos(10); // keep sync with the contract of the configuration method
434434
private static final long DEFAULT_MIN_REQUEST_THROUGHPUT = 100 * 1024; // 100KB/s, keep in sync with the contract of the configuration method
435435
private static final int DEFAULT_TCP_PORT = 9009;
436-
private static final int MIN_BUFFER_SIZE = 512 + 1; // challenge size + 1;
436+
private static final int MIN_BUFFER_SIZE = AuthUtils.CHALLENGE_LEN + 1; // challenge size + 1;
437437
// The PARAMETER_NOT_SET_EXPLICITLY constant is used to detect if a parameter was set explicitly in configuration parameters
438438
// where it matters. This is needed to detect invalid combinations of parameters. Why?
439439
// We want to fail-fast even when an explicitly configured options happens to be same value as the default value,

core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import io.questdb.metrics.AtomicLongGauge;
4242
import io.questdb.network.HeartBeatException;
4343
import io.questdb.network.IOContext;
44-
import io.questdb.network.IODispatcher;
4544
import io.questdb.network.IOOperation;
4645
import io.questdb.network.Net;
4746
import io.questdb.network.NetworkFacade;
@@ -53,6 +52,7 @@
5352
import io.questdb.network.Socket;
5453
import io.questdb.network.SocketFactory;
5554
import io.questdb.network.SuspendEvent;
55+
import io.questdb.network.TlsSessionInitFailedException;
5656
import io.questdb.std.AssociativeCache;
5757
import io.questdb.std.MemoryTag;
5858
import io.questdb.std.Misc;
@@ -64,7 +64,6 @@
6464
import io.questdb.std.str.DirectUtf8String;
6565
import io.questdb.std.str.StdoutSink;
6666
import io.questdb.std.str.Utf8s;
67-
import org.jetbrains.annotations.NotNull;
6867
import org.jetbrains.annotations.TestOnly;
6968

7069
import static io.questdb.cutlass.http.HttpConstants.HEADER_CONTENT_ACCEPT_ENCODING;
@@ -330,38 +329,11 @@ public boolean handleClientOperation(int operation, HttpRequestProcessorSelector
330329
return useful;
331330
}
332331

333-
@Override
334-
public void init() {
335-
if (socket.supportsTls()) {
336-
if (socket.startTlsSession(null) != 0) {
337-
throw CairoException.nonCritical().put("failed to start TLS session");
338-
}
339-
}
340-
connectionCounted = false;
341-
}
342-
343332
@Override
344333
public boolean invalid() {
345334
return pendingRetry || receivedBytes > 0 || this.socket == null;
346335
}
347336

348-
@Override
349-
public HttpConnectionContext of(long fd, @NotNull IODispatcher<HttpConnectionContext> dispatcher) {
350-
super.of(fd, dispatcher);
351-
// The context is obtained from the pool, so we should initialize the memory.
352-
if (recvBuffer == 0) {
353-
// re-read recv buffer size in case the config was reloaded
354-
recvBufferSize = configuration.getRecvBufferSize();
355-
recvBufferReadSize = Math.min(forceFragmentationReceiveChunkSize, recvBufferSize);
356-
recvBuffer = Unsafe.malloc(recvBufferSize, MemoryTag.NATIVE_HTTP_CONN);
357-
}
358-
// re-read buffer sizes in case the config was reloaded
359-
responseSink.of(socket, configuration.getSendBufferSize());
360-
headerParser.reopen(configuration.getHttpContextConfiguration().getRequestHeaderBufferSize());
361-
multipartContentHeaderParser.reopen(configuration.getHttpContextConfiguration().getMultipartHeaderBufferSize());
362-
return this;
363-
}
364-
365337
public void reset() {
366338
LOG.debug().$("reset [fd=").$(getFd()).$(']').$();
367339
this.totalBytesSent += responseSink.getTotalBytesSent();
@@ -1034,4 +1006,24 @@ private void shiftReceiveBufferUnprocessedBytes(long start, int receivedBytes) {
10341006
Vect.memmove(recvBuffer, start, receivedBytes);
10351007
LOG.debug().$("peer is slow, waiting for bigger part to parse [multipart]").$();
10361008
}
1009+
1010+
@Override
1011+
protected void doInit() throws TlsSessionInitFailedException {
1012+
// the context is obtained from the pool, so we should initialize the memory
1013+
if (recvBuffer == 0) {
1014+
// re-read recv buffer size in case the config was reloaded
1015+
recvBufferSize = configuration.getRecvBufferSize();
1016+
recvBufferReadSize = Math.min(forceFragmentationReceiveChunkSize, recvBufferSize);
1017+
recvBuffer = Unsafe.malloc(recvBufferSize, MemoryTag.NATIVE_HTTP_CONN);
1018+
}
1019+
// re-read buffer sizes in case the config was reloaded
1020+
responseSink.of(socket, configuration.getSendBufferSize());
1021+
headerParser.reopen(configuration.getHttpContextConfiguration().getRequestHeaderBufferSize());
1022+
multipartContentHeaderParser.reopen(configuration.getHttpContextConfiguration().getMultipartHeaderBufferSize());
1023+
1024+
if (socket.supportsTls()) {
1025+
socket.startTlsSession(null);
1026+
}
1027+
connectionCounted = false;
1028+
}
10371029
}

core/src/main/java/io/questdb/cutlass/http/HttpMinServerConfigurationWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public HttpMinServerConfigurationWrapper(Metrics metrics) {
4545
delegate.set(null);
4646
}
4747

48+
@Override
49+
public long getAcceptLoopTimeout() {
50+
return getDelegate().getAcceptLoopTimeout();
51+
}
52+
4853
@Override
4954
public int getBindIPv4Address() {
5055
return getDelegate().getBindIPv4Address();

core/src/main/java/io/questdb/cutlass/http/HttpServerConfigurationWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public HttpServerConfigurationWrapper(Metrics metrics) {
5050
delegate.set(null);
5151
}
5252

53+
@Override
54+
public long getAcceptLoopTimeout() {
55+
return getDelegate().getAcceptLoopTimeout();
56+
}
57+
5358
@Override
5459
public int getBindIPv4Address() {
5560
return getDelegate().getBindIPv4Address();

core/src/main/java/io/questdb/cutlass/http/client/HttpClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.questdb.network.NetworkFacade;
3535
import io.questdb.network.Socket;
3636
import io.questdb.network.SocketFactory;
37+
import io.questdb.network.TlsSessionInitFailedException;
3738
import io.questdb.std.BinarySequence;
3839
import io.questdb.std.Chars;
3940
import io.questdb.std.MemoryTag;
@@ -609,10 +610,15 @@ private void connect(CharSequence host, int port) {
609610
}
610611

611612
if (socket.supportsTls()) {
612-
if (socket.startTlsSession(host) < 0) {
613+
try {
614+
socket.startTlsSession(host);
615+
} catch (TlsSessionInitFailedException e) {
613616
int errno = nf.errno();
614617
disconnect();
615-
throw new HttpClientException("could not start TLS session [fd=").put(fd).put(", errno=").put(errno).put(']');
618+
throw new HttpClientException("could not start TLS session [fd=").put(fd)
619+
.put(", error=").put(e.getFlyweightMessage())
620+
.put(", errno=").put(errno)
621+
.put(']');
616622
}
617623
}
618624
setupIoWait();

0 commit comments

Comments
 (0)