perf(sql): speed up ASOF and WINDOW JOINs for large right-hand-side tables#6822
perf(sql): speed up ASOF and WINDOW JOINs for large right-hand-side tables#6822bluestreak01 merged 5 commits intomasterfrom
Conversation
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@CodeRabbit pls review this PR critically |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 56312 🏁 Script executed: Length of output: 13782 🏁 Script executed: Length of output: 12741 🏁 Script executed: Length of output: 35889
1. Timestamp Conversion Precision Loss (High Severity)Lines 289 (AbstractAsOfJoinFastRecordCursor.java) and 128+245 (WindowJoinTimeFrameHelper.java): final long nativeTimestamp = slaveTimestampScale == 1 ? masterTimestamp : masterTimestamp / slaveTimestampScale;Integer division discards precision when converting nano→micro timestamps. For example:
Impact: When master uses TIMESTAMP (nanos) and slave uses TIMESTAMP_MICRO, Fix: Use proper rounding or ceiling instead of truncating division: final long nativeTimestamp = slaveTimestampScale == 1 ? masterTimestamp
: (masterTimestamp + slaveTimestampScale - 1) / slaveTimestampScale;2. Incorrect Prevailing Assumption After seekEstimate (Critical - Correctness Bug)Lines 129-135 (WindowJoinTimeFrameHelper.findRowLo) and 246-252 (findRowLoWithPrevailing): if (recordPrevailing && timeFrame.getFrameIndex() >= 0 && timeFrameCursor.open() > 0) {
// The seeked frame has ceiling <= timestampLo, so all its rows
// are < timestampLo. Record its last row as prevailing candidate.
prevailingFrameIndex = timeFrame.getFrameIndex();
prevailingRowIndex = timeFrame.getRowHi() - 1;
}The binary search in Impact: Recording such rows as "prevailing" (i.e., Fix: After if (recordPrevailing && timeFrame.getFrameIndex() >= 0 && timeFrameCursor.open() > 0) {
long lastRowTs = scaleTimestamp(/* read last row timestamp */, scale);
if (lastRowTs < timestampLo) {
prevailingFrameIndex = timeFrame.getFrameIndex();
prevailingRowIndex = timeFrame.getRowHi() - 1;
}
}3. Off-By-One in maxTsHi Calculation (Medium Severity)Line 61 (ConcurrentTimeFrameCursor.java): final long maxTsHi = i < partitionCount - 2 ? reader.getPartitionTimestampByIndex(i + 1) : Long.MAX_VALUE;For a table with 3 partitions (indices 0, 1, 2):
Impact: The second-to-last partition gets Fix: Change condition to final long maxTsHi = i < partitionCount - 1 ? reader.getPartitionTimestampByIndex(i + 1) : Long.MAX_VALUE;Missing Test CoverageThe git diff shows no test file additions, yet the PR description claims 199 lines added. The parameterized tests run with mixed MICRO/NANO timestamp types, but I don't see specific coverage for:
Request: Add targeted unit tests demonstrating these edge cases work correctly after the fixes above. These issues undermine both the correctness and performance goals of the optimization. Issue 🧠 Learnings used |
@coderabbitai please find responses to your comments below. Claim 1: "Timestamp Conversion Precision Loss"Invalid. The suggested fix (ceiling division) would be actively harmful: rounding up could cause The same truncating-division pattern is already used in Claim 2: "Incorrect Prevailing Assumption After seekEstimate"Invalid. This misunderstands partition ceilings. Partition ceilings are exclusive upper bounds. So when Claim 3: "Off-By-One in maxTsHi Calculation"Pre-existing code, not introduced by this PR. The last two partitions get If worth fixing, it's a one-character change ( Claim 4: "Missing Test Coverage"Wrong. The diff includes 199 lines of test additions across three files: |
|
Suggestion 3 is addressed in f2af0eb |
[PR Coverage check]😍 pass : 14 / 14 (100.00%) file detail
|
The optimization mirrors what HORIZON JOIN already does in
HorizonJoinTimeFrameHelper(added in #6814). Without it, the first lookup linearly scans through all slave time frames preceding the master's first timestamp, which is O(N) in the number of frames. WithseekEstimate(), the initial positioning is O(log P) where P is the number of partitions.Implementation
AbstractAsOfJoinFastRecordCursor.openSlaveFrame()now callsTimeFrameCursor.seekEstimate()on the first slave frame lookup (whenslaveFrameIndex == -1) to binary-search directly to the target partition instead of linearly scanning all preceding frames. This benefits all ASOF JOIN and LT JOIN fast-path factories.WindowJoinTimeFrameHelper.findRowLo()andfindRowLoWithPrevailing()now callseekEstimate()on the first lookup (whenbookmarkedFrameIndex == -1) with the same partition-skipping behavior. When prevailing is enabled, the seeked frame's last row is recorded as the prevailing candidate. This benefits both sync and async WINDOW JOIN factories.