perf(parquet): speed up parquet writing #6735
Conversation
**Important**: Review skipped. Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.
**Walkthrough**: Implements streaming Parquet export with partition release and SIMD optimizations. Adds explicit partition release mechanisms.
**Estimated code review effort**: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In
`@core/src/main/java/io/questdb/griffin/engine/table/BwdTableReaderPageFrameCursor.java`:
- Around line 157-165: The releaseOpenPartitions() method in
BwdTableReaderPageFrameCursor must guard against being called before next() by
checking highestOpenPartitionIndex for an uninitialized value; add an early
return when highestOpenPartitionIndex is -1 (or otherwise <=
reenterPartitionIndex) so the loop does not iterate with invalid indices, i.e.
perform a conditional check at the start of releaseOpenPartitions() and only run
the for-loop that calls reader.closePartitionByIndex(i) when
highestOpenPartitionIndex >= 0 and > reenterPartitionIndex, then leave
highestOpenPartitionIndex set to reenterPartitionIndex.
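A minimal standalone sketch of the suggested guard. The class is a hypothetical stand-in for `BwdTableReaderPageFrameCursor` (a counter replaces the real `reader.closePartitionByIndex(i)` call), showing only the early-return behavior the prompt asks for:

```java
// Hypothetical stand-in for BwdTableReaderPageFrameCursor, reduced to the
// guard described above: releaseOpenPartitions() must be a no-op when
// next() has not yet initialized highestOpenPartitionIndex.
public class BwdCursorSketch {
    int reenterPartitionIndex = 0;
    int highestOpenPartitionIndex = -1; // -1 until next() is called
    int closedPartitions = 0;           // stands in for reader.closePartitionByIndex(i)

    void releaseOpenPartitions() {
        // Guard: nothing to release before next(), or when no partition is behind us
        if (highestOpenPartitionIndex < 0 || highestOpenPartitionIndex <= reenterPartitionIndex) {
            return;
        }
        for (int i = highestOpenPartitionIndex; i > reenterPartitionIndex; i--) {
            closedPartitions++; // reader.closePartitionByIndex(i) in the real code
        }
        highestOpenPartitionIndex = reenterPartitionIndex;
    }
}
```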
In
`@core/src/test/java/io/questdb/test/griffin/engine/table/parquet/StreamingParquetBenchmarkTest.java`:
- Around line 223-273: The test calls ThreadMXBean.getCurrentThreadCpuTime()
unguarded which can return -1 or be unsupported; modify the ThreadMXBean usage
in StreamingParquetBenchmarkTest so you first call
threadMXBean.isCurrentThreadCpuTimeSupported() and only call
getCurrentThreadCpuTime() (and compute cpuTimeNs/cpuUtilization) when
supported—also try to enable with threadMXBean.setThreadCpuTimeEnabled(true) if
supported but disabled; ensure when unsupported you skip CPU-time math (set
cpuTimeNs/cpuUtilization to a sentinel or skip emitting CPU metrics) and keep
elapsed time/bytes/rows calculations intact so
totalSum/totalBytesRead/totalRows/frameCount logic remains unchanged.
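A self-contained sketch of the guarded `ThreadMXBean` usage described above. The sentinel constant and helper name are illustrative, not the test's actual code; the JMX calls themselves are the standard `java.lang.management` API:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class CpuTimeSketch {
    static final long CPU_TIME_UNAVAILABLE = -1L; // illustrative sentinel

    // Returns current-thread CPU time in nanoseconds, or the sentinel when
    // the JVM/OS does not support per-thread CPU time measurement.
    static long currentThreadCpuTimeNs() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        if (!bean.isCurrentThreadCpuTimeSupported()) {
            return CPU_TIME_UNAVAILABLE; // skip CPU-time math entirely
        }
        if (!bean.isThreadCpuTimeEnabled()) {
            bean.setThreadCpuTimeEnabled(true); // supported but disabled: try to enable
        }
        long t = bean.getCurrentThreadCpuTime(); // -1 when disabled
        return t < 0 ? CPU_TIME_UNAVAILABLE : t;
    }
}
```

Elapsed time, bytes, and row counting stay unchanged; only the CPU-utilization computation is skipped when the sentinel is returned.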
🧹 Nitpick comments (6)
core/src/main/java/io/questdb/cairo/vm/MemoryCMRImpl.java (1)

174-174: Guard `madvise` calls for consistency and clarity. Lines 174 and 188 call `ff.madvise()` unconditionally with `madviseOpts`, even when it equals `-1`. While `FilesFacadeImpl.madvise()` safely ignores `-1` (checks `advise > -1`), this is inconsistent with line 183, which guards the mapping call with `if (madviseOpts != -1)`. Add guards for clarity:

Suggested fix

```diff
     size = newSize;
-    ff.madvise(pageAddress, size, madviseOpts);
+    if (madviseOpts != -1) {
+        ff.madvise(pageAddress, size, madviseOpts);
+    }
```

And in `map()` at line 188:

```diff
         this.pageAddress = TableUtils.mapRO(ff, fd, size, memoryTag);
     }
-    ff.madvise(pageAddress, size, madviseOpts);
+    if (madviseOpts != -1) {
+        ff.madvise(pageAddress, size, madviseOpts);
+    }
```

core/src/main/java/io/questdb/cairo/sql/PageFrameCursor.java (1)
147-149: Consider adding Javadoc for `releaseOpenPartitions()`.

The `setStreamingMode` method has comprehensive documentation, but `releaseOpenPartitions()` lacks any description of its purpose, when to call it, or what partitions it releases. For API consistency and discoverability, consider adding similar documentation.

📝 Suggested documentation

```diff
+    /**
+     * Releases partitions that have already been processed and are no longer needed.
+     * This is useful during streaming operations to reduce memory pressure by allowing
+     * the underlying TableReader to close partitions that won't be accessed again.
+     * <p>
+     * Default implementation is a no-op. Subclasses backed by TableReader should override
+     * this method to release partitions up to (but not including) the current partition.
+     */
     default void releaseOpenPartitions() {
         // no-op by default
     }
```

core/src/main/java/io/questdb/griffin/engine/table/FwdTableReaderPageFrameCursor.java (1)
156-163: Verify behavior when `releaseOpenPartitions()` is called before `next()`.

If `releaseOpenPartitions()` is called before `next()`, `reenterPartitionIndex` may be uninitialized (default 0). In this case, the loop `for (int i = 0; i < 0; i++)` would not execute, which is safe. However, for consistency with the backward cursor and defensive coding, consider adding a guard or documenting the expected usage.

The current implementation is safe because the loop won't execute when `lowestOpenPartitionIndex == reenterPartitionIndex == 0`, but adding explicit documentation or a guard would make the contract clearer:

```diff
     @Override
     public void releaseOpenPartitions() {
+        // Close all partitions from lowestOpenPartitionIndex up to (but not including) current partition
+        // Safe to call before next() - loop simply won't execute
         for (int i = lowestOpenPartitionIndex; i < reenterPartitionIndex; i++) {
             reader.closePartitionByIndex(i);
         }
         lowestOpenPartitionIndex = reenterPartitionIndex;
     }
```

core/src/test/java/io/questdb/test/griffin/engine/table/parquet/StreamingParquetBenchmarkTest.java (3)
117-166: Allow re-running the benchmark without rebuilding the table.

`ensureTableExists` always creates and inserts, so re-running in the same DB root will fail or redo a long insert. Consider checking for an existing table and skipping data generation (or truncating) when it already exists.
168-191: Consider soft-failing when cache drop isn't permitted.

Throwing on non-root or non-Linux makes the benchmark unusable in those environments. Logging a warning and continuing (or making this opt-in) keeps the benchmark runnable while still documenting limitations.
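One possible shape for the soft-fail, sketched with a hypothetical helper (the real test's structure may differ): probe whether `/proc/sys/vm/drop_caches` is writable and warn-and-continue instead of throwing.

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class CacheDropSketch {
    // Returns true when the page cache was dropped; false means we logged a
    // warning and continued instead of failing the benchmark.
    static boolean tryDropPageCache(String dropCachesPath) {
        Path dropCaches = Path.of(dropCachesPath);
        if (!Files.isWritable(dropCaches)) {
            // Not Linux, or not running as root: soft-fail.
            System.err.println("WARN: cannot drop page cache (" + dropCaches
                    + " not writable); results may reflect a warm cache");
            return false;
        }
        try {
            Files.writeString(dropCaches, "3"); // echo 3 > /proc/sys/vm/drop_caches
            return true;
        } catch (Exception e) {
            System.err.println("WARN: failed to drop page cache: " + e);
            return false;
        }
    }
}
```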
369-416: Release open partitions per frame to match streaming semantics.

The streaming benchmark keeps partitions open for the full scan. Calling `releaseOpenPartitions()` (or enabling streaming mode) after each frame keeps the open-partition count bounded and better reflects production streaming behavior.

♻️ Suggested release after each frame

```diff
@@
                 totalRows += frameRowCount;
                 frameCount++;
+                pageFrameCursor.releaseOpenPartitions();
             }
```
@CodeRabbit any updates? Find bugs pls
Tip: For best results, initiate chat on the files or code changes.
🔴 Critical Issues

1. Thread-Safety Bug: Non-Volatile `streamingMode`
| Severity | Count | Must Fix Before Merge |
|---|---|---|
| Critical | 5 | ✅ Yes |
| High | 3 | ✅ Yes |
| Moderate | 2 | |
Top 3 Must-Fix:

- Make `streamingMode` volatile (thread-safety)
- Reset `streamingMode` in finally blocks (state cleanup)
- Investigate `MmapCache.isSingleUse()` semantic change (potential undefined behavior)
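For concreteness, the first two suggested fixes could look like the sketch below. The class is a hypothetical stand-in mirroring the PR's `setStreamingMode` API; note the thread later concludes the single-owner reader pooling model may make the `volatile` unnecessary.

```java
public class StreamingReaderSketch {
    // volatile so the flag is visible if the reader is handed to another thread
    private volatile boolean streamingMode;

    public void setStreamingMode(boolean on) {
        streamingMode = on;
    }

    public boolean isStreamingMode() {
        return streamingMode;
    }

    // Export path: enable streaming for the duration of the export and
    // reset in finally so a pooled reader is never returned dirty.
    public void export(Runnable body) {
        setStreamingMode(true);
        try {
            body.run();
        } finally {
            setStreamingMode(false);
        }
    }
}
```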
The enterprise CI failure strongly suggests these aren't theoretical issues. I recommend:
- Add thread-safety annotations/fixes
- Add comprehensive error handling
- Write stress tests for concurrent partition access
- Review all callers of the new NoCache mapping APIs
🧠 Learnings used
Learnt from: jerrinot
Repo: questdb/questdb PR: 0
File: :0-0
Timestamp: 2026-02-04T11:18:36.475Z
Learning: For PRs by jerrinot, always use a thorough and critical review mode: question design decisions, challenge assumptions, scrutinize implementations for edge cases and potential issues, and avoid superficial validation. They prefer hard-core reviews that push back on the code.
[PR Coverage check] 😍 pass: 1991 / 2093 (95.13%)
@coderabbitai I've reviewed each of your comments against the actual codebase. Here's my assessment:

Critical Issues Assessment

1. Thread-Safety Bug: Non-Volatile `streamingMode`
My Mistakes

1. Thread-Safety - You're Correct ✅

I completely missed QuestDB's single-owner pooling model. The scripts confirm:
2. State Cleanup - You're Correct ✅

I missed the try-with-resources pattern at line 138:

```java
try (TableReader reader = cairoEngine.getReader(tableToken)) {
    reader.setStreamingMode(true);
    // ...
}
```

3. MmapCache.isSingleUse() - You're Correct ✅

4. Null Guards - You're Correct ✅
5-8. Other Issues - You're Correct ✅

All theoretical scenarios without evidence in the actual codebase design.

The Real Issue

However, there's one objective fact I'd like to understand: Enterprise CI build 212862 failed (per questdb-butler's comment). That's not theoretical - it's a real failure in a production-like environment. Given that all my static analysis concerns were off-base, the CI failure likely points to something I didn't catch. Could you share:
I clearly need to better understand QuestDB's architecture before reviewing. Thank you for the detailed corrections - they're valuable learning points about the codebase's design patterns.

✏️ Learnings added
Summary
This PR speeds up Parquet export through two optimizations:
- `madvise` hints to prevent page cache exhaustion under memory pressure

Changes
SIMD Encoding (Rust)

- `portable_simd` feature for cross-platform SIMD support
- `i32_slice_to_page_simd` and `i64_slice_to_page_simd` functions that vectorize definition level encoding for nullable columns

Streaming Mode (Java)
Introduces streaming mode for `TableReader` that applies two Linux kernel hints to memory-mapped partition files: `MADV_SEQUENTIAL` and `MADV_DONTNEED`.

Why both hints are needed: `MADV_SEQUENTIAL` increases readahead and lets the kernel evict pages behind the read pointer, but under memory pressure already-mapped pages can still crowd the page cache; `MADV_DONTNEED` explicitly releases a partition's pages once it has been fully exported.

Implementation:

- `MADV_SEQUENTIAL` applied in `MemoryCMRImpl.map()` and `TableUtils` when opening column files
- `MADV_DONTNEED` applied in `closePartitionColumn()` and `MappedMemoryPartitionDescriptor.clear()` before `munmap()`
- `MmapCache` bypassed via `mmapNoCache()` so each partition mapping is independent and fully releasable
- `TableReader.setStreamingMode()` and `PageFrameCursor.setStreamingMode()` APIs
- Export paths (`SQLSerialParquetExporter`, `CopyExportRequestTask`) automatically enable streaming mode
SIMD Encoding (no memory pressure)
LZ4RAW sees the biggest gains (~16% faster throughput, 20% less CPU). Uncompressed is I/O-bound so wall time is similar, but CPU usage dropped ~17%.
madvise Hints (under memory pressure)
Test Environment:
Test Methodology:
- `stress-ng` to consume 28 GB, leaving ~4 GB free (CPU stress disabled)
- `sync && echo 3 > /proc/sys/vm/drop_caches`
- `MADV_SEQUENTIAL` only
- `MADV_SEQUENTIAL` + `MADV_DONTNEED`

Under memory pressure, the combination of both hints recovers 94% of baseline performance.
The benchmark export script is attached:
bench_parquet_export.sh