perf(sql): speed up parallel GROUP BY and Top K queries #6754
bluestreak01 merged 56 commits into master
Conversation
Walkthrough
This pull request introduces an unordered, head-of-line-blocking-free frame processing pathway for parallel GROUP BY and top-K operations. It adds new async reduction components (UnorderedPageFrameSequence, UnorderedPageFrameReduceTask/Reducer, UnorderedPageFrameReduceJob), consolidates filter configuration into AsyncFilterContext, refactors async group-by and top-K factories to use the new sequencing model, updates First/Last aggregate functions to handle out-of-order row processing, and removes explicit PostgreSQL driver loading from benchmarks.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
@mtopolnik there are a number of downsides in reusing the same reduce job + task + sequence. The thing is that
Force-pushed from 4152fb5 to 03f784b
- Free frameFilteredMemoryRecords and ownerPageFrameFiltered record in AsyncGroupByNotKeyedAtom.close() to match AsyncGroupByAtom
- Free ownerRecordA in AsyncTopKAtom.clear() alongside ownerRecordB
- Fix copyright year in UnorderedPageFrameReduceTask (2024 -> 2026)
- Remove redundant circuit breaker init in reduceLocally()

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Wrap the dispatch loop in try/finally so that queuedCount is always set, even when reduceLocally() throws. Without this, await() in close() sees queuedCount == 0 and returns immediately, allowing reset() to free the frameAddressCache while worker threads may still be processing in-flight tasks. Also add frameSequenceId to UnorderedPageFrameReduceTask with an assert-level validation in consumeQueue() to detect stale tasks from a previous query execution (defense-in-depth, matching the pattern used by the ordered PageFrameReduceJob). Remove unnecessary volatile on queuedCount since it is only accessed by the owner thread. Co-Authored-By: Claude Opus 4.6 <[email protected]>
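The try/finally fix described above can be sketched roughly as follows. All names here (dispatchAll, tryPublish, reduceLocally, queuedCount) are illustrative stand-ins for the actual UnorderedPageFrameSequence internals, not the PR's real code:

```java
// Sketch of the dispatch-loop fix: queuedCount must be assigned even when
// a local reduce throws, or close() will under-await in-flight tasks.
class DispatchSketch {
    int queuedCount; // only touched by the owner thread, so no volatile

    void dispatchAll(int frameCount) {
        int queued = 0;
        try {
            for (int i = 0; i < frameCount; i++) {
                if (tryPublish(i)) {
                    queued++;            // frame handed to a worker
                } else {
                    reduceLocally(i);    // queue full: reduce here, may throw
                }
            }
        } finally {
            // Always record the queued count, even when reduceLocally()
            // throws; otherwise await() in close() would see 0 and return
            // while workers may still hold in-flight tasks.
            queuedCount = queued;
        }
    }

    boolean tryPublish(int frame) {
        return frame % 2 == 0; // stand-in: pretend the queue accepts even frames
    }

    void reduceLocally(int frame) {
        if (frame == 3) {
            throw new RuntimeException("reduce failed on frame " + frame);
        }
    }
}
```

With this shape, a throw from frame 3 still leaves queuedCount reflecting the two frames that were already published.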
- Replace assert-only stale task check in UnorderedPageFrameReduceJob with a runtime guard that logs and skips the task without touching the latch or error state.
- Remove unused toTop() from UnorderedPageFrameSequence.
- Call setError() in reduceLocally() before re-throwing so that the error state is consistent with the worker path.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
The stealWork() method can process tasks from any UnorderedPageFrameSequence, not just the owner's, which re-initializes localRecord for a foreign symbol table. Add a comment warning callers not to rely on localRecord state after the call. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Work-stealing via consumeQueue may re-initialize the owner's circuit breaker wrapper with a foreign sequence's delegate. Restore it after each stealWork() call to prevent false cancellation of the owner's query. Co-Authored-By: Claude Opus 4.6 <[email protected]>
The three async atom classes (AsyncGroupByAtom, AsyncGroupByNotKeyedAtom, AsyncTopKAtom) shared ~170 lines of identical filter and memory-pool infrastructure: 16 fields, 12 getter methods, constructor allocation, close/clear logic, and filter initialization. Extract the shared code into a new AsyncFilterContext composition helper. Each atom now holds a single filterCtx field and delegates filter-related operations to it. Call sites in the 3 factory classes and 3 cursor classes use atom.getFilterContext().getXxx() to access the shared methods. Co-Authored-By: Claude Opus 4.6 <[email protected]>
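The composition refactor described in this commit can be illustrated with a minimal sketch. The class and accessor names mirror the PR (AsyncFilterContext, getFilterContext()), but the fields are simplified stand-ins, not the actual 16 extracted fields:

```java
// Shared filter/memory-pool state extracted into one context object.
class AsyncFilterContext {
    private final String compiledFilter; // stand-in for filter + memory pools
    private boolean closed;

    AsyncFilterContext(String compiledFilter) {
        this.compiledFilter = compiledFilter;
    }

    String getCompiledFilter() {
        return compiledFilter;
    }

    void close() {
        closed = true; // real code frees records and memory pools here
    }

    boolean isClosed() {
        return closed;
    }
}

// Each atom now holds a single filterCtx field and delegates to it,
// instead of duplicating the same infrastructure three times.
class AsyncGroupByAtomSketch {
    private final AsyncFilterContext filterCtx;

    AsyncGroupByAtomSketch(AsyncFilterContext filterCtx) {
        this.filterCtx = filterCtx;
    }

    AsyncFilterContext getFilterContext() {
        return filterCtx;
    }

    void close() {
        filterCtx.close(); // delegated close/clear logic
    }
}
```

Call sites then reach shared state via atom.getFilterContext().getXxx(), as the commit message describes.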
JDBC-based benchmark that measures query latency and throughput for GROUP BY and TOP-K queries under varying concurrency (1, 2, 4, 8). Creates a skewed-partition table with 500 hourly partitions: 10 large (500K rows) interleaved among 490 small (10K rows) to expose head-of-line blocking in the ordered PageFrameSequence. Co-Authored-By: Claude Opus 4.6 <[email protected]>
JDBC 4.0 auto-discovers drivers via ServiceLoader, so the explicit
Class.forName("org.postgresql.Driver") calls are unnecessary. Also
revert the module-info requires added in the previous commit since
there is no longer any reflection on the PG driver class.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
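For context, JDBC 4.0 driver auto-discovery works as sketched below: DriverManager loads any Driver implementation listed in a jar's META-INF/services/java.sql.Driver file via ServiceLoader, which is why the explicit Class.forName call is redundant. The helper name here is illustrative:

```java
import java.sql.Driver;
import java.util.ServiceLoader;

class DriverDiscovery {
    // Counts Driver implementations discoverable on the classpath;
    // each provider comes from a META-INF/services/java.sql.Driver entry,
    // with no Class.forName("org.postgresql.Driver") needed.
    static int countDiscoverableDrivers() {
        int n = 0;
        for (Driver ignored : ServiceLoader.load(Driver.class)) {
            n++;
        }
        return n;
    }
}
```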
@puzpuzpuz I did a new implementation from scratch and force-pushed it. I think this tracks closer to the ideas you laid out in the issue. |
Resolved review threads:
- core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByAtom.java (outdated)
- core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByNotKeyedAtom.java (outdated)
- core/src/main/java/io/questdb/griffin/engine/table/AsyncFilterUtils.java (outdated)
- ...c/main/java/io/questdb/griffin/engine/functions/groupby/FirstNotNullUuidGroupByFunction.java
When reduceLocally() throws, call cancel() on the frame sequence so that other worker threads see it as inactive and skip remaining tasks instead of wasting CPU. Co-Authored-By: Claude Opus 4.6 <[email protected]>
When parallel GROUP BY merges results from different workers, merge() checked only destRowId == LONG_NULL to detect an uninitialized destination. But computeFirst() stores a valid rowId even when the value is null, so a shard where all rows have null values gets a real rowId. When a second shard with non-null values (but a higher rowId) merges into it, the condition srcRowId < destRowId fails and destRowId != LONG_NULL, so the non-null value is incorrectly discarded. The fix adds a dest-value null check to the merge condition in all 22 affected merge() methods across 14 FirstNotNull* GroupByFunction classes (char, date, double, float, int, long, str, symbol, timestamp, uuid, varchar, geohash×4, IPv4, decimal×6). The Array variant was already correct. Co-Authored-By: Claude Opus 4.6 <[email protected]>
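The bug and fix described above can be reconstructed in a hedged sketch, using long as the value type and Long.MIN_VALUE standing in for both the LONG_NULL rowId sentinel and a null value. The real fix touches 22 merge() methods; this only shows the shape of the condition change:

```java
class FirstNotNullMergeSketch {
    static final long LONG_NULL = Long.MIN_VALUE;

    long destRowId = LONG_NULL;
    long destValue = LONG_NULL;

    // Buggy condition: a shard whose rows were all null still stored a
    // valid rowId, so destRowId != LONG_NULL blocks a later non-null
    // value with a higher rowId from merging in.
    void mergeBuggy(long srcRowId, long srcValue) {
        if (srcRowId < destRowId || destRowId == LONG_NULL) {
            destRowId = srcRowId;
            destValue = srcValue;
        }
    }

    // Fixed condition: also merge when the destination *value* is null,
    // not just when the destination rowId is unset.
    void mergeFixed(long srcRowId, long srcValue) {
        if (srcRowId < destRowId || destRowId == LONG_NULL || destValue == LONG_NULL) {
            destRowId = srcRowId;
            destValue = srcValue;
        }
    }
}
```

Merging an all-null shard first (valid rowId, null value) and then a non-null shard with a higher rowId demonstrates the difference: the buggy condition discards the non-null value, the fixed one keeps it.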
[PR Coverage check] 😍 pass: 744 / 853 (87.22%)
Closes #6655
Summary
- A new unordered `PageFrameSequence` that uses `SOUnboundedCountDownLatch` instead of ordered collection via `collectSubSeq`, eliminating head-of-line blocking for reducers that don't need ordered results
- A single `dispatchAllAndAwait()` call in each cursor
- `storeError()` copies into a dedicated `CairoException` instance to prevent thread-local exception recycling from corrupting stored error data

Benchmark results
JDBC client benchmark (`HolBlockingBenchmark`) with a skewed-partition table designed to provoke HOL blocking: 50 hourly partitions (45 × 1K rows + 5 × 1M rows, ~5M rows total). Large partitions appear every 10th hour (1000:1 size ratio). Queries use expensive per-row math (`sqrt`, `log`) to amplify the processing time difference between large and small partitions. The benchmark also reports per-iteration "spread" (max − min latency across concurrent threads), which directly measures how much the slowest thread lags behind the fastest in each batch.

Run with `mvn -pl benchmarks package -DskipTests && java -cp benchmarks/target/benchmarks.jar org.questdb.HolBlockingBenchmark`.

Raw numbers
master / This PR (raw result tables omitted)
Analysis
Per-iteration spread (max − min across threads)
Spread measures how much the slowest thread lags behind the fastest in each batch — a direct proxy for HOL blocking.
At concurrency 4 the reductions are smaller (24–51%).
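The spread metric defined above could be computed as in this small sketch (the benchmark's actual implementation may differ):

```java
import java.util.Arrays;

class SpreadMetric {
    // Spread = max - min latency across one batch of concurrent threads,
    // a direct proxy for how far the slowest thread lags the fastest.
    static double spread(double[] latenciesMs) {
        double min = Arrays.stream(latenciesMs).min().getAsDouble();
        double max = Arrays.stream(latenciesMs).max().getAsDouble();
        return max - min;
    }
}
```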
Tail latency (p99/max)
Latency predictability (p99/p50 ratio)
Throughput at concurrency 8
GroupBy throughput is slightly lower on the PR at concurrency 8.
Throughput scaling from concurrency 1 to 2
On master, GroupBy and GroupByNotKeyed throughput drops when adding a second concurrent thread:
The PR scales monotonically; master regresses at concurrency 2 before recovering at concurrency 4.
Single-thread regression
GroupBy shows a 34% single-thread regression (14.2ms vs 10.6ms). GroupByFiltered, GroupByNotKeyed, and TopK are within noise. The GroupBy regression does not appear at concurrency 8 (92.9ms vs 90.0ms, +3%), suggesting the single-thread ordered path on master benefits from some caching or prefetch effect that the unordered path does not exploit. This warrants further investigation.
Test plan
- `ParallelTopKFuzzTest` — includes new tests for fault tolerance (NPE in filter), query timeout, empty table, and concurrent execution (8 threads x 50 iterations)
- `ParallelGroupByFuzzTest` — verifies error message assertions match the new error path through `storeError()`/`throwStoredError()`
- Existing `PageFrameSequence` behavior (filter-only, window join paths remain unchanged)

🤖 Generated with Claude Code