
feat(core): optimize parquet partition read with late materialization, zero-copy page reading, and use raw array encoding#6675

Merged
bluestreak01 merged 108 commits into master from parquet_filter_late_materization
Feb 3, 2026

Conversation

@kafka1991
Collaborator

@kafka1991 kafka1991 commented Jan 20, 2026

Depends on #6632

Late Materialization

Implements late materialization for Parquet partitions when executing parallel queries with selective filters.
Instead of decoding all columns upfront, this optimization:

  1. First decodes only the filter column(s) to identify matching rows
  2. Then decodes other columns only for the rows that passed the filter

This significantly reduces Parquet decoding overhead for queries whose filters match only a small fraction of rows.
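The two-phase flow above can be sketched in plain Java. This is illustrative only; the method names and data shapes below are hypothetical, not QuestDB's actual decoder API:

```java
import java.util.ArrayList;
import java.util.List;

public class LateMaterializationSketch {
    // Phase 1: decode only the filter column and collect matching row ids.
    static List<Integer> filterRows(String[] symbolColumn, String needle) {
        List<Integer> matching = new ArrayList<>();
        for (int row = 0; row < symbolColumn.length; row++) {
            if (needle.equals(symbolColumn[row])) {
                matching.add(row);
            }
        }
        return matching;
    }

    // Phase 2: materialize another column only for rows that passed the filter,
    // skipping the decode work for all non-matching rows.
    static double[] materialize(double[] priceColumn, List<Integer> rows) {
        double[] out = new double[rows.size()];
        for (int i = 0; i < rows.size(); i++) {
            out[i] = priceColumn[rows.get(i)];
        }
        return out;
    }

    public static void main(String[] args) {
        String[] symbols = {"sym1", "sym500", "sym2", "sym500"};
        double[] prices = {1.0, 2.0, 3.0, 4.0};
        List<Integer> rows = filterRows(symbols, "sym500");
        double[] picked = materialize(prices, rows);
        System.out.println(rows + " -> " + picked.length + " rows materialized");
    }
}
```

When the filter matches few rows, phase 2 touches only a handful of entries instead of the whole row group, which is where the speedup comes from.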

Performance
OHLC aggregation query on Parquet partition (8 days)

SELECT
    timestamp_floor('5s', timestamp) AS interval_start,
    symbol,
    first(price) AS open, max(price) AS high,
    min(price) AS low, last(price) AS close
FROM trades
WHERE symbol = 'sym500'
  AND timestamp BETWEEN '2025-01-01' AND '2025-01-09'
GROUP BY timestamp_floor('5s', timestamp), symbol
ORDER BY interval_start

Before (branch parquet_decode_opt): 600ms
Patch: 250ms

Zero-Copy Mmap Page Reading

Introduced SlicePageReader, which reads Parquet pages directly from the mmap'd byte slice (the Parquet file is already memory-mapped on the Java side), bypassing the previous PageReader, which copied page data into an intermediate Vec<u8> via read_to_end().
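The copy-vs-view difference can be illustrated with standard java.nio buffers. This is a sketch of the idea only; the actual reader lives in Rust (qdbr/parquet2/src/read/page/slice_reader.rs):

```java
import java.nio.ByteBuffer;

public class ZeroCopySketch {
    // Copying reader: allocates a fresh buffer and copies the page bytes into
    // it, analogous to the old PageReader's read_to_end() into a Vec<u8>.
    static byte[] readPageWithCopy(ByteBuffer mapped, int offset, int length) {
        byte[] copy = new byte[length];
        ByteBuffer view = mapped.duplicate();
        view.position(offset);
        view.get(copy, 0, length);
        return copy;
    }

    // Zero-copy reader: returns a view over the already-mapped bytes with no
    // per-page allocation, analogous to SlicePageReader slicing the mmap.
    static ByteBuffer readPageZeroCopy(ByteBuffer mapped, int offset, int length) {
        ByteBuffer view = mapped.duplicate();
        view.position(offset);
        view.limit(offset + length);
        return view.slice();
    }

    public static void main(String[] args) {
        ByteBuffer mapped = ByteBuffer.wrap(new byte[]{10, 20, 30, 40, 50});
        System.out.println(readPageZeroCopy(mapped, 1, 3).get(0)); // prints 20
    }
}
```

Both readers see the same bytes; the zero-copy path simply avoids the allocation and memcpy per page, which adds up across many pages in a scan.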

Array

Defaults partitionEncoderParquetRawArrayEncoding to true. Partition-to-Parquet conversion now uses raw array encoding by default for better performance, avoiding the overhead of Parquet's nested LIST decoding. Compatibility with external tools is not a concern, since these Parquet files are internal to QuestDB.
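For reference, the new default can presumably be reverted via server configuration. The key below is inferred from the PropertyKey constant exercised in the PR's tests (CAIRO_PARTITION_ENCODER_PARQUET_RAW_ARRAY_ENCODING_ENABLED); verify the exact name against the QuestDB configuration reference before relying on it:

```properties
# Assumed key name, derived from the PropertyKey constant above; restores the
# previous LIST-based array encoding for partition-to-Parquet conversion.
cairo.partition.encoder.parquet.raw.array.encoding.enabled=false
```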

@kafka1991 kafka1991 changed the title perf(parquet): late parquet data materialization for parallel filtered-related queries feat(core): late parquet data materialization for parallel filtered-related queries Jan 30, 2026
@kafka1991 kafka1991 changed the title feat(core): late parquet data materialization for parallel filtered-related queries feat(core): optimize parquet partition read with late materialization, zero-copy page reading, and raw array encoding Jan 30, 2026
@kafka1991 kafka1991 changed the title feat(core): optimize parquet partition read with late materialization, zero-copy page reading, and raw array encoding feat(core): optimize parquet partition read with late materialization, zero-copy page reading, and use raw array encoding Jan 30, 2026
@kafka1991
Collaborator Author

@CodeRabbit I made more changes, pls review again!

@coderabbitai

coderabbitai bot commented Jan 30, 2026

@kafka1991: Thank you for making the changes! I'll trigger a fresh review now.

Please note that the Enterprise CI is still failing (Build 210949 on commit 4b64f40212e1bb76d371626e64f080a5c2f3b694). This will need to be investigated and resolved before the PR can be merged.

Starting the review of your latest changes!

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@puzpuzpuz
Contributor

Compatibility with external tools is not a concern since these parquet files are internal to QuestDB.

I recall this was a concern when we chose this as the default. The files in Cold Storage are meant to be readable by 3rd-party SW.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
core/src/main/java/io/questdb/PropServerConfiguration.java (1)

1655-1665: ⚠️ Potential issue | 🟡 Minor

Add lower-bound validation for interval knobs.
Non‑positive values for bracket depth or thresholds could yield surprising behavior even though the relative check passes. Consider guarding for >= 1 to fail fast.

🔧 Suggested validation addition
             this.sqlIntervalMaxBracketDepth = getInt(properties, env, PropertyKey.CAIRO_SQL_INTERVAL_MAX_BRACKET_DEPTH, 8);
             this.sqlIntervalMaxIntervalsAfterMerge = getInt(properties, env, PropertyKey.CAIRO_SQL_INTERVAL_MAX_INTERVALS_AFTER_MERGE, 1024);
             this.sqlIntervalIncrementalMergeThreshold = getInt(properties, env, PropertyKey.CAIRO_SQL_INTERVAL_INCREMENTAL_MERGE_THRESHOLD, 256);
+            if (this.sqlIntervalMaxBracketDepth < 1) {
+                throw new ServerConfigurationException(
+                        PropertyKey.CAIRO_SQL_INTERVAL_MAX_BRACKET_DEPTH.getPropertyPath()
+                                + " (" + this.sqlIntervalMaxBracketDepth + ") must be >= 1"
+                );
+            }
+            if (this.sqlIntervalIncrementalMergeThreshold < 1) {
+                throw new ServerConfigurationException(
+                        PropertyKey.CAIRO_SQL_INTERVAL_INCREMENTAL_MERGE_THRESHOLD.getPropertyPath()
+                                + " (" + this.sqlIntervalIncrementalMergeThreshold + ") must be >= 1"
+                );
+            }
             if (this.sqlIntervalMaxIntervalsAfterMerge <= this.sqlIntervalIncrementalMergeThreshold) {
                 throw new ServerConfigurationException(
                         PropertyKey.CAIRO_SQL_INTERVAL_MAX_INTERVALS_AFTER_MERGE.getPropertyPath()
core/src/main/java/io/questdb/griffin/engine/table/AsyncGroupByNotKeyedAtom.java (1)

172-188: ⚠️ Potential issue | 🟠 Major

Close leaks filtered memory records.

frameFilteredMemoryRecords and ownerPageFrameFilteredNoRandomAccessMemoryRecord are newly added but never freed in close(), which can retain native memory across cursor lifetimes.

🧹 Proposed fix
     Misc.free(ownerAllocator);
     Misc.freeObjList(perWorkerAllocators);
     Misc.free(ownerMapValue);
     Misc.freeObjList(perWorkerMapValues);
+    Misc.freeObjList(frameFilteredMemoryRecords);
+    Misc.free(ownerPageFrameFilteredNoRandomAccessMemoryRecord);
     if (perWorkerGroupByFunctions != null) {
         for (int i = 0, n = perWorkerGroupByFunctions.size(); i < n; i++) {
             Misc.freeObjList(perWorkerGroupByFunctions.getQuick(i));
         }
     }
core/src/main/java/io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursorFactory.java (5)

959-978: ⚠️ Potential issue | 🟠 Major

Potential slot leak if exception occurs before try block.

The slotId is acquired at line 960, but the try block that releases it in finally doesn't start until line 978. If any operation between these points throws (e.g., populateFrameMemory(), record.init()), the slot will never be released.

🔧 Proposed fix: move try block earlier to cover slot acquisition
     final boolean owner = stealingFrameSequence != null && stealingFrameSequence == task.getFrameSequence();
     final int slotId = atom.maybeAcquire(workerId, owner, circuitBreaker);
 
+    try {
         final PageFrameMemory frameMemory;
         final boolean isParquetFrame = task.isParquetFrame();
         final boolean useLateMaterialization = atom.shouldUseLateMaterialization(slotId, isParquetFrame);
         if (useLateMaterialization) {
             frameMemory = task.populateFrameMemory(atom.getFilterUsedColumnIndexes());
         } else {
             frameMemory = task.populateFrameMemory();
         }
         record.init(frameMemory);
 
         final DirectLongList rows = task.getFilteredRows();
         rows.clear();
 
         final Function filter = atom.getMasterFilter(slotId);
         final CompiledFilter compiledFilter = atom.getCompiledMasterFilter();
 
-    try {
         if (compiledFilter == null || frameMemory.hasColumnTops()) {

1108-1127: ⚠️ Potential issue | 🟠 Major

Same slot leak pattern as filterAndAggregate.

Move the try block to immediately after slot acquisition to ensure proper cleanup.


1265-1284: ⚠️ Potential issue | 🟠 Major

Same slot leak pattern as other filter methods.

Move the try block to immediately after slot acquisition.


1421-1440: ⚠️ Potential issue | 🟠 Major

Same slot leak pattern as other filter methods.

Move the try block to immediately after slot acquisition.


1566-1585: ⚠️ Potential issue | 🟠 Major

Same slot leak pattern as other filter methods.

Move the try block to immediately after slot acquisition.
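The fix the reviewer is asking for in all five reducers boils down to the same shape: acquire the slot, then immediately enter the try block, so that any failure between acquisition and the original try (e.g. in populateFrameMemory() or record.init()) still releases the slot in finally. A minimal sketch with hypothetical names:

```java
public class SlotReleaseSketch {
    interface Atom {
        int maybeAcquire();
        void release(int slotId);
    }

    static void reduceFrame(Atom atom, Runnable mayThrow) {
        final int slotId = atom.maybeAcquire();
        try {
            // Everything that can throw now sits inside the try block,
            // including the frame population and record init steps.
            mayThrow.run();
        } finally {
            atom.release(slotId); // runs on both success and failure paths
        }
    }
}
```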

🤖 Fix all issues with AI agents
In `@core/rust/qdbr/parquet2/src/read/page/slice_reader.rs`:
- Around line 112-114: The uncompressed_page_size read from page_header (let
uncompressed_page_size = page_header.uncompressed_page_size.try_into()?) must be
guarded by the same max_page_size cap used for compressed sizes to avoid
oversized allocations on decode; after converting uncompressed_page_size, clamp
it against max_page_size (same constant/variable used elsewhere) and use the
clamped value for subsequent allocations/decoding logic in the PageType/page
handling code to prevent header-driven memory exhaustion.
- Around line 44-107: SlicePageReader currently only uses the start from
ColumnChunkMetaData::byte_range() and can read past the column chunk into
surrounding data; update the struct to include an end (column_end) computed from
byte_range() (start + length) in SlicePageReader::new and use that boundary in
all checks (e.g., when validating col_start, computing remaining/slicing in
read_next, and checking self.offset + read_size) so reads never exceed
column_end; adjust any references to self.data[self.offset..] and bounds checks
in read_next (and any other methods) to enforce self.offset <= column_end and
self.offset + read_size <= column_end.

In
`@core/src/main/java/io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursorFactory.java`:
- Around line 1597-1601: The call to
task.populateRemainingColumns(atom.getFilterUsedColumnIndexes(), rows, true)
followed by record.init(frameMemory) is currently executed unconditionally when
useLateMaterialization is true, which decodes remaining columns even if no rows
pass the filter; move the populateRemainingColumns(...)/record.init(frameMemory)
sequence inside the existing if (filteredRowCount > 0 &&
!atom.isSkipAggregation()) block so it only runs when there are filtered rows
and aggregation is not skipped, keeping the same useLateMaterialization guard
around it (refer to useLateMaterialization, task.populateRemainingColumns,
atom.getFilterUsedColumnIndexes(), rows, record.init, frameMemory,
filteredRowCount, atom.isSkipAggregation()).

In
`@core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java`:
- Around line 110-166: Both decodeRowGroupWithRowFilter and
decodeRowGroupWithRowFilterFillNulls pass filteredRows.getAddress() to JNI but
do not ensure the DirectLongList is open; if its address is 0 the native Rust
code will assert and crash. Before calling the native methods check if
filteredRows.getAddress() == 0 and reopen/allocate it (e.g.,
filteredRows.reopen() or the equivalent method on DirectLongList) so
filteredRows.getAddress() returns a non‑zero pointer, then pass that address to
the native calls.

In
`@core/src/test/java/io/questdb/test/griffin/ParquetLateMaterializationFuzzTest.java`:
- Around line 847-998: The test mutates shared config via
node1.setProperty(CAIRO_PARTITION_ENCODER_PARQUET_RAW_ARRAY_ENCODING_ENABLED,
rawArrayEncoding) and must restore the default (true) afterward to avoid
cross-test leakage; wrap the TestUtils.execute(...) call inside a try/finally
(or set the property before and reset it after) in
testLateMaterializationWithArrayColumn so that node1.setProperty(..., true) is
always called in the finally block to restore the default.
🧹 Nitpick comments (1)
core/src/main/java/io/questdb/griffin/engine/join/AsyncWindowJoinFastRecordCursorFactory.java (1)

1229-1255: Consider extracting the repeated late‑materialization setup block.
The same frame‑population + filter application sequence appears across five reducers; a helper would reduce drift and simplify future changes.

♻️ Possible extraction (apply similarly to other filter* reducers)
@@
-            final PageFrameMemory frameMemory;
-            final boolean isParquetFrame = task.isParquetFrame();
-            final boolean useLateMaterialization = atom.shouldUseLateMaterialization(slotId, isParquetFrame);
-            if (useLateMaterialization) {
-                frameMemory = task.populateFrameMemory(atom.getFilterUsedColumnIndexes());
-            } else {
-                frameMemory = task.populateFrameMemory();
-            }
-            record.init(frameMemory);
-            final DirectLongList rows = task.getFilteredRows();
-            rows.clear();
-
-            final Function filter = atom.getMasterFilter(slotId);
-            final CompiledFilter compiledFilter = atom.getCompiledMasterFilter();
-            if (compiledFilter == null || frameMemory.hasColumnTops()) {
-                applyFilter(filter, rows, record, frameRowCount);
-            } else {
-                applyCompiledFilter(compiledFilter, atom.getBindVarMemory(), atom.getBindVarFunctions(), task);
-            }
-
-            if (isParquetFrame) {
-                atom.getSelectivityStats(slotId).update(rows.size(), frameRowCount);
-            }
-            if (useLateMaterialization && task.populateRemainingColumns(atom.getFilterUsedColumnIndexes(), rows, true)) {
-                record.init(frameMemory);
-            }
+            final DirectLongList rows = task.getFilteredRows();
+            final PageFrameMemory frameMemory =
+                    initFilteredFrame(atom, slotId, task, record, rows, frameRowCount);
@@
+    private static PageFrameMemory initFilteredFrame(
+            AsyncWindowJoinFastAtom atom,
+            int slotId,
+            PageFrameReduceTask task,
+            PageFrameMemoryRecord record,
+            DirectLongList rows,
+            long frameRowCount
+    ) {
+        final boolean isParquetFrame = task.isParquetFrame();
+        final boolean useLateMaterialization = atom.shouldUseLateMaterialization(slotId, isParquetFrame);
+        final PageFrameMemory frameMemory = useLateMaterialization
+                ? task.populateFrameMemory(atom.getFilterUsedColumnIndexes())
+                : task.populateFrameMemory();
+        record.init(frameMemory);
+        rows.clear();
+
+        final Function filter = atom.getMasterFilter(slotId);
+        final CompiledFilter compiledFilter = atom.getCompiledMasterFilter();
+        if (compiledFilter == null || frameMemory.hasColumnTops()) {
+            applyFilter(filter, rows, record, frameRowCount);
+        } else {
+            applyCompiledFilter(compiledFilter, atom.getBindVarMemory(), atom.getBindVarFunctions(), task);
+        }
+
+        if (isParquetFrame) {
+            atom.getSelectivityStats(slotId).update(rows.size(), frameRowCount);
+        }
+        if (useLateMaterialization && task.populateRemainingColumns(atom.getFilterUsedColumnIndexes(), rows, true)) {
+            record.init(frameMemory);
+        }
+        return frameMemory;
+    }

Also applies to: 1394-1422, 1562-1590, 1792-1820, 1997-2024

@kafka1991
Collaborator Author

Compatibility with external tools is not a concern since these parquet files are internal to QuestDB.

I recall this was a concern when we chose this as the default. The files in Cold Storage are meant to be readable by 3rd-party SW.

Yeah, but in my tests the scan performance with raw encoding is significantly better. Since this is only used internally within a table partition, I'd like to switch to raw encoding there. For Parquet export we still keep the standard LIST encoding for arrays. Let's ask for @bluestreak01's opinion.

@kafka1991
Collaborator Author

@coderabbitai I've addressed your comments, please do a comprehensive review again.

@coderabbitai

coderabbitai bot commented Jan 30, 2026

@kafka1991: Thank you for addressing the previous review comments! I'll trigger a comprehensive review of all the changes now.

Please note that based on the earlier comment chain, the Enterprise CI was failing (Build 210949 on commit 4b64f40212e1bb76d371626e64f080a5c2f3b694). If this has been resolved, that's great! If not, please make sure to investigate and fix any CI failures before merging.

I'll focus on:

  • Thread safety and race conditions (given the parallel/async nature)
  • Resource management
  • Late materialization logic correctness
  • Parquet decoding optimizations
  • Any potential bugs or code smells

Starting the comprehensive review now!

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

GitHub Actions - Rebuild Native Libraries and others added 6 commits January 30, 2026 09:30
…' into parquet_filter_late_materization

# Conflicts:
#	core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdbr.dylib
#	core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdbr.dylib
#	core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdbr.so
#	core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so
#	core/src/main/resources/io/questdb/bin/windows-x86-64/questdbr.dll
@kafka1991 kafka1991 force-pushed the parquet_filter_late_materization branch from 7e65d4b to a24f1b5 Compare February 3, 2026 01:00
@glasstiger
Contributor

[PR Coverage check]

😍 pass : 2565 / 3573 (71.79%)

file detail

| path | covered lines | new lines | coverage |
| --- | ---: | ---: | ---: |
| qdbr/parquet2/src/page/mod.rs | 0 | 3 | 0.00% |
| io/questdb/cairo/sql/PageFrameFilteredNoRandomAccessMemoryRecord.java | 106 | 229 | 46.29% |
| io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursorFactory.java | 33 | 55 | 60.00% |
| qdbr/src/parquet_read/decode.rs | 1672 | 2459 | 68.00% |
| io/questdb/griffin/SqlCodeGenerator.java | 28 | 35 | 80.00% |
| qdbr/parquet2/src/read/page/slice_reader.rs | 77 | 96 | 80.21% |
| qdbr/src/parquet_read/jni.rs | 114 | 133 | 85.71% |
| io/questdb/cairo/sql/PageFrameMemoryPool.java | 67 | 78 | 85.90% |
| io/questdb/griffin/engine/join/AsyncWindowJoinAtom.java | 13 | 15 | 86.67% |
| io/questdb/cairo/sql/async/PageFrameReduceTask.java | 7 | 8 | 87.50% |
| io/questdb/griffin/engine/table/parquet/PartitionDecoder.java | 18 | 20 | 90.00% |
| qdbr/src/parquet_read/slicer/mod.rs | 11 | 12 | 91.67% |
| io/questdb/griffin/engine/table/AsyncTopKAtom.java | 14 | 15 | 93.33% |
| qdbr/src/parquet_read/column_sink/fixed.rs | 14 | 15 | 93.33% |
| io/questdb/griffin/engine/table/SelectivityStats.java | 16 | 17 | 94.12% |
| io/questdb/griffin/engine/table/AsyncFilterAtom.java | 21 | 22 | 95.45% |
| io/questdb/griffin/engine/table/AsyncGroupByAtom.java | 23 | 24 | 95.83% |
| io/questdb/griffin/engine/table/AsyncGroupByNotKeyedAtom.java | 21 | 22 | 95.45% |
| qdbr/src/parquet_write/array.rs | 101 | 105 | 96.19% |
| io/questdb/griffin/engine/table/AsyncJitFilteredRecordCursorFactory.java | 42 | 43 | 97.67% |
| io/questdb/griffin/engine/join/AsyncWindowJoinFastRecordCursorFactory.java | 30 | 30 | 100.00% |
| io/questdb/griffin/engine/table/AsyncTopKRecordCursorFactory.java | 9 | 9 | 100.00% |
| io/questdb/cairo/sql/PageFrameMemoryRecord.java | 50 | 50 | 100.00% |
| io/questdb/cairo/DefaultCairoConfiguration.java | 1 | 1 | 100.00% |
| qdbr/src/parquet_read/column_sink/var.rs | 5 | 5 | 100.00% |
| io/questdb/griffin/engine/table/AsyncFilteredRecordCursorFactory.java | 11 | 11 | 100.00% |
| io/questdb/griffin/engine/table/AsyncGroupByNotKeyedRecordCursorFactory.java | 14 | 14 | 100.00% |
| io/questdb/PropServerConfiguration.java | 4 | 4 | 100.00% |
| qdbr/src/parquet_write/varchar.rs | 8 | 8 | 100.00% |
| io/questdb/griffin/engine/table/AsyncGroupByRecordCursorFactory.java | 15 | 15 | 100.00% |
| qdbr/src/parquet_read/column_sink/tests.rs | 20 | 20 | 100.00% |

@bluestreak01 bluestreak01 merged commit 3ebc757 into master Feb 3, 2026
39 checks passed
@bluestreak01 bluestreak01 deleted the parquet_filter_late_materization branch February 3, 2026 13:55