Skip to content

Commit 2e0a9f8

Browse files
authored
Merge branch 'master' into vi_writer_stats
2 parents b6eb93c + 031ff0f commit 2e0a9f8

25 files changed

+659
-37
lines changed

core/src/main/java/io/questdb/MessageBus.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.questdb.tasks.ColumnIndexerTask;
4141
import io.questdb.tasks.ColumnPurgeTask;
4242
import io.questdb.tasks.ColumnTask;
43+
import io.questdb.tasks.GroupByLongTopKTask;
4344
import io.questdb.tasks.GroupByMergeShardTask;
4445
import io.questdb.tasks.LatestByTask;
4546
import io.questdb.tasks.O3CopyTask;
@@ -88,6 +89,12 @@ public interface MessageBus extends Closeable {
8889

8990
MCSequence getCopyImportSubSeq();
9091

92+
MPSequence getGroupByLongTopKPubSeq();
93+
94+
RingQueue<GroupByLongTopKTask> getGroupByLongTopKQueue();
95+
96+
MCSequence getGroupByLongTopKSubSeq();
97+
9198
MPSequence getGroupByMergeShardPubSeq();
9299

93100
RingQueue<GroupByMergeShardTask> getGroupByMergeShardQueue();

core/src/main/java/io/questdb/MessageBusImpl.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.questdb.tasks.ColumnIndexerTask;
4343
import io.questdb.tasks.ColumnPurgeTask;
4444
import io.questdb.tasks.ColumnTask;
45+
import io.questdb.tasks.GroupByLongTopKTask;
4546
import io.questdb.tasks.GroupByMergeShardTask;
4647
import io.questdb.tasks.LatestByTask;
4748
import io.questdb.tasks.O3CopyTask;
@@ -72,6 +73,9 @@ public class MessageBusImpl implements MessageBus {
7273
private final RingQueue<CopyImportRequestTask> copyImportRequestQueue;
7374
private final SCSequence copyImportRequestSubSeq;
7475
private final MCSequence copyImportSubSeq;
76+
private final MPSequence groupByLongTopKPubSeq;
77+
private final RingQueue<GroupByLongTopKTask> groupByLongTopKQueue;
78+
private final MCSequence groupByLongTopKSubSeq;
7579
private final MPSequence groupByMergeShardPubSeq;
7680
private final RingQueue<GroupByMergeShardTask> groupByMergeShardQueue;
7781
private final MCSequence groupByMergeShardSubSeq;
@@ -219,6 +223,11 @@ public MessageBusImpl(@NotNull CairoConfiguration configuration) {
219223
this.groupByMergeShardSubSeq = new MCSequence(groupByMergeShardQueue.getCycle());
220224
groupByMergeShardPubSeq.then(groupByMergeShardSubSeq).then(groupByMergeShardPubSeq);
221225

226+
this.groupByLongTopKQueue = new RingQueue<>(GroupByLongTopKTask::new, configuration.getGroupByTopKQueueCapacity());
227+
this.groupByLongTopKPubSeq = new MPSequence(groupByLongTopKQueue.getCycle());
228+
this.groupByLongTopKSubSeq = new MCSequence(groupByLongTopKQueue.getCycle());
229+
groupByLongTopKPubSeq.then(groupByLongTopKSubSeq).then(groupByLongTopKPubSeq);
230+
222231
this.queryCacheEventPubSeq = new MPSequence(configuration.getQueryCacheEventQueueCapacity());
223232
this.queryCacheEventSubSeq = new MCSequence(configuration.getQueryCacheEventQueueCapacity());
224233
queryCacheEventPubSeq.then(queryCacheEventSubSeq).then(queryCacheEventPubSeq);
@@ -355,6 +364,21 @@ public MCSequence getCopyImportSubSeq() {
355364
return copyImportSubSeq;
356365
}
357366

367+
@Override
368+
public MPSequence getGroupByLongTopKPubSeq() {
369+
return groupByLongTopKPubSeq;
370+
}
371+
372+
@Override
373+
public RingQueue<GroupByLongTopKTask> getGroupByLongTopKQueue() {
374+
return groupByLongTopKQueue;
375+
}
376+
377+
@Override
378+
public MCSequence getGroupByLongTopKSubSeq() {
379+
return groupByLongTopKSubSeq;
380+
}
381+
358382
@Override
359383
public MPSequence getGroupByMergeShardPubSeq() {
360384
return groupByMergeShardPubSeq;

core/src/main/java/io/questdb/PropServerConfiguration.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ public class PropServerConfiguration implements ServerConfiguration {
164164
private final long cairoGroupByPresizeMaxCapacity;
165165
private final long cairoGroupByPresizeMaxHeapSize;
166166
private final int cairoGroupByShardingThreshold;
167+
private final int cairoGroupByTopKQueueCapacity;
168+
private final long cairoGroupByTopKThreshold;
167169
private final int cairoMaxCrashFiles;
168170
private final int cairoPageFrameReduceColumnListCapacity;
169171
private final int cairoPageFrameReduceQueueCapacity;
@@ -1841,10 +1843,12 @@ public PropServerConfiguration(
18411843
this.cairoPageFrameReduceQueueCapacity = Numbers.ceilPow2(getInt(properties, env, PropertyKey.CAIRO_PAGE_FRAME_REDUCE_QUEUE_CAPACITY, defaultReduceQueueCapacity));
18421844
this.cairoGroupByMergeShardQueueCapacity = Numbers.ceilPow2(getInt(properties, env, PropertyKey.CAIRO_SQL_PARALLEL_GROUPBY_MERGE_QUEUE_CAPACITY, defaultReduceQueueCapacity));
18431845
this.vectorAggregateQueueCapacity = Numbers.ceilPow2(getInt(properties, env, PropertyKey.CAIRO_VECTOR_AGGREGATE_QUEUE_CAPACITY, defaultReduceQueueCapacity));
1846+
this.cairoGroupByTopKQueueCapacity = Numbers.ceilPow2(getInt(properties, env, PropertyKey.CAIRO_SQL_PARALLEL_GROUPBY_TOP_K_QUEUE_CAPACITY, defaultReduceQueueCapacity));
18441847
this.cairoGroupByShardingThreshold = getInt(properties, env, PropertyKey.CAIRO_SQL_PARALLEL_GROUPBY_SHARDING_THRESHOLD, 10_000);
18451848
this.cairoGroupByPresizeEnabled = getBoolean(properties, env, PropertyKey.CAIRO_SQL_PARALLEL_GROUPBY_PRESIZE_ENABLED, true);
18461849
this.cairoGroupByPresizeMaxCapacity = getLong(properties, env, PropertyKey.CAIRO_SQL_PARALLEL_GROUPBY_PRESIZE_MAX_CAPACITY, 100_000_000);
18471850
this.cairoGroupByPresizeMaxHeapSize = getLongSize(properties, env, PropertyKey.CAIRO_SQL_PARALLEL_GROUPBY_PRESIZE_MAX_HEAP_SIZE, Numbers.SIZE_1GB);
1851+
this.cairoGroupByTopKThreshold = getLong(properties, env, PropertyKey.CAIRO_SQL_PARALLEL_GROUPBY_TOP_K_THRESHOLD, 5_000_000);
18481852
this.cairoPageFrameReduceRowIdListCapacity = Numbers.ceilPow2(getInt(properties, env, PropertyKey.CAIRO_PAGE_FRAME_ROWID_LIST_CAPACITY, 256));
18491853
this.cairoPageFrameReduceColumnListCapacity = Numbers.ceilPow2(getInt(properties, env, PropertyKey.CAIRO_PAGE_FRAME_COLUMN_LIST_CAPACITY, 16));
18501854
final int defaultReduceShardCount = queryWorkers > 0 ? Math.min(queryWorkers, 4) : 0;
@@ -3113,6 +3117,11 @@ public int getCommitMode() {
31133117
return confRoot;
31143118
}
31153119

3120+
@Override
3121+
public int getCopierType() {
3122+
return copierType;
3123+
}
3124+
31163125
@Override
31173126
public @NotNull LongSupplier getCopyIDSupplier() {
31183127
if (cairoSQLCopyIdSupplier == 0) {
@@ -3266,6 +3275,11 @@ public int getGroupByMergeShardQueueCapacity() {
32663275
return cairoGroupByMergeShardQueueCapacity;
32673276
}
32683277

3278+
@Override
3279+
public long getGroupByParallelTopKThreshold() {
3280+
return cairoGroupByTopKThreshold;
3281+
}
3282+
32693283
@Override
32703284
public int getGroupByPoolCapacity() {
32713285
return sqlGroupByPoolCapacity;
@@ -3286,6 +3300,11 @@ public int getGroupByShardingThreshold() {
32863300
return cairoGroupByShardingThreshold;
32873301
}
32883302

3303+
@Override
3304+
public int getGroupByTopKQueueCapacity() {
3305+
return cairoGroupByTopKQueueCapacity;
3306+
}
3307+
32893308
@Override
32903309
public int getIdGenerateBatchStep() {
32913310
return idGenerateBatchStep;
@@ -4236,11 +4255,6 @@ public boolean isColumnAliasExpressionEnabled() {
42364255
return cairoSqlColumnAliasExpressionEnabled;
42374256
}
42384257

4239-
@Override
4240-
public int getCopierType() {
4241-
return copierType;
4242-
}
4243-
42444258
@Override
42454259
public boolean isCopierChunkedEnabled() {
42464260
return copierChunkedEnabled;

core/src/main/java/io/questdb/PropertyKey.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ public enum PropertyKey implements ConfigPropertyKey {
108108
CAIRO_SQL_PARALLEL_GROUPBY_PRESIZE_ENABLED("cairo.sql.parallel.groupby.presize.enabled"),
109109
CAIRO_SQL_PARALLEL_GROUPBY_PRESIZE_MAX_CAPACITY("cairo.sql.parallel.groupby.presize.max.capacity"),
110110
CAIRO_SQL_PARALLEL_GROUPBY_PRESIZE_MAX_HEAP_SIZE("cairo.sql.parallel.groupby.presize.max.heap.size"),
111+
CAIRO_SQL_PARALLEL_GROUPBY_TOP_K_THRESHOLD("cairo.sql.parallel.groupby.topk.threshold"),
112+
CAIRO_SQL_PARALLEL_GROUPBY_TOP_K_QUEUE_CAPACITY("cairo.sql.parallel.groupby.topk.queue.capacity"),
111113
CAIRO_SQL_PARALLEL_WORK_STEALING_THRESHOLD("cairo.sql.parallel.work.stealing.threshold"),
112114
CAIRO_SQL_PARALLEL_READ_PARQUET_ENABLED("cairo.sql.parallel.read.parquet.enabled"),
113115
CAIRO_SQL_PARQUET_FRAME_CACHE_CAPACITY("cairo.sql.parquet.frame.cache.capacity"),

core/src/main/java/io/questdb/cairo/CairoConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ default Map<String, String> getEnv() {
259259

260260
int getGroupByMergeShardQueueCapacity();
261261

262+
long getGroupByParallelTopKThreshold();
263+
262264
int getGroupByPoolCapacity();
263265

264266
long getGroupByPresizeMaxCapacity();
@@ -267,6 +269,8 @@ default Map<String, String> getEnv() {
267269

268270
int getGroupByShardingThreshold();
269271

272+
int getGroupByTopKQueueCapacity();
273+
270274
@NotNull
271275
default IOURingFacade getIOURingFacade() {
272276
return IOURingFacadeImpl.INSTANCE;

core/src/main/java/io/questdb/cairo/CairoConfigurationWrapper.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,11 @@ public int getGroupByMergeShardQueueCapacity() {
361361
return getDelegate().getGroupByMergeShardQueueCapacity();
362362
}
363363

364+
@Override
365+
public long getGroupByParallelTopKThreshold() {
366+
return getDelegate().getGroupByParallelTopKThreshold();
367+
}
368+
364369
@Override
365370
public int getGroupByPoolCapacity() {
366371
return getDelegate().getGroupByPoolCapacity();
@@ -381,6 +386,11 @@ public int getGroupByShardingThreshold() {
381386
return getDelegate().getGroupByShardingThreshold();
382387
}
383388

389+
@Override
390+
public int getGroupByTopKQueueCapacity() {
391+
return getDelegate().getGroupByTopKQueueCapacity();
392+
}
393+
384394
@Override
385395
public int getIdGenerateBatchStep() {
386396
return getDelegate().getIdGenerateBatchStep();

core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,11 @@ public int getGroupByMergeShardQueueCapacity() {
391391
return 32;
392392
}
393393

394+
@Override
395+
public long getGroupByParallelTopKThreshold() {
396+
return 5_000_000;
397+
}
398+
394399
@Override
395400
public int getGroupByPoolCapacity() {
396401
return 1024;
@@ -411,6 +416,11 @@ public int getGroupByShardingThreshold() {
411416
return 1000;
412417
}
413418

419+
@Override
420+
public int getGroupByTopKQueueCapacity() {
421+
return 32;
422+
}
423+
414424
@Override
415425
public int getIdGenerateBatchStep() {
416426
return 512;

core/src/main/java/io/questdb/cairo/map/OrderedMapFixedSizeCursor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ public void longTopK(DirectLongLongSortedList list, Function recordFunction) {
9898
}
9999
}
100100

101+
@Override
102+
public long preComputedStateSize() {
103+
// no pre-calculated state
104+
return 0;
105+
}
106+
101107
@Override
102108
public void recordAt(Record record, long atRowId) {
103109
((OrderedMapFixedSizeRecord) record).of(atRowId);
@@ -108,12 +114,6 @@ public long size() {
108114
return size;
109115
}
110116

111-
@Override
112-
public long preComputedStateSize() {
113-
// no pre-calculated state
114-
return 0;
115-
}
116-
117117
@Override
118118
public void toTop() {
119119
heapAddr = heapStart;

core/src/main/java/io/questdb/cutlass/http/HttpServerConfigurationWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public ObjHashSet<String> getContextPathSettings() {
110110
return getDelegate().getContextPathSettings();
111111
}
112112

113+
@Override
114+
public ObjHashSet<String> getContextPathSqlValidation() {
115+
return getDelegate().getContextPathSqlValidation();
116+
}
117+
113118
@Override
114119
public ObjHashSet<String> getContextPathTableStatus() {
115120
return getDelegate().getContextPathTableStatus();

core/src/main/java/io/questdb/griffin/engine/functions/GroupByFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,12 +265,12 @@ default void setByte(MapValue mapValue, byte value) {
265265
throw new UnsupportedOperationException();
266266
}
267267

268-
// to be used when doing interpolation
268+
// TODO(RaphDal): to be used when doing interpolation
269269
default void setDecimal128(MapValue mapValue, Decimal128 value) {
270270
throw new UnsupportedOperationException();
271271
}
272272

273-
// to be used when doing interpolation
273+
// TODO(RaphDal): to be used when doing interpolation
274274
default void setDecimal256(MapValue mapValue, Decimal256 value) {
275275
throw new UnsupportedOperationException();
276276
}

0 commit comments

Comments
 (0)