feat(core): optimize parquet partition read with late materialization, zero-copy page reading, and raw array encoding #6675
Conversation
@CodeRabbit I made more changes, pls review again!
Please note that the Enterprise CI is still failing (Build 210949 on commit …). Starting the review of your latest changes! ✅ Actions performed: Review triggered.
I recall this was a concern when we chose this as the default. The files in Cold Storage are meant to be readable by 3rd-party SW.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
core/src/main/java/io/questdb/PropServerConfiguration.java (1)
1655-1665: ⚠️ Potential issue | 🟡 Minor
Add lower-bound validation for interval knobs.
Non-positive values for bracket depth or thresholds could yield surprising behavior even though the relative check passes. Consider guarding for >= 1 to fail fast.
🔧 Suggested validation addition

```diff
  this.sqlIntervalMaxBracketDepth = getInt(properties, env, PropertyKey.CAIRO_SQL_INTERVAL_MAX_BRACKET_DEPTH, 8);
  this.sqlIntervalMaxIntervalsAfterMerge = getInt(properties, env, PropertyKey.CAIRO_SQL_INTERVAL_MAX_INTERVALS_AFTER_MERGE, 1024);
  this.sqlIntervalIncrementalMergeThreshold = getInt(properties, env, PropertyKey.CAIRO_SQL_INTERVAL_INCREMENTAL_MERGE_THRESHOLD, 256);
+ if (this.sqlIntervalMaxBracketDepth < 1) {
+     throw new ServerConfigurationException(
+             PropertyKey.CAIRO_SQL_INTERVAL_MAX_BRACKET_DEPTH.getPropertyPath()
+                     + " (" + this.sqlIntervalMaxBracketDepth + ") must be >= 1"
+     );
+ }
+ if (this.sqlIntervalIncrementalMergeThreshold < 1) {
+     throw new ServerConfigurationException(
+             PropertyKey.CAIRO_SQL_INTERVAL_INCREMENTAL_MERGE_THRESHOLD.getPropertyPath()
+                     + " (" + this.sqlIntervalIncrementalMergeThreshold + ") must be >= 1"
+     );
+ }
  if (this.sqlIntervalMaxIntervalsAfterMerge <= this.sqlIntervalIncrementalMergeThreshold) {
      throw new ServerConfigurationException(
              PropertyKey.CAIRO_SQL_INTERVAL_MAX_INTERVALS_AFTER_MERGE.getPropertyPath()
```

core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByNotKeyedAtom.java (1)
172-188: ⚠️ Potential issue | 🟠 Major
close() leaks filtered memory records.
frameFilteredMemoryRecords and ownerPageFrameFilteredNoRandomAccessMemoryRecord are newly added but never freed in close(), which can retain native memory across cursor lifetimes.
🧹 Proposed fix

```diff
  Misc.free(ownerAllocator);
  Misc.freeObjList(perWorkerAllocators);
  Misc.free(ownerMapValue);
  Misc.freeObjList(perWorkerMapValues);
+ Misc.freeObjList(frameFilteredMemoryRecords);
+ Misc.free(ownerPageFrameFilteredNoRandomAccessMemoryRecord);
  if (perWorkerGroupByFunctions != null) {
      for (int i = 0, n = perWorkerGroupByFunctions.size(); i < n; i++) {
          Misc.freeObjList(perWorkerGroupByFunctions.getQuick(i));
      }
  }
```

core/src/main/java/io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursorFactory.java (5)
959-978: ⚠️ Potential issue | 🟠 Major
Potential slot leak if an exception occurs before the try block.
The slotId is acquired at line 960, but the try block that releases it in finally doesn't start until line 978. If any operation between these points throws (e.g., populateFrameMemory(), record.init()), the slot will never be released.
🔧 Proposed fix: move the try block earlier to cover slot acquisition

```diff
  final boolean owner = stealingFrameSequence != null && stealingFrameSequence == task.getFrameSequence();
  final int slotId = atom.maybeAcquire(workerId, owner, circuitBreaker);
+ try {
  final PageFrameMemory frameMemory;
  final boolean isParquetFrame = task.isParquetFrame();
  final boolean useLateMaterialization = atom.shouldUseLateMaterialization(slotId, isParquetFrame);
  if (useLateMaterialization) {
      frameMemory = task.populateFrameMemory(atom.getFilterUsedColumnIndexes());
  } else {
      frameMemory = task.populateFrameMemory();
  }
  record.init(frameMemory);
  final DirectLongList rows = task.getFilteredRows();
  rows.clear();
  final Function filter = atom.getMasterFilter(slotId);
  final CompiledFilter compiledFilter = atom.getCompiledMasterFilter();
- try {
  if (compiledFilter == null || frameMemory.hasColumnTops()) {
```
1108-1127: ⚠️ Potential issue | 🟠 Major
Same slot leak pattern as filterAndAggregate. Move the try block to immediately after slot acquisition to ensure proper cleanup.
1265-1284: ⚠️ Potential issue | 🟠 Major
Same slot leak pattern as other filter methods. Move the try block to immediately after slot acquisition.
1421-1440: ⚠️ Potential issue | 🟠 Major
Same slot leak pattern as other filter methods. Move the try block to immediately after slot acquisition.
1566-1585: ⚠️ Potential issue | 🟠 Major
Same slot leak pattern as other filter methods. Move the try block to immediately after slot acquisition.
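The acquire-inside-try hazard flagged in all five reducers above follows one pattern. Here is a minimal, self-contained sketch of the proposed fix: enter the try block immediately after acquiring the slot, so the finally clause releases it even when setup code throws. SlotPool and reduceWithGuard are hypothetical stand-ins, not QuestDB classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the atom's worker-slot bookkeeping.
final class SlotPool {
    final AtomicInteger acquired = new AtomicInteger();

    int maybeAcquire() {
        acquired.incrementAndGet();
        return 1; // slot id
    }

    void release(int slotId) {
        acquired.decrementAndGet();
    }
}

public class SlotLeakSketch {
    // Returns true when the slot was released despite a setup failure.
    static boolean reduceWithGuard(SlotPool pool, boolean failDuringSetup) {
        final int slotId = pool.maybeAcquire();
        try {
            // Everything that can throw lives inside the try, including the
            // setup steps that originally ran before it (in the PR these are
            // populateFrameMemory() and record.init()).
            if (failDuringSetup) {
                throw new IllegalStateException("setup failed");
            }
            // ... filter + aggregate ...
        } catch (IllegalStateException ignore) {
            // swallowed for the demo; real code would propagate
        } finally {
            pool.release(slotId);
        }
        return pool.acquired.get() == 0;
    }

    public static void main(String[] args) {
        System.out.println(reduceWithGuard(new SlotPool(), true)); // true
    }
}
```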
🤖 Fix all issues with AI agents
In `@core/rust/qdbr/parquet2/src/read/page/slice_reader.rs`:
- Around line 112-114: The uncompressed_page_size read from page_header (let
uncompressed_page_size = page_header.uncompressed_page_size.try_into()?) must be
guarded by the same max_page_size cap used for compressed sizes to avoid
oversized allocations on decode; after converting uncompressed_page_size, clamp
it against max_page_size (same constant/variable used elsewhere) and use the
clamped value for subsequent allocations/decoding logic in the PageType/page
handling code to prevent header-driven memory exhaustion.
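One way to realize the guard described above, sketched in Java (the actual reader is Rust): treat any size field read from a page header as untrusted input and validate it against the configured cap before allocating. The class and method names here are illustrative, not part of the codebase.

```java
// Hypothetical sketch: validate a header-declared page size against a cap
// before it is used to size any buffer, so a corrupt or hostile header
// cannot drive an oversized allocation.
public class PageSizeCapSketch {
    static int checkedPageSize(int declaredUncompressedSize, int maxPageSize) {
        if (declaredUncompressedSize < 0 || declaredUncompressedSize > maxPageSize) {
            throw new IllegalArgumentException(
                    "page size " + declaredUncompressedSize + " exceeds cap " + maxPageSize);
        }
        return declaredUncompressedSize;
    }

    public static void main(String[] args) {
        System.out.println(checkedPageSize(4096, 1 << 20)); // 4096
        try {
            checkedPageSize(Integer.MAX_VALUE, 1 << 20);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected oversized header value");
        }
    }
}
```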
- Around line 44-107: SlicePageReader currently only uses the start from
ColumnChunkMetaData::byte_range() and can read past the column chunk into
surrounding data; update the struct to include an end (column_end) computed from
byte_range() (start + length) in SlicePageReader::new and use that boundary in
all checks (e.g., when validating col_start, computing remaining/slicing in
read_next, and checking self.offset + read_size) so reads never exceed
column_end; adjust any references to self.data[self.offset..] and bounds checks
in read_next (and any other methods) to enforce self.offset <= column_end and
self.offset + read_size <= column_end.
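The column-chunk boundary check above can be illustrated with a small Java stand-in (the real SlicePageReader is Rust): record the chunk's end offset, computed as start + length from the chunk metadata, and reject any read that would cross it. All names below are illustrative.

```java
// Hypothetical sketch of a slice reader that enforces a column_end
// boundary so page reads can never spill into neighbouring chunk data.
public class ChunkBoundsSketch {
    static final class SliceReader {
        private final byte[] data;
        private int offset;
        private final int columnEnd; // start + length, per the chunk metadata

        SliceReader(byte[] data, int colStart, int colLength) {
            this.data = data;
            this.offset = colStart;
            this.columnEnd = colStart + colLength;
        }

        byte[] readNext(int readSize) {
            // Every read is bounds-checked against the chunk end.
            if (readSize < 0 || offset + readSize > columnEnd) {
                throw new IllegalArgumentException("read past column chunk end");
            }
            byte[] page = java.util.Arrays.copyOfRange(data, offset, offset + readSize);
            offset += readSize;
            return page;
        }
    }

    public static void main(String[] args) {
        byte[] file = new byte[16];
        SliceReader r = new SliceReader(file, 4, 8); // chunk covers bytes 4..12
        r.readNext(8); // ok: reads exactly the chunk
        try {
            r.readNext(1); // would cross columnEnd
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```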
In `@core/src/main/java/io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursorFactory.java`:
- Around line 1597-1601: The call to
task.populateRemainingColumns(atom.getFilterUsedColumnIndexes(), rows, true)
followed by record.init(frameMemory) is currently executed unconditionally when
useLateMaterialization is true, which decodes remaining columns even if no rows
pass the filter; move the populateRemainingColumns(...)/record.init(frameMemory)
sequence inside the existing if (filteredRowCount > 0 &&
!atom.isSkipAggregation()) block so it only runs when there are filtered rows
and aggregation is not skipped, keeping the same useLateMaterialization guard
around it (refer to useLateMaterialization, task.populateRemainingColumns,
atom.getFilterUsedColumnIndexes(), rows, record.init, frameMemory,
filteredRowCount, atom.isSkipAggregation()).
In `@core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java`:
- Around line 110-166: Both decodeRowGroupWithRowFilter and
decodeRowGroupWithRowFilterFillNulls pass filteredRows.getAddress() to JNI but
do not ensure the DirectLongList is open; if its address is 0 the native Rust
code will assert and crash. Before calling the native methods check if
filteredRows.getAddress() == 0 and reopen/allocate it (e.g.,
filteredRows.reopen() or the equivalent method on DirectLongList) so
filteredRows.getAddress() returns a non‑zero pointer, then pass that address to
the native calls.
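The zero-address guard described above can be sketched as follows. NativeLongList is a hypothetical stand-in for DirectLongList, and the reopen-before-use check mirrors the instruction in the comment; it is not the actual QuestDB code.

```java
// Hypothetical sketch: never hand a closed off-heap list's base address
// (0) to native code; reopen it first so the pointer is valid.
public class AddressGuardSketch {
    static final class NativeLongList {
        private long address; // 0 when closed

        void close() { address = 0; }
        void reopen() { address = 0xDEADBEEFL; } // pretend allocation
        long getAddress() { return address; }
    }

    static long ensureOpen(NativeLongList filteredRows) {
        if (filteredRows.getAddress() == 0) {
            // Re-allocate before passing the pointer across JNI.
            filteredRows.reopen();
        }
        return filteredRows.getAddress();
    }

    public static void main(String[] args) {
        NativeLongList rows = new NativeLongList();
        rows.close();
        System.out.println(ensureOpen(rows) != 0); // true
    }
}
```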
In `@core/src/test/java/io/questdb/test/griffin/ParquetLateMaterializationFuzzTest.java`:
- Around line 847-998: The test mutates shared config via
node1.setProperty(CAIRO_PARTITION_ENCODER_PARQUET_RAW_ARRAY_ENCODING_ENABLED,
rawArrayEncoding) and must restore the default (true) afterward to avoid
cross-test leakage; wrap the TestUtils.execute(...) call inside a try/finally
(or set the property before and reset it after) in
testLateMaterializationWithArrayColumn so that node1.setProperty(..., true) is
always called in the finally block to restore the default.
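The restore pattern requested above is a plain try/finally around the test body. A minimal, self-contained sketch (the property map, key string, and helper are hypothetical stand-ins for the QuestDB test node API; the key path is inferred from the PropertyKey name and should be verified):

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyRestoreSketch {
    // Stand-in for node1's mutable configuration.
    static final Map<String, Boolean> props = new HashMap<>();
    // Assumed property path, derived from
    // CAIRO_PARTITION_ENCODER_PARQUET_RAW_ARRAY_ENCODING_ENABLED.
    static final String KEY = "cairo.partition.encoder.parquet.raw.array.encoding.enabled";

    static void runWithRawArrayEncoding(boolean rawArrayEncoding, Runnable testBody) {
        props.put(KEY, rawArrayEncoding);
        try {
            testBody.run();
        } finally {
            // Restore the documented default even when the test body throws,
            // so the override cannot leak into later tests.
            props.put(KEY, true);
        }
    }

    public static void main(String[] args) {
        try {
            runWithRawArrayEncoding(false, () -> { throw new AssertionError("test failed"); });
        } catch (AssertionError ignore) {
        }
        System.out.println(props.get(KEY)); // true
    }
}
```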
🧹 Nitpick comments (1)
core/src/main/java/io/questdb/griffin/engine/join/AsyncWindowJoinFastRecordCursorFactory.java (1)
1229-1255: Consider extracting the repeated late-materialization setup block.
The same frame-population + filter application sequence appears across five reducers; a helper would reduce drift and simplify future changes.
♻️ Possible extraction (apply similarly to other filter* reducers)

```diff
@@
- final PageFrameMemory frameMemory;
- final boolean isParquetFrame = task.isParquetFrame();
- final boolean useLateMaterialization = atom.shouldUseLateMaterialization(slotId, isParquetFrame);
- if (useLateMaterialization) {
-     frameMemory = task.populateFrameMemory(atom.getFilterUsedColumnIndexes());
- } else {
-     frameMemory = task.populateFrameMemory();
- }
- record.init(frameMemory);
- final DirectLongList rows = task.getFilteredRows();
- rows.clear();
-
- final Function filter = atom.getMasterFilter(slotId);
- final CompiledFilter compiledFilter = atom.getCompiledMasterFilter();
- if (compiledFilter == null || frameMemory.hasColumnTops()) {
-     applyFilter(filter, rows, record, frameRowCount);
- } else {
-     applyCompiledFilter(compiledFilter, atom.getBindVarMemory(), atom.getBindVarFunctions(), task);
- }
-
- if (isParquetFrame) {
-     atom.getSelectivityStats(slotId).update(rows.size(), frameRowCount);
- }
- if (useLateMaterialization && task.populateRemainingColumns(atom.getFilterUsedColumnIndexes(), rows, true)) {
-     record.init(frameMemory);
- }
+ final DirectLongList rows = task.getFilteredRows();
+ final PageFrameMemory frameMemory =
+         initFilteredFrame(atom, slotId, task, record, rows, frameRowCount);
@@
+ private static PageFrameMemory initFilteredFrame(
+         AsyncWindowJoinFastAtom atom,
+         int slotId,
+         PageFrameReduceTask task,
+         PageFrameMemoryRecord record,
+         DirectLongList rows,
+         long frameRowCount
+ ) {
+     final boolean isParquetFrame = task.isParquetFrame();
+     final boolean useLateMaterialization = atom.shouldUseLateMaterialization(slotId, isParquetFrame);
+     final PageFrameMemory frameMemory = useLateMaterialization
+             ? task.populateFrameMemory(atom.getFilterUsedColumnIndexes())
+             : task.populateFrameMemory();
+     record.init(frameMemory);
+     rows.clear();
+
+     final Function filter = atom.getMasterFilter(slotId);
+     final CompiledFilter compiledFilter = atom.getCompiledMasterFilter();
+     if (compiledFilter == null || frameMemory.hasColumnTops()) {
+         applyFilter(filter, rows, record, frameRowCount);
+     } else {
+         applyCompiledFilter(compiledFilter, atom.getBindVarMemory(), atom.getBindVarFunctions(), task);
+     }
+
+     if (isParquetFrame) {
+         atom.getSelectivityStats(slotId).update(rows.size(), frameRowCount);
+     }
+     if (useLateMaterialization && task.populateRemainingColumns(atom.getFilterUsedColumnIndexes(), rows, true)) {
+         record.init(frameMemory);
+     }
+     return frameMemory;
+ }
```

Also applies to: 1394-1422, 1562-1590, 1792-1820, 1997-2024
Resolved review threads:
- core/src/main/java/io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursorFactory.java (outdated)
- core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java
- core/src/test/java/io/questdb/test/griffin/ParquetLateMaterializationFuzzTest.java
Yeah, but in my tests the scan performance with rawEncoding is significantly better. Since this is only used internally within a table partition, I'd like to switch to
@coderabbitai I've addressed your comments, please do a comprehensive review again.
Please note that, based on the earlier comment chain, the Enterprise CI was failing (Build 210949 on commit …). I'll focus on:
Starting the comprehensive review now! ✅ Actions performed: Review triggered.
…' into parquet_filter_late_materization
# Conflicts:
#	core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdbr.dylib
#	core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdbr.dylib
#	core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdbr.so
#	core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so
#	core/src/main/resources/io/questdb/bin/windows-x86-64/questdbr.dll
Force-pushed from 7e65d4b to a24f1b5
[PR Coverage check] 😍 pass: 2565 / 3573 (71.79%)
Depends on #6632
Late Materialization
Implements the late materialization optimization for Parquet partitions when executing queries with selective filters in parallel.
Instead of decoding all columns upfront, this optimization:
- decodes only the columns referenced by the filter;
- applies the filter to collect the matching row ids;
- decodes the remaining columns only for the rows that passed.
This significantly reduces Parquet decoding overhead for low-selectivity queries.
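The flow above can be illustrated with a minimal, self-contained sketch. Plain Java arrays stand in for decoded Parquet pages, and filterRows/materialize are illustrative names, not QuestDB APIs:

```java
import java.util.ArrayList;
import java.util.List;

public class LateMaterializationSketch {
    // Phase 1: evaluate the filter against the single decoded filter column.
    static List<Integer> filterRows(long[] filterColumn, long threshold) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < filterColumn.length; i++) {
            if (filterColumn[i] > threshold) {
                rows.add(i);
            }
        }
        return rows;
    }

    // Phase 2: materialize another column only at the surviving row ids,
    // skipping the decode work for every filtered-out row.
    static long[] materialize(long[] otherColumn, List<Integer> rows) {
        long[] out = new long[rows.size()];
        for (int i = 0; i < rows.size(); i++) {
            out[i] = otherColumn[rows.get(i)];
        }
        return out;
    }

    public static void main(String[] args) {
        long[] price = {10, 55, 7, 90};
        long[] size = {1, 2, 3, 4};
        List<Integer> rows = filterRows(price, 50);  // rows 1 and 3 pass
        long[] sizes = materialize(size, rows);      // {2, 4}
        System.out.println(rows + " -> " + java.util.Arrays.toString(sizes));
    }
}
```

The win comes from phase 2 touching only the surviving rows: with a selective filter, most of a row group's non-filter columns are never decoded.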
Performance
OHLC aggregation query on a Parquet partition (8 days):
- Before (branch parquet_decode_opt): 600ms
- Patch: 250ms
Zero-Copy Mmap Page Reading
Introduced SlicePageReader, which reads Parquet pages directly from the mmap'd byte slice (the Parquet file is already mmap'd on the Java side), bypassing the previous PageReader, which copied page data via read_to_end() into an intermediate Vec<u8>.
Array
Defaults partitionEncoderParquetRawArrayEncoding to true. Partition-to-Parquet conversion now uses raw array encoding by default for better performance, avoiding the overhead of Parquet's nested LIST decoding. Compatibility with external tools is not a concern, since these Parquet files are internal to QuestDB.
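Given the Cold Storage readability concern raised earlier in the thread, deployments that need externally readable Parquet could presumably opt out via server configuration. The property path below is an assumption inferred from the PropertyKey name (CAIRO_PARTITION_ENCODER_PARQUET_RAW_ARRAY_ENCODING_ENABLED) and should be verified against the configuration docs before use:

```ini
# Assumed key (derived from the PropertyKey name; verify before relying on it):
# disable raw array encoding so arrays are written as standard Parquet LISTs
cairo.partition.encoder.parquet.raw.array.encoding.enabled=false
```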