perf(sql): parallel group by with optional filtering #4032

Merged: ideoma merged 96 commits into master from puzpuzpuz_parallel_group_by on Dec 23, 2023
puzpuzpuz (Contributor) commented Dec 4, 2023

Currently, GROUP BY queries run in parallel only in a few cases:

  • Non-keyed GROUP BY with basic aggregate functions, e.g. select sum(value) from t.
  • Single INT or SYMBOL-keyed GROUP BY with basic aggregate functions, e.g. select int_key, sum(value) from t.

This patch extends the set of cases that use multi-threaded GROUP BY execution. The implementation builds on top of parallel SQL filters (a.k.a. async offload), so the same scheduling and cancellation behavior applies. The work is split into page frame tasks, aggregated by the shared workers, and accumulated in FastMap (keyed GROUP BY) or SimpleMapValue (non-keyed GROUP BY). FastMap/SimpleMapValue instances are reused between different query executions (and different queries).
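To illustrate the split/aggregate/accumulate flow described above, here is a minimal sketch using only the JDK. It is not the QuestDB implementation: `PageFrameGroupBySketch`, `groupBySum`, and the `frameSize`/`workers` parameters are hypothetical names, a plain `HashMap` stands in for FastMap, and each task gets its own map for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration; HashMap stands in for FastMap.
final class PageFrameGroupBySketch {

    // Computes "select key, sum(value) from t" over two parallel column arrays.
    static Map<String, Long> groupBySum(String[] keys, long[] values, int frameSize, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            // Split the row range into fixed-size frames, one task per frame.
            List<Future<Map<String, Long>>> partials = new ArrayList<>();
            for (int lo = 0; lo < keys.length; lo += frameSize) {
                final int from = lo;
                final int to = Math.min(lo + frameSize, keys.length);
                Callable<Map<String, Long>> task = () -> {
                    // Each task aggregates into a private map, so no locking is needed.
                    Map<String, Long> local = new HashMap<>();
                    for (int i = from; i < to; i++) {
                        local.merge(keys[i], values[i], Long::sum);
                    }
                    return local;
                };
                partials.add(pool.submit(task));
            }
            // Merge the partial maps on the calling thread.
            Map<String, Long> result = new HashMap<>();
            for (Future<Map<String, Long>> partial : partials) {
                partial.get().forEach((k, v) -> result.merge(k, v, Long::sum));
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("aggregation failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The final merge here is single-threaded, which is the part that a sharded merge step would parallelize for large maps.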

As an optional second step in the query processing, we merge sharded maps in parallel. Shards contain non-intersecting sets of groups, so once every shard is fully merged, its rows can be returned to the caller without any further de-duplication. This behavior kicks in only when the maps are large enough (cairo.sql.parallel.groupby.sharding.threshold, defaults to 10k).
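The sharded merge can be sketched roughly as follows. This is an assumption-laden illustration rather than the code in this PR: `ShardedMergeSketch`, `shardOf`, `shard`, and `mergeShards` are hypothetical names, keys are assigned to shards by hash code, and a parallel stream stands in for the shared worker pool.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration of merging disjoint shards in parallel.
final class ShardedMergeSketch {

    // The same key always lands in the same shard index, on every worker.
    static int shardOf(String key, int shardCount) {
        return Math.floorMod(key.hashCode(), shardCount);
    }

    // Splits one worker's partial result into shardCount disjoint maps.
    static List<Map<String, Long>> shard(Map<String, Long> partial, int shardCount) {
        List<Map<String, Long>> shards = new ArrayList<>();
        for (int i = 0; i < shardCount; i++) {
            shards.add(new HashMap<>());
        }
        partial.forEach((k, v) -> shards.get(shardOf(k, shardCount)).put(k, v));
        return shards;
    }

    // Merges shard i of every worker into one map; each shard is an independent task.
    static List<Map<String, Long>> mergeShards(List<List<Map<String, Long>>> perWorker, int shardCount) {
        return IntStream.range(0, shardCount).parallel().mapToObj(i -> {
            Map<String, Long> merged = new HashMap<>();
            for (List<Map<String, Long>> workerShards : perWorker) {
                workerShards.get(i).forEach((k, v) -> merged.merge(k, v, Long::sum));
            }
            return merged;
        }).collect(Collectors.toList());
    }
}
```

Because a given key can only appear in one shard index, the merged shards never overlap, and their rows can be concatenated to form the final result.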

The implementation also "steals" the filter from the underlying factory, so all of the following sample queries will be executed by the new parallel GROUP BY framework:

-- column keys are supported
select str_col, sum(long_col) from t;
-- function and operation keys are supported; filters are also supported
select concat(str_col1, str_col2), sum(long_col) from t where long_col > 42;
-- non-keyed GROUP BY is also supported
select vwap(price, quantity) from t where quantity > 42;
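One way to picture the "stolen" filter is as a predicate evaluated inside the aggregation loop of each task, instead of a separate filtering stage that feeds rows into GROUP BY. The sketch below assumes that reading; `FusedFilterSketch` and `sumWhere` are hypothetical names, and a `long[]` stands in for one page frame of `long_col`.

```java
import java.util.function.LongPredicate;

// Hypothetical illustration of filtering and aggregating in a single pass.
final class FusedFilterSketch {

    // Per-frame equivalent of "select sum(long_col) from t where <filter>".
    static long sumWhere(long[] frame, LongPredicate filter) {
        long sum = 0;
        for (long v : frame) {
            // Rows rejected by the filter never reach the aggregate.
            if (filter.test(v)) {
                sum += v;
            }
        }
        return sum;
    }
}
```

For example, `FusedFilterSketch.sumWhere(frame, v -> v > 42)` corresponds to the `where long_col > 42` clause in the second query.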

Currently supported aggregate functions: count(*) and count(col), avg, sum, min/max, vwap (all for fixed-size types).
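These functions share a useful property: their per-worker partial state is small, fixed-size, and can be combined exactly. The sketch below shows that idea for avg and vwap. It is an assumption about why such functions parallelize well, not a copy of the QuestDB classes; `MergeableAggregatesSketch`, `Avg`, and `Vwap` are hypothetical names.

```java
// Hypothetical illustration of fixed-size, mergeable aggregate state.
final class MergeableAggregatesSketch {

    // Partial state for avg(x): sum and count are kept so partials combine exactly.
    static final class Avg {
        double sum;
        long count;

        void add(double x) {
            sum += x;
            count++;
        }

        void merge(Avg other) {
            sum += other.sum;
            count += other.count;
        }

        double result() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    // Partial state for vwap(price, quantity) = sum(price * quantity) / sum(quantity).
    static final class Vwap {
        double notional;
        double volume;

        void add(double price, double quantity) {
            notional += price * quantity;
            volume += quantity;
        }

        void merge(Vwap other) {
            notional += other.notional;
            volume += other.volume;
        }

        double result() {
            return volume == 0 ? Double.NaN : notional / volume;
        }
    }
}
```

A function such as count_distinct does not fit this shape, because its partial state is a set that grows with the data rather than a couple of numbers.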

Benchmark results aren't included, but the improvement on my 4c/8t machine varies from 2x to 10x depending on the query.

The new behavior is enabled by default, but can be switched off with cairo.sql.parallel.groupby.enabled=false.

Also includes #4078 (single count_distinct re-write to a parallel GROUP BY for all supported types except symbol).

Other limitations

  • Aggregate functions that have an additional state, e.g. count_distinct, aren't yet supported.

Next steps

  • Port aggregate functions with additional state to the new framework.
  • Get rid of CompactMap and introduce UnorderedMap for the small fixed-size key-value case. That's to speed up key look-ups by avoiding extra access to FastMap's heap once we've determined the hash table slot.
  • Start building multi-threaded SAMPLE BY factories based on the same approach.
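For the UnorderedMap item, the general idea of keeping small fixed-size keys and values directly in the hash table slots can be sketched as below. The PR only names UnorderedMap as future work, so everything here is an assumption: `InlineLongMapSketch` is a hypothetical class with a long key, a long value, linear probing, and no resizing.

```java
// Hypothetical illustration: open addressing with key and value stored inline.
final class InlineLongMapSketch {
    private static final long EMPTY = Long.MIN_VALUE; // reserved key marking a free slot
    private final long[] slots; // layout: [key0, value0, key1, value1, ...]
    private final int mask;
    private int size;

    InlineLongMapSketch(int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        }
        slots = new long[capacity * 2];
        mask = capacity - 1;
        for (int i = 0; i < slots.length; i += 2) {
            slots[i] = EMPTY;
        }
    }

    // Adds delta to the value stored under key, inserting the key if absent.
    void add(long key, long delta) {
        int idx = probe(key);
        if (slots[idx] == EMPTY) {
            // Always keep one free slot so that probing terminates.
            if (size == mask) {
                throw new IllegalStateException("map is full");
            }
            slots[idx] = key;
            slots[idx + 1] = delta;
            size++;
        } else {
            slots[idx + 1] += delta;
        }
    }

    long get(long key, long defaultValue) {
        int idx = probe(key);
        return slots[idx] == EMPTY ? defaultValue : slots[idx + 1];
    }

    // Returns the array index of the key's slot, or of the free slot where it would go.
    private int probe(long key) {
        if (key == EMPTY) {
            throw new IllegalArgumentException("reserved key");
        }
        long h = key * 0x9E3779B97F4A7C15L;
        int slot = (int) (h ^ (h >>> 32)) & mask;
        while (slots[slot * 2] != EMPTY && slots[slot * 2] != key) {
            slot = (slot + 1) & mask; // linear probing
        }
        return slot * 2;
    }
}
```

Once the slot is found, the value sits next to the key in the same array, so a look-up needs no second memory access to a separate heap region.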

puzpuzpuz added the SQL (Issues or changes relating to SQL execution) and Performance (Performance improvements) labels Dec 4, 2023
puzpuzpuz self-assigned this Dec 4, 2023
puzpuzpuz changed the title from "perf(sql): parallel group by and optional filtering" to "perf(sql): parallel group by with optional filtering" Dec 4, 2023
ideoma previously approved these changes Dec 21, 2023

ideoma (Collaborator) commented Dec 22, 2023

[PR Coverage check]

😍 pass : 2012 / 2272 (88.56%)

file detail

Columns: path, covered lines, new lines, coverage
🔵 io/questdb/cairo/TestSink.java 0 1 00.00%
🔵 io/questdb/griffin/engine/table/LatestByValueDeferredIndexedFilteredRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/cairo/map/MapRecord.java 0 3 00.00%
🔵 io/questdb/griffin/engine/groupby/vect/GroupByNotKeyedVectorRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/table/LatestByRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/groupby/SampleByInterpolateRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/cairo/map/MapKey.java 0 3 00.00%
🔵 io/questdb/griffin/engine/orderby/SortedLightRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/join/RecordAsAFieldRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/groupby/DistinctRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/table/LatestByLightRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/cairo/map/MapValue.java 0 1 00.00%
🔵 io/questdb/cairo/map/Map.java 0 2 00.00%
🔵 io/questdb/cutlass/pgwire/CleartextPasswordPgWireAuthenticator.java 0 1 00.00%
🔵 io/questdb/griffin/engine/table/LatestByValueIndexedFilteredRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/groupby/SampleByFirstLastRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/orderby/SortedRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/table/LatestByAllIndexedRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/model/RuntimeIntervalModel.java 0 1 00.00%
🔵 io/questdb/griffin/engine/LimitRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/window/CachedWindowRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/groupby/CountRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/cairo/CairoConfigurationWrapper.java 0 4 00.00%
🔵 io/questdb/cairo/map/CompactMapValue.java 0 1 00.00%
🔵 io/questdb/griffin/engine/functions/groupby/MaxDoubleGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/SumLongGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MinFloatGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MaxCharGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MinIPv4GroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MinDoubleGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MaxIPv4GroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MaxTimestampGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MaxDateGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MinLongGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MinIntGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/SumIntGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/MaxIntGroupByFunction.java 1 6 16.67%
🔵 io/questdb/griffin/engine/functions/groupby/AbstractCountGroupByFunction.java 1 4 25.00%
🔵 io/questdb/griffin/engine/functions/groupby/MinTimestampGroupByFunction.java 2 7 28.57%
🔵 io/questdb/griffin/engine/functions/groupby/MaxFloatGroupByFunction.java 2 7 28.57%
🔵 io/questdb/griffin/engine/functions/groupby/MaxLongGroupByFunction.java 2 7 28.57%
🔵 io/questdb/griffin/engine/functions/groupby/MinDateGroupByFunction.java 2 7 28.57%
🔵 io/questdb/griffin/model/ExpressionNode.java 1 3 33.33%
🔵 io/questdb/cairo/sql/StatefulAtom.java 1 2 50.00%
🔵 io/questdb/griffin/engine/table/LatestBySubQueryRecordCursorFactory.java 1 2 50.00%
🔵 io/questdb/griffin/engine/functions/GroupByFunction.java 1 2 50.00%
🔵 io/questdb/griffin/engine/functions/groupby/SumLong256GroupByFunction.java 9 14 64.29%
🔵 io/questdb/griffin/engine/functions/groupby/FirstCharGroupByFunction.java 2 3 66.67%
🔵 io/questdb/griffin/engine/functions/groupby/FirstSymbolGroupByFunction.java 2 3 66.67%
🔵 io/questdb/griffin/engine/functions/groupby/FirstBooleanGroupByFunction.java 2 3 66.67%
🔵 io/questdb/griffin/engine/functions/groupby/FirstTimestampGroupByFunction.java 2 3 66.67%
🔵 io/questdb/cairo/map/ShardedMapCursor.java 63 92 68.48%
🔵 io/questdb/griffin/engine/functions/groupby/SumFloatGroupByFunction.java 13 18 72.22%
🔵 io/questdb/griffin/engine/groupby/GroupByNotKeyedRecordCursorFactory.java 12 16 75.00%
🔵 io/questdb/cairo/map/FastMapValue.java 9 12 75.00%
🔵 io/questdb/griffin/engine/groupby/GroupByMergeShardJob.java 16 20 80.00%
🔵 io/questdb/griffin/engine/groupby/vect/GroupByRecordCursorFactory.java 5 6 83.33%
🔵 io/questdb/griffin/engine/groupby/SimpleMapValue.java 7 8 87.50%
🔵 io/questdb/griffin/engine/table/AsyncGroupByNotKeyedRecordCursorFactory.java 45 51 88.24%
🔵 io/questdb/griffin/SqlCodeGenerator.java 104 116 89.66%
🔵 io/questdb/cairo/map/FastMapVarSizeRecord.java 185 201 92.04%
🔵 io/questdb/griffin/engine/table/AsyncGroupByNotKeyedRecordCursor.java 85 91 93.41%
🔵 io/questdb/griffin/engine/groupby/GroupByRecordCursorFactory.java 15 16 93.75%
🔵 io/questdb/griffin/engine/table/AsyncGroupByRecordCursorFactory.java 75 80 93.75%
🔵 io/questdb/griffin/engine/table/AsyncGroupByNotKeyedAtom.java 53 56 94.64%
🔵 io/questdb/cairo/RecordSinkFactory.java 209 218 95.87%
🔵 io/questdb/griffin/model/QueryModel.java 23 24 95.83%
🔵 io/questdb/griffin/engine/table/AsyncGroupByRecordCursor.java 152 160 95.00%
🔵 io/questdb/cairo/map/FastMapFixedSizeRecord.java 114 120 95.00%
🔵 io/questdb/griffin/engine/table/AsyncGroupByAtom.java 169 173 97.69%
🔵 io/questdb/griffin/engine/groupby/GroupByUtils.java 135 136 99.26%
🔵 io/questdb/cairo/map/FastMap.java 130 131 99.24%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullLongGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstShortGroupByFunction.java 3 3 100.00%
🔵 io/questdb/MessageBusImpl.java 11 11 100.00%
🔵 io/questdb/griffin/engine/table/AsyncFilteredNegativeLimitRecordCursor.java 1 1 100.00%
🔵 io/questdb/PropServerConfiguration.java 21 21 100.00%
🔵 io/questdb/cairo/sql/async/PageFrameSequence.java 8 8 100.00%
🔵 io/questdb/cairo/ArrayColumnTypes.java 2 2 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/VwapDoubleGroupByFunction.java 10 10 100.00%
🔵 io/questdb/griffin/engine/table/VirtualRecordCursorFactory.java 13 13 100.00%
🔵 io/questdb/griffin/engine/table/AsyncFilteredRecordCursor.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstFloatGroupByFunction.java 3 3 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstDoubleGroupByFunction.java 3 3 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullTimestampGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/table/SortedSymbolIndexRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/cutlass/http/HttpConnectionContext.java 1 1 100.00%
🔵 io/questdb/griffin/engine/table/AsyncFilterAtom.java 3 3 100.00%
🔵 io/questdb/griffin/engine/PerWorkerLocks.java 2 2 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullGeoHashGroupByFunctionFactory.java 4 4 100.00%
🔵 io/questdb/griffin/engine/table/LatestByValuesIndexedFilteredRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/std/bytes/Bytes.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullDateGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/orderby/LimitedSizeLongTreeChain.java 7 7 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastNotNullSymbolGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullIPv4GroupByFunctionFactory.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastNotNullTimestampGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstGeoHashGroupByFunctionLong.java 5 5 100.00%
🔵 io/questdb/griffin/engine/table/FilterOnValuesRecordCursorFactory.java 3 3 100.00%
🔵 io/questdb/cairo/sql/async/PageFrameReduceJob.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/AvgDoubleGroupByFunction.java 8 8 100.00%
🔵 io/questdb/griffin/engine/window/WindowRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/cairo/DefaultCairoConfiguration.java 4 4 100.00%
🔵 io/questdb/griffin/engine/table/DataFrameRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/griffin/BasePlanSink.java 4 4 100.00%
🔵 io/questdb/cairo/map/FastMapCursor.java 10 10 100.00%
🔵 io/questdb/griffin/engine/table/FilteredRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/cairo/CairoEngine.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastNotNullLongGroupByFunction.java 1 1 100.00%
🔵 io/questdb/ServerMain.java 2 2 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullIntGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/table/SelectedRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/tasks/GroupByMergeShardTask.java 15 15 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastNotNullIntGroupByFunction.java 1 1 100.00%
🔵 io/questdb/PropertyKey.java 4 4 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullCharGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastNotNullDateGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstLongGroupByFunction.java 3 3 100.00%
🔵 io/questdb/cairo/sql/async/PageFrameReduceTask.java 10 10 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastGeoHashGroupByFunctionFactory.java 4 4 100.00%
🔵 io/questdb/griffin/engine/table/AsyncJitFilteredRecordCursorFactory.java 5 5 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/HaversineDistDegreeGroupByFunction.java 5 5 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/SumDoubleGroupByFunction.java 6 6 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastNotNullGeoHashGroupByFunctionFactory.java 4 4 100.00%
🔵 io/questdb/cairo/TableWriter.java 1 1 100.00%
🔵 io/questdb/griffin/engine/table/FilterOnExcludedValuesRecordCursorFactory.java 2 2 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstNotNullSymbolGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstByteGroupByFunction.java 3 3 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstGeoHashGroupByFunctionByte.java 5 5 100.00%
🔵 io/questdb/griffin/SqlOptimiser.java 72 72 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/CountLongConstGroupByFunction.java 5 5 100.00%
🔵 io/questdb/griffin/engine/table/LatestByValueIndexedFilteredRecordCursor.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/eq/EqLong256StrFunctionFactory.java 5 5 100.00%
🔵 io/questdb/griffin/engine/groupby/GroupByFunctionsUpdaterFactory.java 19 19 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/LastNotNullCharGroupByFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstGeoHashGroupByFunctionShort.java 5 5 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstDateGroupByFunction.java 3 3 100.00%
🔵 io/questdb/griffin/engine/table/AsyncFilteredRecordCursorFactory.java 5 5 100.00%
🔵 io/questdb/griffin/engine/functions/groupby/FirstGeoHashGroupByFunctionInt.java 5 5 100.00%
🔵 io/questdb/cairo/map/MapFactory.java 1 1 100.00%
🔵 io/questdb/cairo/sql/RecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/griffin/engine/groupby/AbstractSampleByRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/griffin/SqlUtil.java 9 9 100.00%

5 participants