
perf(core): streaming parquet export#6300

Merged
bluestreak01 merged 111 commits into master from stream_export_parquet
Jan 14, 2026

Conversation

@kafka1991
Collaborator

@kafka1991 kafka1991 commented Oct 23, 2025

Tandem https://github.com/questdb/questdb-enterprise/pull/848

This PR implements streaming Parquet export for HTTP queries, eliminating temporary tables and files and improving performance.

  • Direct streaming: exports directly from PageFrames when supportsPageFrameCursor() returns true
  • Fallback: creates a temporary table and then streams the export when a PageFrameCursor is not supported
  • Currently limited to the HTTP /exp endpoint
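The direct-vs-fallback decision above can be sketched as follows. This is a hypothetical illustration only: `supportsPageFrameCursor()` comes from the PR description, but the demo class, the `RecordCursorFactory` stub, and the return strings are made up and do not match the real ExportQueryProcessor code.

```java
public class ExportPathDemo {
    // Minimal stand-in for the factory interface; the real QuestDB
    // RecordCursorFactory has many more methods.
    interface RecordCursorFactory {
        boolean supportsPageFrameCursor();
    }

    // Chooses the export strategy: stream straight from page frames when the
    // factory supports it, otherwise fall back to a temporary table.
    static String chooseExportPath(RecordCursorFactory factory) {
        if (factory.supportsPageFrameCursor()) {
            return "stream-page-frames"; // direct streaming, no temp table/files
        }
        return "temp-table-fallback";    // materialize first, then stream the copy
    }

    public static void main(String[] args) {
        System.out.println(chooseExportPath(() -> true));
        System.out.println(chooseExportPath(() -> false));
    }
}
```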

Performance

Prepare Table Data

CREATE TABLE IF NOT EXISTS 'trades_native' ( 
        symbol SYMBOL CAPACITY 2048 CACHE,
        side SYMBOL CAPACITY 4 CACHE,
        price DOUBLE,
        amount DOUBLE,
        timestamp TIMESTAMP
) timestamp(timestamp) PARTITION BY HOUR;

INSERT INTO trades_native SELECT
    rnd_symbol_zipf(1_000, 2.0) AS symbol,
    rnd_symbol('buy', 'sell') as side,
    rnd_double() * 20 + 10 AS price,
    rnd_double() * 20 + 10 AS amount,
    generate_series as timestamp
  FROM generate_series('2025-01-01', '2025-01-11', '1720u');

Export on Master Branch

time curl -G --data-urlencode "query=trades_native" --data-urlencode "fmt=parquet" --data-urlencode "timeout=60000000" http://localhost:9000/exp > trades_native.parquet

Executed in  893.15 secs

Export on Current Branch

time curl -G --data-urlencode "query=trades_native" --data-urlencode "fmt=parquet" --data-urlencode "timeout=60000000" http://localhost:9000/exp > trades_native.parquet

Executed in  778.82 secs

The current performance improvement is modest because single-threaded Parquet export is CPU-bound. However, the new implementation makes it possible to parallelize row-group generation across multiple threads while keeping writes to the output buffer single-threaded. If we decide to optimize further, there is significant room for performance gains.
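The "parallel encode, single-threaded write" idea can be sketched with a plain executor: row groups are encoded concurrently, but the futures are drained in submission order so only one thread ever appends to the output buffer. All names here (`ParallelRowGroupDemo`, `encodeRowGroup`, the fake byte payload) are invented for illustration and do not reflect the actual QuestDB implementation.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRowGroupDemo {
    // Stand-in for the CPU-bound Parquet row-group encoding step.
    static byte[] encodeRowGroup(int id) {
        return ("rowgroup-" + id).getBytes();
    }

    static byte[] export(int rowGroups, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Encode phase: row groups are produced concurrently.
            List<Future<byte[]>> pending = new ArrayList<>();
            for (int i = 0; i < rowGroups; i++) {
                final int id = i;
                pending.add(pool.submit(() -> encodeRowGroup(id)));
            }
            // Write phase: a single thread drains the futures in submission
            // order, so the output buffer stays in row-group order.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (Future<byte[]> f : pending) {
                out.write(f.get());
            }
            return out.toByteArray();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new String(export(3, 2)));
    }
}
```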

TODO

  • add a few more tests

@kafka1991 kafka1991 self-assigned this Oct 23, 2025
@kafka1991 kafka1991 marked this pull request as draft October 23, 2025 02:40
@coderabbitai

coderabbitai bot commented Oct 23, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR adds streaming Parquet export end-to-end: Rust writer APIs and JNI bindings, Java streaming hooks in the HTTP export processor and task/worker plumbing, PageFrame cursor support for streaming, designated-timestamp ordering, and multiple related test/tool adjustments.

Changes

Cohort / File(s) Summary
Rust: Parquet write core & multi-partition row-groups
core/rust/qdbr/src/parquet_write/file.rs
Made DEFAULT_ROW_GROUP_SIZE public; changed chunk APIs to take &Partition; added create_row_group_from_partitions and write_row_group_from_partitions supporting multi-partition RowGroup creation; internal plumbing updated to use &Partition.
Rust: JNI streaming writer & helpers
core/rust/qdbr/src/parquet_write/jni.rs
Added StreamingParquetWriter, BufferWriter and streaming state/helpers; new JNI exports for create/write/finish/close and row-group writes; integrated streaming path alongside existing encoder.
Java: HTTP export streaming flow
core/src/main/java/io/questdb/cutlass/http/processors/ExportQueryProcessor.java, core/src/main/java/io/questdb/cutlass/http/processors/ExportQueryProcessorState.java
Introduced streaming export state and callback (StreamWriteParquetCallBack), new state constant QUERY_PARQUET_EXPORT_DATA, per-context currentContext, PageFrameCursor wiring, onWrite handlers and deferred init for streaming exports.
Java: Export task & exporter
core/src/main/java/io/questdb/cutlass/parquet/CopyExportRequestTask.java, core/src/main/java/io/questdb/cutlass/parquet/CopyExportRequestJob.java
Extended CopyExportRequestTask with metadata, pageFrameCursor, descending flag, writeCallback and StreamPartitionParquetExporter; updated of(...) signature and job invocation to pass new streaming parameters; adjusted status updates.
Java: Export result/context changes
core/src/main/java/io/questdb/cutlass/text/CopyExportResult.java, core/src/main/java/io/questdb/cutlass/text/CopyExportContext.java
Removed copyID/message/report plumbing from CopyExportResult; CopyExportContext.updateStatus signature simplified; added canStreamExportParquet helper delegating to factory support.
Java: Native PartitionEncoder API
core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionEncoder.java
Added native methods for streaming lifecycle: createStreamingParquetWriter, writeStreamingParquetChunk, writeStreamingParquetChunkFromRowGroup, finishStreamingParquetWrite, closeStreamingParquetWriter.
Java: PageFrame API & implementations
core/src/main/java/io/questdb/cairo/sql/PageFrame.java, core/src/main/java/io/questdb/griffin/engine/table/BwdTableReaderPageFrameCursor.java, core/src/main/java/io/questdb/griffin/engine/table/FwdTableReaderPageFrameCursor.java, core/src/main/java/io/questdb/griffin/engine/table/SelectedRecordCursorFactory.java
Added hasColumnData(int columnIndex) to PageFrame and implemented overrides in table-frame classes and selected cursor frame to indicate per-column data presence.
Java: QueryProgress & frame sizing
core/src/main/java/io/questdb/griffin/engine/QueryProgress.java, core/src/main/java/io/questdb/griffin/engine/table/AbstractPageFrameRecordCursorFactory.java
Added RegisteredPageFrameCursor, pageFrameCursor tracking, changePageFrameSizes(int,int) API and made page-frame size fields mutable for runtime resizing.
Java: Configuration & utilities
core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java, core/src/main/java/io/questdb/std/DirectLongList.java
Increased default Parquet export row-group size to 100_000; added DirectLongList(long,int,boolean) constructor with deferred-allocation option.
Rust: Designated timestamp order & schema changes
core/rust/qdb-core/src/col_type.rs, core/rust/qdb-core/src/col_driver/mod.rs, core/rust/qdbr/src/parquet_write/schema.rs, core/rust/qdbr/src/parquet_read/meta.rs
Added TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING, is_designated_timestamp_ascending(), into_designated_with_order(ascending); updated driver predicate and schema/decoder to honor ascending flag when assigning designated timestamp.
Rust: Parquet read/write call-site and tests
core/rust/qdbr/src/parquet_read/decode.rs, core/rust/qdbr/src/parquet_read/mod.rs, core/rust/qdbr/src/parquet_write/mod.rs, core/rust/qdbr/src/parquet_write/update.rs
Updated Column::from_raw_data call sites to accept extra boolean flags (designated timestamp ascending); added RowGroupBuffers::column_buffers() accessor; added tests covering designated timestamp ascending/descending behavior.
Rust: parquet-tools dump API
core/rust/qdbr/parquet2/parquet-tools/src/lib/dump.rs, core/rust/qdbr/parquet2/parquet-tools/src/main.rs, core/rust/qdbr/parquet2/parquet-tools/src/lib/meta.rs
Simplified dump_file API by removing sample_size, streamlined page iteration and output model; updated main invocation and minor formatting in meta.
Java tests & compat
core/src/test/java/io/questdb/test/cutlass/http/ExpParquetExportTest.java, compat/src/test/java/io/questdb/compat/HttpClientCompatTest.java
Adjusted expected Parquet export byte sizes across tests for new streaming/configuration; simplified HTTP compat test to use copyTextTo instead of fragment loop.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

Core

Suggested reviewers

  • puzpuzpuz
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning — docstring coverage is 24.87%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)
  • Title check ✅ Passed — the title 'perf(core): streaming parquet export' clearly and specifically describes the main change: implementing streaming Parquet export for performance improvement.
  • Description check ✅ Passed — the description comprehensively explains the implementation details and performance improvements, and provides context through the linked enterprise PR and performance benchmarks.




@kafka1991 kafka1991 marked this pull request as ready for review November 27, 2025 08:50
@bluestreak01 bluestreak01 changed the title from feat(core): streaming parquet export to perf(core): streaming parquet export on Jan 13, 2026
@bluestreak01
Member

@CodeRabbit review

@coderabbitai

coderabbitai bot commented Jan 14, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
core/rust/qdb-core/src/col_driver/mod.rs (1)

61-70: Descending designated timestamps incorrectly use TimestampDriver instead of DesignatedTimestampDriver.

The match condition uses is_designated_timestamp_ascending() which returns false for descending designated timestamps, causing them to route to TimestampDriver (8 bytes per row) instead of DesignatedTimestampDriver (16 bytes: timestamp + insert index for O3 merge logic).

The order flag (ascending/descending) should not affect the storage layout. All designated timestamps require the insert index for O3 merge handling. The check should be col_type.is_designated() instead, so both ascending and descending designated timestamps route to DesignatedTimestampDriver. Alternatively, you need a third match arm for descending designated timestamps.

Note: The test suite only covers ascending designated timestamps (line 110-115) and lacks coverage for descending designated timestamps, which explains how this slipped through.

🤖 Fix all issues with AI agents
In `@core/rust/qdb-core/src/col_type.rs`:
- Around line 233-238: The new Rust flag
TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING (bit 19) collides with Java's
TYPE_FLAG_ARRAY_WEAK_DIMS causing cross-language deserialization errors; fix by
moving the Rust timestamp-order flag to an unused bit (e.g., change
TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING to use a different shift such as
20), update all usages and tests referencing
TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING, and either (a) add a
protocol/version bump or migration comment in serde boundary code to handle the
old encoding during rollout or (b) coordinate the Java change so both sides use
distinct bits; ensure any serialization/deserialization paths that
construct/parse raw i32 column type masks are updated to the new constant and
add a compatibility test exercising Java-produced weak-dims masks vs Rust
timestamp-order masks.

In `@core/rust/qdbr/src/parquet_write/schema.rs`:
- Around line 394-398: The logic computing column_type mishandles designated
timestamps: it only calls into_designated_with_order when
column.designated_timestamp_ascending is true, so when
column.designated_timestamp is true but ascending is false the designated
flag/order is lost; update the conditional to call
column.data_type.into_designated_with_order(column.designated_timestamp_ascending)?
whenever column.designated_timestamp is true (regardless of ascending), using
column.designated_timestamp and column.designated_timestamp_ascending to decide
the branch and preserve the designated order via into_designated_with_order.
🧹 Nitpick comments (4)
core/rust/qdbr/parquet2/parquet-tools/src/lib/dump.rs (1)

86-96: Test coverage is minimal but functional.

The test only verifies that output contains "Group: 0" and "Rows: 100". While this confirms basic functionality, consider adding assertions for page-level output to ensure the page iteration logic works correctly (e.g., checking that "Page:" appears in output).

💡 Optional: Add page-level assertion
         let string_output = String::from_utf8(buf).unwrap();
         assert!(string_output.contains("Group: 0"));
         assert!(string_output.contains("Rows: 100"));
+        // Verify page iteration produces output
+        assert!(string_output.contains("Page:"));
     }
core/rust/qdbr/src/parquet_write/jni.rs (1)

929-997: Unused parameter _columns.

The _columns parameter is passed but not used in the function body. Consider removing it if it's not needed for future use.

Suggested fix
 fn convert_row_group_buffers_to_partition(
     partition_template: &Partition,
     row_group_bufs: &crate::parquet_read::RowGroupBuffers,
-    _columns: &[(i32, qdb_core::col_type::ColumnType)],
     row_count: usize,
     symbol_data_ptr: jlong,
 ) -> ParquetResult<Partition> {

And update the call site accordingly.

core/rust/qdb-core/src/col_type.rs (1)

273-291: Consider adding spacing consistency in function signature.

The implementation is correct. Minor style nit: there's inconsistent spacing around the colon in the parameter (ascending : bool vs typical ascending: bool).

Suggested fix
-    pub fn into_designated_with_order(self, ascending : bool) -> CoreResult<ColumnType> {
+    pub fn into_designated_with_order(self, ascending: bool) -> CoreResult<ColumnType> {
core/rust/qdb-core/src/col_driver/mod.rs (1)

99-144: Consider adding a test case for descending designated timestamps.

The existing tests cover non-designated and ascending designated timestamps, but there's no test verifying the driver selection for descending designated timestamps. Given the behavioral change, adding coverage would help prevent regressions.

Suggested test addition
            // Add after the designated-timestamp case (line ~116):
            (
                ColumnTypeTag::Timestamp
                    .into_type()
                    .into_designated_with_order(false)
                    .unwrap(),
                "timestamp", // or "designated-timestamp" depending on intended behavior
            ),
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 13a584d and 5aa1fe3.

📒 Files selected for processing (14)
  • compat/src/test/java/io/questdb/compat/HttpClientCompatTest.java
  • core/rust/qdb-core/src/col_driver/mod.rs
  • core/rust/qdb-core/src/col_type.rs
  • core/rust/qdbr/parquet2/parquet-tools/src/lib/dump.rs
  • core/rust/qdbr/parquet2/parquet-tools/src/lib/meta.rs
  • core/rust/qdbr/parquet2/parquet-tools/src/main.rs
  • core/rust/qdbr/src/parquet_read/decode.rs
  • core/rust/qdbr/src/parquet_read/meta.rs
  • core/rust/qdbr/src/parquet_read/mod.rs
  • core/rust/qdbr/src/parquet_write/file.rs
  • core/rust/qdbr/src/parquet_write/jni.rs
  • core/rust/qdbr/src/parquet_write/mod.rs
  • core/rust/qdbr/src/parquet_write/schema.rs
  • core/rust/qdbr/src/parquet_write/update.rs
✅ Files skipped from review due to trivial changes (1)
  • core/rust/qdbr/parquet2/parquet-tools/src/lib/meta.rs
🧰 Additional context used
🧬 Code graph analysis (3)
core/rust/qdbr/parquet2/parquet-tools/src/main.rs (1)
core/rust/qdbr/parquet2/parquet-tools/src/lib/dump.rs (1)
  • dump_file (14-80)
core/rust/qdbr/src/parquet_write/jni.rs (2)
core/rust/qdbr/src/parquet_write/file.rs (5)
  • new (69-84)
  • new (261-263)
  • new (509-525)
  • partition (321-327)
  • partition (350-356)
core/rust/qdbr/src/parquet_write/schema.rs (4)
  • to_parquet_schema (368-411)
  • to_encodings (413-419)
  • partition (372-384)
  • from_raw_data (302-360)
core/rust/qdb-core/src/col_type.rs (1)
core/src/main/java/io/questdb/cairo/ColumnType.java (1)
  • ColumnType (58-1168)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (35)
  • GitHub Check: New pull request (Coverage Report Coverage Report)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-other)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-pgwire)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-cairo-sub)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-cairo-root)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-fuzz2)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-fuzz1)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-griffin-sub)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-griffin-root)
  • GitHub Check: New pull request (Rust Test and Lint on linux-jdk17)
  • GitHub Check: New pull request (Hosted Running tests on windows-other-2)
  • GitHub Check: New pull request (Hosted Running tests on windows-other-1)
  • GitHub Check: New pull request (Hosted Running tests on windows-pgwire)
  • GitHub Check: New pull request (Hosted Running tests on windows-cairo-2)
  • GitHub Check: New pull request (Hosted Running tests on windows-cairo-1)
  • GitHub Check: New pull request (Hosted Running tests on windows-fuzz2)
  • GitHub Check: New pull request (Hosted Running tests on windows-fuzz1)
  • GitHub Check: New pull request (Hosted Running tests on windows-griffin-sub)
  • GitHub Check: New pull request (Hosted Running tests on windows-griffin-base)
  • GitHub Check: New pull request (Hosted Running tests on mac-other)
  • GitHub Check: New pull request (Hosted Running tests on mac-pgwire)
  • GitHub Check: New pull request (Hosted Running tests on mac-cairo-fuzz)
  • GitHub Check: New pull request (Hosted Running tests on mac-cairo)
  • GitHub Check: New pull request (Hosted Running tests on mac-griffin)
  • GitHub Check: New pull request (SelfHosted Other tests on linux-x86-graal)
  • GitHub Check: New pull request (SelfHosted Other tests on linux-arm64)
  • GitHub Check: New pull request (SelfHosted Other tests on linux-x64-zfs)
  • GitHub Check: New pull request (Trigger Enterprise CI Trigger Enterprise Pipeline)
  • GitHub Check: New pull request (SelfHosted Griffin tests on linux-x86-graal)
  • GitHub Check: New pull request (SelfHosted Cairo tests on linux-arm64)
  • GitHub Check: New pull request (SelfHosted Cairo tests on linux-x86-graal)
  • GitHub Check: New pull request (SelfHosted Griffin tests on linux-x64-zfs)
  • GitHub Check: New pull request (SelfHosted Cairo tests on linux-x64-zfs)
  • GitHub Check: New pull request (SelfHosted Griffin tests on linux-arm64)
  • GitHub Check: New pull request (Check Changes Check changes)
🔇 Additional comments (30)
compat/src/test/java/io/questdb/compat/HttpClientCompatTest.java (1)

90-98: LGTM!

The refactor to use copyTextTo(sink) instead of manually iterating over response fragments is a clean simplification. This reduces code complexity while maintaining the same test behavior.

core/rust/qdbr/src/parquet_read/mod.rs (1)

64-68: LGTM!

Clean, idiomatic getter that provides read-only access to the internal column buffers. The naming convention (column_buffers vs column_bufs) appropriately distinguishes the public API from the internal field.

core/rust/qdbr/parquet2/parquet-tools/src/lib/dump.rs (3)

14-14: LGTM on the simplified API.

Removing sample_size streamlines the function signature. The optional columns parameter with automatic fallback to all columns (lines 23-29) is a clean approach.


52-74: Clean page type handling.

The CompressedPage enum matching is straightforward and correctly distinguishes between data pages (V1/V2) and dictionary pages. Output formatting is consistent.


43-49: No action required—usize::MAX is intentional for this diagnostic tool.

The get_page_iterator parameter max_page_size: usize::MAX is correct. This utility is a diagnostic parquet-tools dumper that inspects metadata and sample data, and needs to handle pages of any size without artificial limits. The safety check (WouldOverAllocate error) remains in place; using usize::MAX simply disables the arbitrary allocation limit, which is appropriate for a tool that must be able to examine files regardless of page size. This pattern is also used in the library's test suite.

core/rust/qdbr/parquet2/parquet-tools/src/main.rs (1)

47-47: LGTM!

The dump_file call correctly aligns with the updated function signature that no longer requires a sample_size parameter.

core/rust/qdbr/src/parquet_read/decode.rs (1)

2197-2254: LGTM!

Test helper functions correctly updated to pass the new designated_timestamp_ascending boolean parameter to Column::from_raw_data. Using false is appropriate for these non-timestamp test columns.

core/rust/qdbr/src/parquet_write/file.rs (5)

30-30: LGTM!

Making DEFAULT_ROW_GROUP_SIZE public is appropriate for reuse in the streaming writer JNI layer.


196-226: LGTM!

Good change to take &Partition by reference instead of by value, avoiding unnecessary clones when the partition data is only read.


228-246: LGTM!

The new write_row_group_from_partitions method cleanly delegates to the standalone function and handles the partial partition ranges correctly.


362-471: LGTM!

The create_row_group_from_partitions function correctly merges data from multiple partitions into a single RowGroup. The parallel and non-parallel paths are consistent, and the partition_slice_range helper correctly handles edge cases for first, middle, and last partitions.


473-497: LGTM!

The partition_slice_range helper correctly computes offsets and lengths for first, middle, last, and single-partition cases.

core/rust/qdbr/src/parquet_write/jni.rs (9)

351-352: Hardcoded true for designated_timestamp_ascending.

In create_partition_descriptor, the designated_timestamp_ascending parameter is hardcoded to true. This appears intentional for the encodePartition code path which writes from QuestDB tables (where timestamps are ascending), but worth documenting this assumption.


406-446: BufferWriter implementation relies on stable pointer from Box<Vec<...>>.

The unsafe implementation is sound because:

  1. StreamingParquetWriter owns current_buffer: Box<Vec<u8, QdbAllocator>>, ensuring the Vec has a stable heap address.
  2. BufferWriter stores the raw pointer to this Vec.
  3. The Box is never moved or reallocated while BufferWriter is in use.

The manual buffer management in write() is correct: it handles capacity growth, direct memory copy, and length updates properly. The init_offset of 8 reserves space for the length header.


448-465: LGTM!

Good design keeping pending_row_group_buffers synchronized with pending_partitions to ensure decoded data remains valid while partitions hold references to it. The comment on line 461-463 clearly documents this lifetime relationship.


467-564: LGTM!

The streaming writer initialization correctly:

  • Creates a partition template with schema but no data
  • Sets up the chunked writer with an 8-byte offset for the length header
  • Captures sorting column configuration with proper descending flag handling
  • Stores additional metadata for the finish call

566-604: LGTM!

The chunk writing logic correctly:

  • Clones the partition template and updates with actual data pointers
  • Tracks accumulated rows for row group size management
  • Synchronizes pending_row_group_buffers with pending_partitions (pushing None since data is caller-managed)

606-689: LGTM!

The row group flushing logic correctly handles:

  • Determining partition boundaries for the row group
  • Draining fully-consumed partitions and their backing buffers together
  • Updating first_partition_start for partially-consumed first partitions
  • Recalculating accumulated_rows after the drain

691-753: LGTM!

The finish and close functions correctly:

  • Write any remaining accumulated rows as a final (potentially smaller) row group
  • Call chunked_writer.finish() to write the Parquet footer
  • Clean up the writer via Box::from_raw

755-802: LGTM!

The partition template creation correctly handles the inverted boolean convention where timestamp_descending == 0 means ascending order.


849-927: LGTM!

The row group conversion path correctly:

  • Decodes the source Parquet row group into buffers
  • Converts buffers to a partition referencing the decoded data
  • Pushes Some(row_group_bufs) to keep the backing data alive while the partition references it
core/rust/qdbr/src/parquet_write/update.rs (1)

181-198: LGTM!

Test helper make_column correctly updated to pass the new designated_timestamp_ascending parameter to Column::from_raw_data.

core/rust/qdbr/src/parquet_read/meta.rs (2)

76-82: LGTM!

The additional is_designated_timestamp_ascending() check correctly ensures that only ascending designated timestamps are reported via timestamp_index. This aligns with QuestDB's requirement that the timestamp index column be in ascending order for efficient time-series queries. Descending timestamp columns will still be readable but won't be treated as the primary timestamp index.


440-443: LGTM!

Test helper correctly updated to pass the new designated_timestamp_ascending parameter.

core/rust/qdbr/src/parquet_write/mod.rs (1)

171-171: LGTM!

Test call sites correctly updated to pass the new designated_timestamp_ascending parameter. Using false is appropriate since these tests don't exercise designated timestamp ordering functionality.

Also applies to: 282-282, 347-347, 364-364, 423-423

core/rust/qdbr/src/parquet_write/schema.rs (2)

286-298: LGTM!

The new designated_timestamp_ascending field cleanly extends the Column struct to track timestamp ordering.


302-360: LGTM!

The from_raw_data function correctly propagates the new designated_timestamp_ascending parameter to the Column struct.

core/rust/qdb-core/src/col_type.rs (3)

269-271: LGTM!

The method correctly checks both conditions: the column must be designated AND the descending flag must NOT be set. Clear and correct implementation.


293-303: LGTM!

The into_non_designated method correctly clears both the designated flag and the order flag, ensuring a clean reset regardless of the original order setting. This maintains idempotency as tested.


511-538: LGTM! Comprehensive test coverage for the new functionality.

The test properly covers:

  • Non-timestamp columns return false for is_designated_timestamp_ascending()
  • Non-designated timestamp returns false
  • Designated ascending returns true
  • Designated descending returns false
  • Clearing designation removes the ascending status for both cases


@bluestreak01
Member

@CodeRabbit review

@coderabbitai

coderabbitai bot commented Jan 14, 2026

✅ Actions performed

Review triggered.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@core/rust/qdbr/src/parquet_write/jni.rs`:
- Around line 697-702: The null-pointer error branch in
StreamingPartitionEncoder.finish uses throw::<jlong> even though the function
returns *const u8; change the throw invocation to throw::<*const u8> so the
exception type matches the function return type (update the call on the encoder
null check that currently invokes fmt_err!(InvalidType, ...) and
err.into_cairo_exception().throw::<jlong>(&mut env) to use throw::<*const u8>),
keeping the existing error message/context and the subsequent return
std::ptr::null().
🧹 Nitpick comments (5)
core/rust/qdb-core/src/col_type.rs (1)

293-303: Consider extracting the combined flag mask for readability.

The logic correctly clears both the designated and order flags. However, the line is quite long and could be improved for readability.

✨ Suggested refactor
     pub fn into_non_designated(self) -> CoreResult<ColumnType> {
         if self.tag() != ColumnTypeTag::Timestamp {
             return Err(fmt_err!(
                 InvalidType,
                 "invalid column type {}, only timestamp columns can have designated flag",
                 self
             ));
         }
-        let code = NonZeroI32::new(self.code() & !(TYPE_FLAG_DESIGNATED_TIMESTAMP | TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING)).unwrap();
+        const DESIGNATED_FLAGS_MASK: i32 =
+            TYPE_FLAG_DESIGNATED_TIMESTAMP | TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING;
+        let code = NonZeroI32::new(self.code() & !DESIGNATED_FLAGS_MASK).unwrap();
         Ok(Self { code })
     }
core/rust/qdbr/src/parquet_write/mod.rs (1)

578-634: Good test coverage; minor redundant import.

The test correctly validates ascending timestamp sorting metadata. However, line 580 has a redundant import of SortingColumn which is already imported at line 132.

Remove redundant import
     #[test]
     fn test_write_parquet_with_designated_timestamp_ascending() {
-        use parquet2::metadata::SortingColumn;
-
         let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
core/rust/qdbr/src/parquet_write/file.rs (2)

385-387: Prefer returning an error over panicking in library code.

The assert! will cause a panic if partitions is empty. Since this is a public function returning ParquetResult, consider returning an error instead for more graceful handling by callers.

Suggested fix
-    assert!(!partitions.is_empty(), "partitions cannot be empty");
+    if partitions.is_empty() {
+        return Err(fmt_err!(InvalidArgument, "partitions cannot be empty"));
+    }

473-497: Potential arithmetic underflow on invalid inputs.

This function assumes valid inputs (e.g., first_partition_start <= row_count, last_partition_end >= first_partition_start for single partitions). If these invariants are violated, usize subtraction will underflow and wrap around, leading to invalid slice ranges downstream.

If the caller guarantees validity, consider adding a debug assertion to catch violations during development:

Optional: add debug assertions
 fn partition_slice_range(
     part_idx: usize,
     num_partitions: usize,
     row_count: usize,
     first_partition_start: usize,
     last_partition_end: usize,
 ) -> (usize, usize) {
+    debug_assert!(
+        num_partitions == 1 || first_partition_start <= row_count,
+        "first_partition_start exceeds row_count"
+    );
+    debug_assert!(
+        num_partitions == 1 || last_partition_end <= row_count,
+        "last_partition_end exceeds row_count"
+    );
     if num_partitions == 1 {
+        debug_assert!(last_partition_end >= first_partition_start);
         // Single partition: use start and end directly
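To see why the debug assertions pay off, here is a minimal standalone sketch (with hypothetical values, not QuestDB code) of what happens when the invariant is violated: in release builds an unchecked usize subtraction wraps around, while checked_sub surfaces the violation explicitly.

```rust
fn main() {
    let row_count: usize = 10;
    let first_partition_start: usize = 12; // invalid input: start past row_count

    // In release builds, plain `-` on usize wraps: 10 - 12 becomes a huge
    // value, which would turn into an out-of-range slice bound downstream.
    assert_eq!(row_count.wrapping_sub(first_partition_start), usize::MAX - 1);

    // checked_sub reports the broken invariant as None instead of wrapping;
    // a debug_assert! catches it even earlier during development.
    assert_eq!(row_count.checked_sub(first_partition_start), None);
}
```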
core/rust/qdbr/src/parquet_write/jni.rs (1)

629-633: Consider a defensive assertion for underflow prevention.

Line 630 uses partition_rows.sub(first_start), which panics on underflow in debug builds and wraps in release builds. While the invariant first_start <= partition_rows appears to be maintained by the logic (since first_partition_start is only set from last_partition_end), a debug assertion would make this contract explicit and catch any future regressions.

🛡️ Suggested defensive assertion
         let available_rows = if idx == 0 {
+            debug_assert!(
+                partition_rows >= first_start,
+                "first_partition_start ({}) exceeds partition row count ({})",
+                first_start,
+                partition_rows
+            );
             partition_rows.sub(first_start)
         } else {
             partition_rows
         };
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5aa1fe3 and 53ef4a0.

⛔ Files ignored due to path filters (5)
  • core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdbr.dylib is excluded by !**/*.dylib
  • core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdbr.dylib is excluded by !**/*.dylib
  • core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdbr.so is excluded by !**/*.so
  • core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so is excluded by !**/*.so
  • core/src/main/resources/io/questdb/bin/windows-x86-64/questdbr.dll is excluded by !**/*.dll
📒 Files selected for processing (5)
  • core/rust/qdb-core/src/col_type.rs
  • core/rust/qdbr/src/parquet_write/file.rs
  • core/rust/qdbr/src/parquet_write/jni.rs
  • core/rust/qdbr/src/parquet_write/mod.rs
  • core/rust/qdbr/src/parquet_write/schema.rs
🧰 Additional context used
🧬 Code graph analysis (2)
core/rust/qdbr/src/parquet_write/jni.rs (1)
core/rust/qdbr/src/parquet_write/schema.rs (4)
  • to_parquet_schema (368-413)
  • to_encodings (415-421)
  • partition (372-384)
  • from_raw_data (302-360)
core/rust/qdbr/src/parquet_write/mod.rs (2)
core/rust/qdbr/src/parquet_write/schema.rs (1)
  • from_raw_data (302-360)
core/rust/qdbr/src/parquet_read/meta.rs (1)
  • read (37-157)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (35)
  • GitHub Check: New pull request (Coverage Report Coverage Report)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-other)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-pgwire)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-cairo-sub)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-cairo-root)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-fuzz2)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-fuzz1)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-griffin-sub)
  • GitHub Check: New pull request (SelfHosted Running tests with cover on linux-griffin-root)
  • GitHub Check: New pull request (Hosted Running tests on windows-other-2)
  • GitHub Check: New pull request (Hosted Running tests on windows-other-1)
  • GitHub Check: New pull request (Hosted Running tests on windows-pgwire)
  • GitHub Check: New pull request (Hosted Running tests on windows-cairo-2)
  • GitHub Check: New pull request (Hosted Running tests on windows-cairo-1)
  • GitHub Check: New pull request (Hosted Running tests on windows-fuzz2)
  • GitHub Check: New pull request (Hosted Running tests on windows-fuzz1)
  • GitHub Check: New pull request (Hosted Running tests on windows-griffin-sub)
  • GitHub Check: New pull request (Hosted Running tests on windows-griffin-base)
  • GitHub Check: New pull request (Hosted Running tests on mac-other)
  • GitHub Check: New pull request (Hosted Running tests on mac-pgwire)
  • GitHub Check: New pull request (Hosted Running tests on mac-cairo-fuzz)
  • GitHub Check: New pull request (Hosted Running tests on mac-cairo)
  • GitHub Check: New pull request (Hosted Running tests on mac-griffin)
  • GitHub Check: New pull request (Rust Test and Lint on linux-jdk17)
  • GitHub Check: New pull request (SelfHosted Other tests on linux-x86-graal)
  • GitHub Check: New pull request (SelfHosted Other tests on linux-arm64)
  • GitHub Check: New pull request (Trigger Enterprise CI Trigger Enterprise Pipeline)
  • GitHub Check: New pull request (SelfHosted Griffin tests on linux-x86-graal)
  • GitHub Check: New pull request (SelfHosted Other tests on linux-x64-zfs)
  • GitHub Check: New pull request (SelfHosted Griffin tests on linux-x64-zfs)
  • GitHub Check: New pull request (SelfHosted Cairo tests on linux-x86-graal)
  • GitHub Check: New pull request (SelfHosted Griffin tests on linux-arm64)
  • GitHub Check: New pull request (SelfHosted Cairo tests on linux-arm64)
  • GitHub Check: New pull request (SelfHosted Cairo tests on linux-x64-zfs)
  • GitHub Check: New pull request (Check Changes Check changes)
🔇 Additional comments (21)
core/rust/qdb-core/src/col_type.rs (4)

233-238: LGTM! Bit collision issue properly addressed.

The flag has been correctly moved to bit 20 per the previous review feedback, avoiding the collision with Java's TYPE_FLAG_ARRAY_WEAK_DIMS (bit 19). The documentation clearly explains the flag semantics.


269-271: LGTM!

The method correctly checks both the designated status and the absence of the descending flag to determine ascending order.


273-291: LGTM!

Clean refactor with into_designated() delegating to the new into_designated_with_order() method. The type validation and flag manipulation logic are correct. The unwrap() on line 289 is safe since OR-ing a non-zero value with flags preserves the non-zero property.


511-538: LGTM!

Comprehensive test coverage for the new designated timestamp ordering functionality:

  • Validates non-Timestamp types return false
  • Verifies non-designated Timestamp returns false
  • Tests both ascending (default) and descending orders
  • Confirms into_non_designated() properly clears both flag states
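As a standalone illustration of the flag mechanics these tests cover, the sketch below models the set/clear round trip. The bit positions and the type code are placeholder assumptions for demonstration; only the descending flag's bit 20 is stated in this PR.

```rust
// Assumed bit positions for illustration; only bit 20 for the descending
// flag matches the PR, the rest are placeholders.
const TYPE_FLAG_DESIGNATED_TIMESTAMP: i32 = 1 << 17;
const TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING: i32 = 1 << 20;
const DESIGNATED_FLAGS_MASK: i32 =
    TYPE_FLAG_DESIGNATED_TIMESTAMP | TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING;

fn into_designated_with_order(code: i32, descending: bool) -> i32 {
    // OR-ing flags into a non-zero code keeps it non-zero, which is why
    // the unwrap() on the NonZeroI32 in the real code is safe.
    let mut code = code | TYPE_FLAG_DESIGNATED_TIMESTAMP;
    if descending {
        code |= TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING;
    }
    code
}

fn into_non_designated(code: i32) -> i32 {
    code & !DESIGNATED_FLAGS_MASK
}

fn main() {
    let base = 0x11; // placeholder TIMESTAMP type code
    let desc = into_designated_with_order(base, true);
    assert_ne!(desc & TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING, 0);
    // Clearing both flags restores the original code.
    assert_eq!(into_non_designated(desc), base);
    // Ascending designated timestamps leave the descending bit unset.
    let asc = into_designated_with_order(base, false);
    assert_eq!(asc & TYPE_FLAG_DESIGNATED_TIMESTAMP_ORDER_DESCENDING, 0);
}
```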
core/rust/qdbr/src/parquet_write/schema.rs (1)

394-400: LGTM! The designated timestamp handling is now correct.

The condition properly calls into_designated_with_order whenever designated_timestamp is true, correctly preserving the designated flag for both ascending and descending timestamps. This addresses the previously flagged issue.

core/rust/qdbr/src/parquet_write/mod.rs (2)

172-172: LGTM!

The additional false parameter correctly sets designated_timestamp_ascending for existing tests where designated timestamp ordering doesn't apply.


523-576: Good test coverage for descending timestamp ordering.

The test correctly validates that when designated_timestamp_ascending=false, the sorting metadata reflects descending=true. The assertions verify all expected properties of the sorting column metadata.

core/rust/qdbr/src/parquet_write/file.rs (4)

30-30: LGTM!

Making this constant public is appropriate for external callers that need to coordinate row group sizing with the writer.


196-226: LGTM!

Changing to &Partition is the right approach—it avoids unnecessary copies and aligns with the new multi-partition APIs that also use references.


228-246: LGTM!

Clean delegation to create_row_group_from_partitions with proper error propagation.


499-580: Error handling correctly addresses prior feedback.

The pending_error field properly captures errors from column_chunk_to_pages and surfaces them on the subsequent next() call. The implementation checks for pending errors both at the start of next() (line 561) and after advance_to_next_partition() fails within the loop (line 573), ensuring no errors are silently discarded.
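The deferred-error pattern can be reduced to a small sketch; the names and types here are illustrative stand-ins, not the actual encoder.

```rust
// Illustrative sketch: an error captured while advancing is held in
// `pending_error` and surfaced on the next call to next().
struct PageIter {
    items: std::vec::IntoIter<i32>,
    pending_error: Option<String>,
}

impl Iterator for PageIter {
    type Item = Result<i32, String>;

    fn next(&mut self) -> Option<Self::Item> {
        // Surface any error captured on a previous advance before
        // yielding more items, so it is never silently discarded.
        if let Some(err) = self.pending_error.take() {
            return Some(Err(err));
        }
        self.items.next().map(Ok)
    }
}

fn main() {
    let mut it = PageIter {
        items: vec![1, 2].into_iter(),
        pending_error: Some("encode failed".to_string()),
    };
    assert_eq!(it.next(), Some(Err("encode failed".to_string())));
    assert_eq!(it.next(), Some(Ok(1)));
    assert_eq!(it.next(), Some(Ok(2)));
    assert_eq!(it.next(), None);
}
```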

core/rust/qdbr/src/parquet_write/jni.rs (10)

406-446: LGTM - BufferWriter implementation.

The BufferWriter correctly implements a Write adapter for a heap-allocated buffer with reserved header space. The safety invariants (stable heap address via Box<Vec>, proper capacity management) are correctly maintained. The init_offset mechanism properly reserves 8 bytes for the length prefix written at flush time.
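The reserved-header idea generalizes to a short sketch: payload bytes are appended after a placeholder, and flush() backfills the placeholder with the payload length. The type and field names below are hypothetical, not the actual BufferWriter.

```rust
use std::io::{self, Write};

// Illustrative: reserve 8 bytes up front, backfill the length at flush time.
struct PrefixedBuffer {
    buf: Vec<u8>,
}

impl PrefixedBuffer {
    fn new() -> Self {
        Self { buf: vec![0u8; 8] } // 8-byte placeholder for the length prefix
    }
}

impl Write for PrefixedBuffer {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        self.buf.extend_from_slice(data);
        Ok(data.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Backfill the placeholder with the payload length.
        let payload_len = (self.buf.len() - 8) as u64;
        self.buf[..8].copy_from_slice(&payload_len.to_le_bytes());
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut w = PrefixedBuffer::new();
    w.write_all(b"parquet bytes")?; // 13 payload bytes
    w.flush()?;
    assert_eq!(&w.buf[..8], &13u64.to_le_bytes());
    assert_eq!(w.buf.len(), 21);
    Ok(())
}
```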


448-465: LGTM - StreamingParquetWriter design.

The struct correctly manages memory lifetime for the streaming path. The Box<Vec> ensures a stable heap address for BufferWriter's raw pointer, and pending_row_group_buffers keeps decoded data alive while partitions reference it. Good documentation of the design rationale.


467-564: LGTM - createStreamingParquetWriter.

The JNI entry point correctly initializes the streaming writer with proper schema derivation, compression options, and sorting column configuration. Memory management follows established patterns with Box::into_raw.


566-604: LGTM - writeStreamingParquetChunk.

Correctly handles chunk accumulation with proper null checks and error handling. The clone() on line 586 copies column metadata (not data), and update_partition_data sets the actual data pointers. This design properly separates schema from data.


740-753: LGTM - closeStreamingParquetWriter.

Standard JNI cleanup pattern correctly implemented with proper null check and Box::from_raw to reclaim ownership and drop the encoder.


755-802: LGTM - create_partition_template.

Correctly creates a schema template with column metadata and no data. The designated timestamp handling (line 781, 795) properly derives the ascending flag from the timestamp_descending parameter.


804-847: LGTM - update_partition_data.

Correctly updates partition data pointers with proper null safety. The COL_DATA_ENTRY_SIZE of 7 (vs 9 in create_partition_descriptor) is appropriate since the streaming path separates metadata from data.


849-927: LGTM - writeStreamingParquetChunkFromRowGroup.

This fallback path correctly decodes source Parquet data and converts it for re-encoding. The memory lifetime is properly managed by storing row_group_bufs in pending_row_group_buffers, ensuring the partition's data slices remain valid.


351-352: No action needed. The hardcoding of designated_timestamp_ascending to true at line 351 is intentional, not a bug. The encodePartition JNI function signature does not include a timestamp_descending parameter; this non-streaming API simply does not support descending timestamps. Only the separate streaming API (createStreamingParquetWriter and related functions) accepts the timestamp_descending parameter. This is a deliberate design choice: timestamp ordering is a feature of the streaming path, not the legacy partition encoder path.


929-996: LGTM - convert_row_group_buffers_to_partition.

Correctly converts decoded row group buffers to the Partition format. The symbol column handling properly extracts metadata from the Java-provided symbol_data_ptr. Memory safety is ensured: RowGroupBuffers are explicitly stored in pending_row_group_buffers and kept alive alongside partitions in pending_partitions, with both vectors drained together during flush_pending_partitions, ensuring all slice references remain valid.


@bluestreak01 bluestreak01 merged commit e3b4bb3 into master Jan 14, 2026
33 of 44 checks passed
@bluestreak01 bluestreak01 deleted the stream_export_parquet branch January 14, 2026 14:35
@glasstiger
Contributor

[PR Coverage check]

😍 pass : 0 / 0 (0%)
