Skip to content

Commit bf956c6

Browse files
authored
feat(sql): use vectorized computation in more non-keyed GROUP BY queries (#6805)
1 parent 8185fe0 commit bf956c6

File tree

104 files changed

+3002
-746
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

104 files changed

+3002
-746
lines changed

core/src/main/java/io/questdb/griffin/engine/functions/GroupByFunction.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,22 +53,27 @@ default void clear() {
5353
* {@link io.questdb.griffin.engine.groupby.GroupByColumnSink}, exposing the values in native
5454
* memory starting at {@code ptr}. Each entry has the fixed size implied by the function's
5555
* argument type. Implementations can use vectorised routines to consume the {@code count}
56-
* consecutive values and must write the resulting aggregate into {@code mapValue}.
56+
* consecutive values and must accumulate the result into {@code mapValue}.
57+
* <p>
58+
* This method may be called multiple times for the same group {@link MapValue} (e.g. once
59+
* per page frame). Implementations must accumulate into the existing state set by
60+
* {@link #setEmpty(MapValue)}, not overwrite it.
5761
* <p>
5862
* This method:
5963
* <ul>
60-
* <li>runs at most once per group {@link MapValue}, immediately after {@link #setEmpty(MapValue)};</li>
6164
* <li>runs without a preceding {@link #computeFirst(MapValue, Record, long)} invocation;</li>
6265
* <li>is not followed by {@link #merge(MapValue, MapValue)};</li>
63-
* <li>always receives a non-zero {@code ptr} pointing to readable memory;</li>
6466
* <li>is used only when {@link #supportsBatchComputation()} returns {@code true}.</li>
6567
* </ul>
6668
*
67-
* @param mapValue group state that must be updated with the aggregated result
68-
* @param ptr native memory address of the first buffered value for the group
69-
* @param count number of buffered values that can be read starting from {@code ptr}
69+
* @param mapValue group state that must be updated with the aggregated result
70+
* @param ptr native memory address of the first buffered value for the group, or 0 for
71+
* no-arg functions (e.g. count(*))
72+
* @param count number of buffered values that can be read starting from {@code ptr}
73+
* @param startRowId row id of the first record in the batch; the row id of the i-th
74+
* record is {@code startRowId + i}
7075
*/
71-
default void computeBatch(MapValue mapValue, long ptr, int count) {
76+
default void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
7277
throw new UnsupportedOperationException();
7378
}
7479

@@ -146,7 +151,7 @@ default Function getComputeBatchArg() {
146151
* the argument function is LONG, but the aggregate function's argument type is
147152
* DOUBLE. This means that the input values need to be materialized in
148153
* an intermediate buffer via getDouble calls before to calling
149-
* {@link #computeBatch(MapValue, long, int)}.
154+
* {@link #computeBatch(MapValue, long, int, long)}.
150155
*
151156
* @return the column type of the batch argument
152157
*/
@@ -332,7 +337,7 @@ default void setShort(MapValue mapValue, short value) {
332337
}
333338

334339
/**
335-
* Indicates whether {@link #computeBatch(MapValue, long, int)}, {@link #getComputeBatchArg()},
340+
* Indicates whether {@link #computeBatch(MapValue, long, int, long)}, {@link #getComputeBatchArg()},
336341
* and {@link #getComputeBatchArgType()} are implemented for this function. When {@code true},
337342
* the engine may materialise the argument column into native memory buffers and invoke
338343
* {@code computeBatch} instead of per-row aggregation for compatible execution paths.

core/src/main/java/io/questdb/griffin/engine/functions/groupby/AvgDoubleGroupByFunction.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import io.questdb.griffin.engine.functions.DoubleFunction;
3333
import io.questdb.griffin.engine.functions.GroupByFunction;
3434
import io.questdb.griffin.engine.functions.UnaryFunction;
35-
import io.questdb.std.Numbers;
3635
import io.questdb.std.Vect;
3736
import org.jetbrains.annotations.NotNull;
3837

@@ -45,21 +44,31 @@ public AvgDoubleGroupByFunction(@NotNull Function arg) {
4544
}
4645

4746
@Override
48-
public void computeBatch(MapValue mapValue, long ptr, int count) {
47+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4948
if (count > 0) {
5049
final long countPtr = mapValue.getAddress(valueIndex + 1);
51-
final double sum = Vect.sumDoubleAcc(ptr, count, countPtr);
52-
if (!Numbers.isNull(sum)) {
53-
mapValue.putDouble(valueIndex, sum);
54-
// the count is already updated by the sumDoubleAcc call, so we don't need to write it here
50+
final long prevCount = mapValue.getLong(valueIndex + 1);
51+
final double batchSum = Vect.sumDoubleAcc(ptr, count, countPtr);
52+
// sumDoubleAcc overwrites *countPtr with the batch count
53+
final long batchCount = mapValue.getLong(valueIndex + 1);
54+
if (batchCount > 0) {
55+
final double prevSum = mapValue.getDouble(valueIndex);
56+
if (prevCount > 0) {
57+
mapValue.putDouble(valueIndex, prevSum + batchSum);
58+
} else {
59+
mapValue.putDouble(valueIndex, batchSum);
60+
}
61+
mapValue.putLong(valueIndex + 1, prevCount + batchCount);
62+
} else {
63+
mapValue.putLong(valueIndex + 1, prevCount);
5564
}
5665
}
5766
}
5867

5968
@Override
6069
public void computeFirst(MapValue mapValue, Record record, long rowId) {
6170
final double d = arg.getDouble(record);
62-
if (Numbers.isFinite(d)) {
71+
if (!Double.isNaN(d)) {
6372
mapValue.putDouble(valueIndex, d);
6473
mapValue.putLong(valueIndex + 1, 1L);
6574
} else {
@@ -71,7 +80,7 @@ public void computeFirst(MapValue mapValue, Record record, long rowId) {
7180
@Override
7281
public void computeNext(MapValue mapValue, Record record, long rowId) {
7382
final double d = arg.getDouble(record);
74-
if (Numbers.isFinite(d)) {
83+
if (!Double.isNaN(d)) {
7584
mapValue.addDouble(valueIndex, d);
7685
mapValue.addLong(valueIndex + 1, 1L);
7786
}

core/src/main/java/io/questdb/griffin/engine/functions/groupby/AvgShortGroupByFunction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,16 @@ public AvgShortGroupByFunction(@NotNull Function arg) {
4545
}
4646

4747
@Override
48-
public void computeBatch(MapValue mapValue, long ptr, int count) {
48+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4949
if (count > 0) {
50-
mapValue.putLong(valueIndex, Vect.sumShort(ptr, count));
51-
mapValue.putLong(valueIndex + 1, count);
50+
final long batchSum = Vect.sumShort(ptr, count);
51+
final long existing = mapValue.getLong(valueIndex);
52+
if (existing == Numbers.LONG_NULL) {
53+
mapValue.putLong(valueIndex, batchSum);
54+
} else {
55+
mapValue.putLong(valueIndex, existing + batchSum);
56+
}
57+
mapValue.addLong(valueIndex + 1, count);
5258
}
5359
}
5460

core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountDoubleGroupByFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ public CountDoubleGroupByFunction(@NotNull Function arg) {
3939
}
4040

4141
@Override
42-
public void computeBatch(MapValue mapValue, long ptr, int count) {
42+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4343
if (count > 0) {
4444
final long nonNullCount = Vect.countDouble(ptr, count);
4545
if (nonNullCount > 0) {
46-
mapValue.putLong(valueIndex, nonNullCount);
46+
mapValue.addLong(valueIndex, nonNullCount);
4747
}
4848
}
4949
}

core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountFloatGroupByFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public CountFloatGroupByFunction(@NotNull Function arg) {
3838
}
3939

4040
@Override
41-
public void computeBatch(MapValue mapValue, long ptr, int count) {
41+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4242
if (count > 0) {
4343
long nonNullCount = 0;
4444
final long hi = ptr + count * (long) Float.BYTES;
@@ -49,7 +49,7 @@ public void computeBatch(MapValue mapValue, long ptr, int count) {
4949
}
5050
}
5151
if (nonNullCount > 0) {
52-
mapValue.putLong(valueIndex, nonNullCount);
52+
mapValue.addLong(valueIndex, nonNullCount);
5353
}
5454
}
5555
}

core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountGeoHashGroupByFunctionByte.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public CountGeoHashGroupByFunctionByte(@NotNull Function arg) {
3838
}
3939

4040
@Override
41-
public void computeBatch(MapValue mapValue, long ptr, int count) {
41+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4242
if (count > 0) {
4343
long nonNullCount = 0;
4444
final long hi = ptr + count;
@@ -47,7 +47,7 @@ public void computeBatch(MapValue mapValue, long ptr, int count) {
4747
nonNullCount++;
4848
}
4949
}
50-
mapValue.putLong(valueIndex, nonNullCount);
50+
mapValue.addLong(valueIndex, nonNullCount);
5151
}
5252
}
5353

core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountGeoHashGroupByFunctionInt.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public CountGeoHashGroupByFunctionInt(@NotNull Function arg) {
3939
}
4040

4141
@Override
42-
public void computeBatch(MapValue mapValue, long ptr, int count) {
42+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4343
if (count > 0) {
4444
long nonNullCount = 0;
4545
final long hi = ptr + count * 4L;
@@ -48,7 +48,7 @@ public void computeBatch(MapValue mapValue, long ptr, int count) {
4848
nonNullCount++;
4949
}
5050
}
51-
mapValue.putLong(valueIndex, nonNullCount);
51+
mapValue.addLong(valueIndex, nonNullCount);
5252
}
5353
}
5454

core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountGeoHashGroupByFunctionLong.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public CountGeoHashGroupByFunctionLong(@NotNull Function arg) {
3939
}
4040

4141
@Override
42-
public void computeBatch(MapValue mapValue, long ptr, int count) {
42+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4343
if (count > 0) {
4444
long nonNullCount = 0;
4545
final long hi = ptr + count * 8L;
@@ -48,7 +48,7 @@ public void computeBatch(MapValue mapValue, long ptr, int count) {
4848
nonNullCount++;
4949
}
5050
}
51-
mapValue.putLong(valueIndex, nonNullCount);
51+
mapValue.addLong(valueIndex, nonNullCount);
5252
}
5353
}
5454

core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountGeoHashGroupByFunctionShort.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public CountGeoHashGroupByFunctionShort(@NotNull Function arg) {
3939
}
4040

4141
@Override
42-
public void computeBatch(MapValue mapValue, long ptr, int count) {
42+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4343
if (count > 0) {
4444
long nonNullCount = 0;
4545
final long hi = ptr + count * 2L;
@@ -48,7 +48,7 @@ public void computeBatch(MapValue mapValue, long ptr, int count) {
4848
nonNullCount++;
4949
}
5050
}
51-
mapValue.putLong(valueIndex, nonNullCount);
51+
mapValue.addLong(valueIndex, nonNullCount);
5252
}
5353
}
5454

core/src/main/java/io/questdb/griffin/engine/functions/groupby/CountIPv4GroupByFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public CountIPv4GroupByFunction(@NotNull Function arg) {
3939
}
4040

4141
@Override
42-
public void computeBatch(MapValue mapValue, long ptr, int count) {
42+
public void computeBatch(MapValue mapValue, long ptr, int count, long startRowId) {
4343
if (count > 0) {
4444
long nonNullCount = 0;
4545
final long hi = ptr + count * (long) Integer.BYTES;

0 commit comments

Comments
 (0)