feat(core): control parquet writing file size growth, reduce write amplification, improve write speed (#6819)
@coderabbitai, critically review the PR

Note: auto reviews are disabled on this repository; the review below was triggered manually.
Walkthrough
Introduces a Parquet-aware O3 (out-of-order) merge strategy with rewrite capability for QuestDB. Core changes include a new O3ParquetMergeStrategy for planning merge actions, extended PartitionUpdater and PartitionDecoder APIs for row-group operations, incremental Parquet footer serialization via FooterCache, unused-bytes tracking metadata, and rewrite-threshold configuration. The change spans Java configuration, merge logic, and partition handling, plus Rust-side Parquet read/write/metadata operations, with comprehensive testing.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
✅ Actions performed: review triggered.
Actionable comments posted: 20
Caution: some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
core/src/main/java/io/questdb/cairo/O3PartitionJob.java (1)
376-396: ⚠️ Potential issue | 🟠 Major — Rewrite-mode rollback misses failures after the inner rewrite block.
If an exception is thrown after the inner `try` (for example in `updateParquetIndexes(...)`), the outer catch does not remove the rewrite txn directory. That can leave orphaned partition directories on failed commits. Suggested fix:

  } catch (Throwable th) {
      LOG.error().$("process partition error [table=").$(tableWriter.getTableToken())
              .$(", e=").$(th)
              .I$();
-     if (!isRewrite) {
+     if (isRewrite) {
+         Path newPath = Path.getThreadLocal2(pathToTable);
+         setPathForNativePartition(newPath, timestampType, partitionBy, partitionTimestamp, txn);
+         if (!ff.rmdir(newPath.slash())) {
+             LOG.error().$("could not remove rewritten partition dir after failure [path=").$(newPath).I$();
+         }
+     } else {
          // Update mode: the file is re-opened here because PartitionUpdater owns the file descriptor.
          // Truncate to the previous uncorrupted size.
          path.of(pathToTable);
          setPathForParquetPartition(path, timestampType, partitionBy, partitionTimestamp, srcNameTxn);
          final long fd = TableUtils.openRW(ff, path.$(), LOG, cairoConfiguration.getWriterFileOpenOpts());

Also applies to: 397-413

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/io/questdb/cairo/O3PartitionJob.java` around lines 376-396: the rewrite-mode rollback currently only removes the rewrite txn directory inside the inner try, but misses exceptions thrown after that (e.g. in updateParquetIndexes), leaving orphaned txn directories. Update O3PartitionJob to ensure the rewrite txn directory (the txn-named dir chosen via txnName when isRewrite is true) is always removed on failure by adding cleanup in a finally block or in the outer catch that checks isRewrite and removes the txn directory referenced by path/pathToTable (use the same logic as setPathForParquetPartition to locate the directory) when any exception occurs after the inner rewrite block (including failures from updateParquetIndexes), reusing the same path/ff utilities to delete the directory to avoid duplication.

core/src/main/java/io/questdb/cairo/TableWriter.java (1)
7721-7737: ⚠️ Potential issue | 🟠 Major — Guard the lag fast-apply path at line 7792 for Parquet last partitions.
The code at lines 7735 and 7737 properly guards `canFastCommit` and `canFastCommitNew` with `!lastPartitionIsParquet` checks, so the calls to `applyFromWalLagToLastPartition()` at lines 7743 and 7784 are protected by these boolean conditions. However, the unconditional call at line 7792 lacks this guard and can still execute native append-position updates against a Parquet last partition when lag rows exist. Proposed fix:

- if (applyFromWalLagToLastPartition(Math.min(o3TimestampMin, commitToTimestamp), true) != Long.MIN_VALUE) {
+ if (!lastPartitionIsParquet
+         && applyFromWalLagToLastPartition(Math.min(o3TimestampMin, commitToTimestamp), true) != Long.MIN_VALUE) {
      walLagRowCount = txWriter.getLagRowCount();
      totalUncommitted = walLagRowCount + commitRowCount;
  }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/io/questdb/cairo/TableWriter.java` around lines 7721 - 7737, The unconditional call to applyFromWalLagToLastPartition(...) must be guarded against Parquet last partitions: check lastPartitionIsParquet (or call isLastPartitionParquet()) before invoking applyFromWalLagToLastPartition in the same way canFastCommit and canFastCommitNew do; only call applyFromWalLagToLastPartition when !lastPartitionIsParquet and the existing conditions (e.g., commitToTimestamp, totalUncommitted, lagOrderedNew) are met so we avoid performing native append-position updates on a Parquet last partition.
🧹 Nitpick comments (8)
benchmarks/src/main/java/org/questdb/LineTCPSender03MultiTableMain.java (1)
65-65: Add explicit parentheses around the modulo operand for clarity.
Operator precedence is correct (`%` binds tighter than `+`), but the intent is not immediately obvious at a glance. 🔍 Suggested clarification:

- final long ticks = DAY_START_US + clock.getTicks() % MICROS_PER_DAY;
+ final long ticks = DAY_START_US + (clock.getTicks() % MICROS_PER_DAY);

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@benchmarks/src/main/java/org/questdb/LineTCPSender03MultiTableMain.java` at line 65: the expression computing ticks is ambiguous at a glance; update the assignment of the ticks variable so the modulo applies to the clock ticks explicitly by changing DAY_START_US + clock.getTicks() % MICROS_PER_DAY to DAY_START_US + (clock.getTicks() % MICROS_PER_DAY), referencing the ticks variable, DAY_START_US, clock.getTicks(), and MICROS_PER_DAY.

core/rust/qdbr/parquet2/src/metadata/column_chunk_metadata.rs (1)
168-186: Exposing getters is good; mirror invariant validation for these fields.
Since these offsets/lengths are now public API, consider validating non-negative values in `try_from_thrift` just like the other offset fields. 🔒 Suggested hardening in `try_from_thrift`:

  if let Some(meta) = &column_chunk.meta_data {
      let _: u64 = meta.total_compressed_size.try_into()?;
@@
      let _: u64 = meta.data_page_offset.try_into()?;
+     if let Some(v) = column_chunk.column_index_offset {
+         let _: u64 = v.try_into()?;
+     }
+     if let Some(v) = column_chunk.column_index_length {
+         let _: u32 = v.try_into()?;
+     }
+     if let Some(v) = column_chunk.offset_index_offset {
+         let _: u64 = v.try_into()?;
+     }
+     if let Some(v) = column_chunk.offset_index_length {
+         let _: u32 = v.try_into()?;
+     }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/rust/qdbr/parquet2/src/metadata/column_chunk_metadata.rs` around lines 168-186: the new getters column_index_offset, column_index_length, offset_index_offset, and offset_index_length are public API and need the same non-negative validation applied in try_from_thrift as other offsets; update the try_from_thrift conversion to check these four fields from the incoming Thrift column_chunk and return an error (or map to None only when appropriate) if any offset/length is negative, mirroring the existing invariant checks used for other offset fields so the public getters never expose invalid negative values.

core/rust/qdbr/src/parquet_read/decode.rs (1)
994-1079: Add focused regression tests for IPv4/GeoInt null decoding.
The wiring looks correct, but this behavior change should be pinned with explicit tests for `ColumnTypeTag::IPv4` and `ColumnTypeTag::GeoInt` (plain/delta/dictionary, filtered/unfiltered) to prevent future sentinel regressions. ✅ Suggested test additions:

+ #[test]
+ fn test_decode_int32_null_sentinel_ipv4_plain_and_dict() {
+     // Build int32 pages with nulls and assert:
+     // - IPv4 null bytes decode to 0i32
+     // - GeoInt null bytes decode to -1i32
+     // Cover decode_page + decode_page_filtered::<true>.
+ }
+
+ #[test]
+ fn test_decode_int32_null_sentinel_geoint_delta() {
+     // Build DeltaBinaryPacked int32 page with nulls and assert GeoInt null sentinel == -1i32
+     // for both filtered and unfiltered paths.
+ }

Also applies to: 2291-2357

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/rust/qdbr/src/parquet_read/decode.rs` around lines 994-1079: add focused regression tests that exercise null decoding for ColumnTypeTag::IPv4 and ColumnTypeTag::GeoInt across encodings (Plain, DeltaBinaryPacked, RleDictionary/PlainDictionary) and both filtered/unfiltered code paths so we detect sentinel regressions. Specifically, add tests that build pages using the same builders used elsewhere in tests and call into decode_page0_filtered (and the unfiltered path) with sinks like FixedIntColumnSink backed by DataPageFixedSlicer, DeltaBinaryPackedSlicer, and RleDictionarySlicer (and FixedDictDecoder) to decode rows containing the sentinel null values (use int32_null(column_type.tag())/LONG_NULL as appropriate) and assert the output buffers contain the expected null marker values and non-null values in correct positions. Ensure separate tests for ColumnTypeTag::IPv4 and ColumnTypeTag::GeoInt and for filtered vs unfiltered so future changes to decode_page0_filtered, FixedIntColumnSink, DeltaBinaryPackedSlicer, or RleDictionarySlicer will fail if sentinel handling regresses.

core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java (1)
751-759: Reorder these overrides to maintain alphabetical method grouping.
Place `getPartitionEncoderParquetO3RewriteUnusedMaxBytes()` and `getPartitionEncoderParquetO3RewriteUnusedRatio()` before `getPartitionEncoderParquetRowGroupSize()` in this class. As per coding guidelines, "Java class members are grouped by kind (static vs. instance) and visibility, and sorted alphabetically."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java` around lines 751-759: move the two overrides getPartitionEncoderParquetO3RewriteUnusedMaxBytes() and getPartitionEncoderParquetO3RewriteUnusedRatio() so they appear before getPartitionEncoderParquetRowGroupSize() in DefaultCairoConfiguration to restore alphabetical grouping of instance methods; ensure their signatures and return values remain unchanged and that method ordering follows the class's existing alphabetical grouping convention.

core/src/main/java/io/questdb/cairo/CairoConfigurationWrapper.java (1)
734-742: Place the new getters in alphabetical order with adjacent instance members.
These methods should be moved before `getPartitionEncoderParquetRowGroupSize()` to keep member ordering consistent with class sorting rules. As per coding guidelines, "Java class members are grouped by kind (static vs. instance) and visibility, and sorted alphabetically."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/io/questdb/cairo/CairoConfigurationWrapper.java` around lines 734-742: move the two instance getter methods getPartitionEncoderParquetO3RewriteUnusedMaxBytes() and getPartitionEncoderParquetO3RewriteUnusedRatio() so they appear alphabetically among adjacent instance members; place both methods before getPartitionEncoderParquetRowGroupSize() in CairoConfigurationWrapper to satisfy the class member ordering rules (instance members grouped and sorted alphabetically).

core/src/main/java/io/questdb/cairo/CairoConfiguration.java (1)
469-472: Keep interface method declarations alphabetically ordered.
Move these two methods above `getPartitionEncoderParquetRowGroupSize()` so the `getPartitionEncoderParquet*` block remains consistently sorted. As per coding guidelines, "Java class members are grouped by kind (static vs. instance) and visibility, and sorted alphabetically."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/io/questdb/cairo/CairoConfiguration.java` around lines 469-472: the two interface methods getPartitionEncoderParquetO3RewriteUnusedMaxBytes() and getPartitionEncoderParquetO3RewriteUnusedRatio() are out of alphabetical order; move both declarations so they appear above getPartitionEncoderParquetRowGroupSize() in CairoConfiguration to keep the getPartitionEncoderParquet* block alphabetically sorted per coding guidelines.

core/src/main/java/io/questdb/PropServerConfiguration.java (1)
3850-3858: Keep new getters in alphabetical order within the PropCairoConfiguration member block.
These two getters should be placed before `getPartitionEncoderParquetRowGroupSize()` to preserve member sorting conventions. As per coding guidelines: "**/*.java: Java class members are grouped by kind (static vs. instance) and visibility, and sorted alphabetically."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/io/questdb/PropServerConfiguration.java` around lines 3850-3858: move the two new getters getPartitionEncoderParquetO3RewriteUnusedMaxBytes() and getPartitionEncoderParquetO3RewriteUnusedRatio() so they appear before getPartitionEncoderParquetRowGroupSize() within the PropCairoConfiguration member block; this keeps instance getters alphabetically ordered as required by the class member sorting convention.

core/src/main/java/io/questdb/cairo/TableWriter.java (1)
7321-7324: Please align new boolean local names with the repository boolean naming rule.
Examples in these lines (`partitionIsReadOnly`, `lastPartitionIsParquet`, `canFastCommit`, `canFastCommitNew`) should use `is...`/`has...` prefixes for consistency with the repo standard. As per coding guidelines: "When choosing a name for a boolean variable, field or method, always use the is... or has... prefix, as appropriate."
Also applies to: 7721-7737
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/io/questdb/cairo/TableWriter.java` around lines 7321 - 7324, Rename boolean locals to follow the is/has prefix rule: change partitionIsReadOnly to isPartitionReadOnly and isParquet to isPartitionParquet; similarly rename lastPartitionIsParquet to isLastPartitionParquet, canFastCommit to isFastCommitPossible, and canFastCommitNew to isFastCommitNewPossible (or another is/has-prefixed name you prefer). Update all declarations and every reference/usages in the TableWriter class (e.g., inside methods that call txWriter.isPartitionReadOnlyByRawIndex, txWriter.isPartitionParquetByRawIndex, and the logic around fast commit) so compilation and semantics remain the same.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@benchmarks/src/main/java/org/questdb/LineTCPSender03MultiTableMain.java`:
- Line 36: DAY_START_US currently hardcodes a past date (1_770_854_400_000_000L)
which pins all benchmark inserts into a stale partition; update
LineTCPSender03MultiTableMain to either (A) make the intent explicit by
renaming/commenting DAY_START_US as a fixed historical benchmark anchor if you
truly want a static past partition, or (B) compute a dynamic start-of-day in
microseconds at runtime and use that value in doSend so each run targets "today"
(e.g., derive epoch millis for UTC midnight using System.currentTimeMillis(),
truncate to days, multiply to micros) and replace references to the hardcoded
DAY_START_US with the computed value.
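Option (B) can be sketched as follows; DayStart and utcMidnightMicros are hypothetical names, and the constants are spelled out here rather than taken from the benchmark:

```java
// Hypothetical helper: truncate an epoch-millis instant to its UTC day
// boundary and scale the result to microseconds, so the benchmark always
// targets "today" instead of a hardcoded past partition.
public final class DayStart {
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1_000L;
    static final long MICROS_PER_DAY = 24L * 60 * 60 * 1_000_000L;

    static long utcMidnightMicros(long epochMillis) {
        // integer division drops the time-of-day component
        return (epochMillis / MILLIS_PER_DAY) * MICROS_PER_DAY;
    }

    public static void main(String[] args) {
        long dayStartUs = utcMidnightMicros(System.currentTimeMillis());
        // the result always lands exactly on a UTC day boundary
        assert dayStartUs % MICROS_PER_DAY == 0;
    }
}
```

Note that the hardcoded 1_770_854_400_000_000L anchor is itself an exact UTC day boundary, so the helper reproduces it from its millisecond form.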
In `@claude_pr.md`:
- Around line 69-86: Add explicit fenced-code language tags to the untagged code
blocks: for the ASCII diagram block that contains the markers srcOooLo,
srcOooHi, rg0.min/rg1.min/rg2.min mark the fence with "text"; for the Java
snippet mentioned around the second block (lines referencing the other snippet)
mark that fence with "java". Locate the fences that wrap the diagram containing
srcOooLo/srcOooHi and the other fence that contains the Java example and prepend
the appropriate language tag after the opening triple backticks.
In `@core/rust/qdbr/parquet2/src/read/metadata.rs`:
- Around line 162-177: The branch that checks if the speculative read contains
the whole footer uses `if (footer_len as usize) < buffer.len()` and thus
performs a redundant second read when `footer_len == buffer.len()`; change the
condition to `<=` so the exact-fit case deserializes from `buffer` (via
`deserialize_metadata`) and avoids executing the second-read path that calls
`reader.seek`/`reader.take`; update any comments accordingly and keep uses of
`buffer.drain(..remaining)` and `buffer.truncate(metadata_len as usize)` as-is.
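The boundary condition is easy to see in isolation. Below is a minimal Java sketch of just the branch predicate (class and method names invented for illustration; the actual code is Rust):

```java
// Models the speculative-read branch: returns true when a second read is
// needed because the buffer does not already hold the whole footer.
public final class FooterFit {
    // buggy condition: strict '<' re-reads even when the footer fits exactly
    static boolean needsSecondReadStrict(long footerLen, int bufferLen) {
        return !(footerLen < bufferLen);
    }

    // fixed condition: '<=' serves the exact-fit case from the buffer
    static boolean needsSecondReadInclusive(long footerLen, int bufferLen) {
        return !(footerLen <= bufferLen);
    }
}
```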
In `@core/rust/qdbr/parquet2/src/write/file.rs`:
- Around line 806-814: The code calls end_file_incremental(&mut self.writer,
metadata, footer_cache, &sources) which updates the passed-in metadata in-place
but does not persist those changes back to the object's stored metadata, leaving
self.metadata stale (so metadata() / into_inner_and_metadata() may return old
data); after the successful call and before setting State::Finished, assign or
replace self.metadata with the updated metadata (e.g., move or clone the updated
value from the local metadata variable into self.metadata) so the object's
metadata reflects the finalized state.
- Around line 725-814: Mode::Update lacks the same re-entry guard as
Mode::Write, allowing a second end() to re-emit footer/metadata and corrupt
output; fix by checking and updating the writer state at the start of the
Mode::Update arm (the match arm handling Mode::Update in file.rs) the same way
as Mode::Write: if self.state == State::Finished return an appropriate
error/early return, otherwise set self.state = State::Finished before calling
end_file_incremental (or immediately after computing len but before returning)
to ensure subsequent end() calls are rejected; update the Mode::Update block
around end_file_incremental and self.state handling to mirror the re-entry
protection used for Mode::Write.
- Around line 636-649: In the insert method for File (fn insert(row_group:
RowGroupIter<'_, E>, position: i16) -> Result<(), E>) avoid casting negative i16
positions to usize; instead validate position >= 0 up front and return an Err
converted via E::from(Error::...) when position is negative, and only call
self.add_row_group(..., position as usize) after the check; this prevents
negative positions from becoming huge usize values and preserves insertion
semantics.
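The pitfall behind this suggestion can be shown with Java's own unsigned reinterpretation; PositionGuard and insertAt are hypothetical names, not the Rust API:

```java
// Java analog of the `position as usize` pitfall: a negative 16-bit value
// reinterpreted as unsigned becomes a huge index instead of failing fast.
public final class PositionGuard {
    static int insertAt(short position, int rowGroupCount) {
        if (position < 0 || position > rowGroupCount) {
            throw new IllegalArgumentException("invalid insert position: " + position);
        }
        return position; // now safe to use as an index
    }
}
```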
In `@core/rust/qdbr/parquet2/src/write/footer_cache.rs`:
- Around line 67-70: scan_row_group_offsets currently bails out and returns
Ok(vec![]) when it fails to find the row_groups marker, which masks
malformed/unsupported footers; replace the early break/Ok(vec![]) paths with a
failing Result that surfaces a clear error. Specifically, in function
scan_row_group_offsets in footer_cache.rs, change the branch that triggers when
byte == 0 to return Err(...) with a descriptive message (e.g., "row_groups not
found in footer" or similar using the crate's error type or anyhow::anyhow), and
make the identical change at the other occurrence referenced (around the second
location corresponding to lines ~102-103) so the function fails fast instead of
returning an empty Vec.
In `@core/rust/qdbr/src/parquet_write/decimal.rs`:
- Around line 37-38: DECIMAL128_NULL and DECIMAL256_NULL are constructed with
i64::MIN.to_le_bytes()/i128::MIN.to_le_bytes(), which is inconsistent with
Decimal128::to_bytes()/Decimal256::to_bytes that use big-endian (to_be_bytes());
change the null constant constructions (DECIMAL128_NULL, DECIMAL256_NULL) to use
to_be_bytes() so their byte[0] ordering matches NativeType::to_bytes() behavior
and remove the endian mismatch.
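A quick way to see the mismatch, sketched in Java with java.nio (illustrative only; the real code uses Rust's to_le_bytes/to_be_bytes): byte 0 of Long.MIN_VALUE differs between the two orderings.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public final class EndianNull {
    // Serializes a long with the given byte order and returns its first byte.
    static byte firstByte(long v, ByteOrder order) {
        return ByteBuffer.allocate(Long.BYTES).order(order).putLong(v).array()[0];
    }

    static byte beFirstByte(long v) { return firstByte(v, ByteOrder.BIG_ENDIAN); }

    static byte leFirstByte(long v) { return firstByte(v, ByteOrder.LITTLE_ENDIAN); }
}
```

So a sentinel built little-endian puts the 0x80 sign byte last, while big-endian serialization puts it first, and byte-wise comparisons of the two never match.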
In `@core/rust/qdbr/src/parquet_write/jni.rs`:
- Around line 27-29: Replace the panic!/assert! null-pointer checks in the JNI
entrypoints that currently do "if updater.is_null() { panic!(...)}" (and similar
asserts at the other locations) with the same error-handling pattern used in
copyRowGroup and sliceRowGroup: use fmt_err! to format an error and throw a
CairoException back to Java (e.g., fmt_err!(env, CairoException, "ParquetUpdater
pointer is null") or similar), ensuring each JNI function checks
updater.is_null() and returns early after throwing the exception instead of
panicking.
In `@core/rust/qdbr/src/parquet_write/update.rs`:
- Around line 402-462: The slice_row_group function uses row_hi + 1 and row_hi +
1 - row_lo without validating bounds, which can underflow/overflow before being
converted to u32; add explicit checks before any unsigned arithmetic: ensure
rg_idx is valid (already done), then fetch the row group row count (e.g. from
self.file_metadata.row_groups[rg_idx].num_rows) and validate that row_lo <=
row_hi and row_hi < num_rows, and additionally ensure row_hi + 1 fits in u32 (or
use checked_add) and that (row_hi + 1) - row_lo does not underflow; after these
checks compute the u32 values passed to decoder.decode_row_group and compute
row_count using checked_sub so the subsequent build_partition_from_decoded uses
a safe, validated row_count.
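The suggested validation order can be sketched as below. RowGroupBounds and sliceRowCount are hypothetical, and Math.addExact/subtractExact stand in for Rust's checked_add/checked_sub:

```java
public final class RowGroupBounds {
    // Validates the inclusive slice [rowLo, rowHi] against the row group's
    // row count before any arithmetic, then computes the count with
    // overflow-checked operations instead of silent wrapping.
    static long sliceRowCount(long rowLo, long rowHi, long numRows) {
        if (rowLo < 0 || rowHi < rowLo || rowHi >= numRows) {
            throw new IllegalArgumentException(
                    "invalid slice [" + rowLo + ", " + rowHi + "] for " + numRows + " rows");
        }
        return Math.subtractExact(Math.addExact(rowHi, 1), rowLo);
    }
}
```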
In `@core/src/main/java/io/questdb/cairo/O3ParquetMergeStrategy.java`:
- Around line 130-137: In computeMergeActions, guard against non-positive
maxRowGroupSize to prevent an infinite loop: validate maxRowGroupSize > 0 at the
start (or coerce it to a minimum of 1) before the loop that uses cursor and
chunkHi so the condition chunkHi < cursor can’t repeatedly hold without
advancing cursor; alternatively, ensure the loop always advances cursor when
maxRowGroupSize <= 0 (e.g., increment cursor or set chunkHi = cursor - 1) so the
cursor moves forward; update references in the logic that compute chunkHi and
advance cursor (the variables cursor, chunkHi, and maxRowGroupSize inside
computeMergeActions) accordingly.
- Around line 195-201: The code uses rgMax + 1 in the call to
Vect.boundedBinarySearchIndexT which overflows when rgMax == Long.MAX_VALUE;
change the logic so you handle that case explicitly: if rgMax == Long.MAX_VALUE
set overlapHi = srcOooHi directly, otherwise call
Vect.boundedBinarySearchIndexT(sortedTimestampsAddr, rgMax + 1, o3Cursor,
srcOooHi, Vect.BIN_SEARCH_SCAN_DOWN). Update the code around the
Vect.boundedBinarySearchIndexT invocation (referencing sortedTimestampsAddr,
rgMax, o3Cursor, srcOooHi, overlapHi) to avoid performing rgMax + 1 when rgMax
is Long.MAX_VALUE.
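Both guards above can be sketched together in one place; MergeGuards and its method names are illustrative only, with the binary search reduced to a callback:

```java
import java.util.function.LongUnaryOperator;

public final class MergeGuards {
    // Guard 1: coerce a non-positive max row-group size so the chunking
    // loop always advances the cursor instead of spinning forever.
    static long nextChunkHi(long cursor, long hi, long maxRowGroupSize) {
        long step = Math.max(1, maxRowGroupSize);
        return Math.min(hi, cursor + step - 1);
    }

    // Guard 2: never compute rgMax + 1 when rgMax is Long.MAX_VALUE, where
    // the increment would silently wrap to Long.MIN_VALUE; take srcOooHi
    // directly in that case, otherwise search with the safe exclusive bound.
    static long overlapHi(long rgMax, long srcOooHi, LongUnaryOperator searchWithBound) {
        return rgMax == Long.MAX_VALUE ? srcOooHi : searchWithBound.applyAsLong(rgMax + 1);
    }
}
```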
In `@core/src/main/java/io/questdb/cairo/O3PartitionJob.java`:
- Around line 2065-2094: The code currently passes reusable buffer capacities
(mergeDstBufs[...] values) into chunkDescriptor.addColumn, which can include
stale bytes; modify the copy workflow so O3CopyJob.mergeCopy returns (or
exposes) the actual number of bytes written for data and aux for this chunk
(e.g., return a small struct/long pair or write back into mergeDstBufs
positions), then use those returned actual sizes instead of the original
dstDataSize/dstAuxSize when calling chunkDescriptor.addColumn; update the call
sites (the O3PartitionJob calls around O3CopyJob.mergeCopy and subsequent
chunkDescriptor.addColumn) to capture the producedDataSize and producedAuxSize
and pass them in place of dstDataSize and dstAuxSize.
In `@core/src/main/java/io/questdb/PropServerConfiguration.java`:
- Around line 1989-1990: Replace the brittle getLong/getDouble usage for
partitionEncoderParquetO3RewriteUnusedMaxBytes and
partitionEncoderParquetO3RewriteUnusedRatio with robust parsing and validation:
read the raw string for
PropertyKey.CAIRO_PARTITION_ENCODER_PARQUET_O3_REWRITE_UNUSED_MAX_BYTES and
parse it with a size-aware parser that supports suffixes (e.g., "k", "m", "g")
instead of getLong, ensure the parsed value assigned to
partitionEncoderParquetO3RewriteUnusedMaxBytes is non-negative (and at least 0
or a sensible minimum) and throw or surface a clear error if invalid; similarly,
parse the raw string for
PropertyKey.CAIRO_PARTITION_ENCODER_PARQUET_O3_REWRITE_UNUSED_RATIO into a
double, validate that partitionEncoderParquetO3RewriteUnusedRatio is within
[0.0,1.0] (or clamp/report), and use the existing default values only when
parsing is absent/empty—apply these checks where these fields are initialized
(referencing the field names partitionEncoderParquetO3RewriteUnusedMaxBytes,
partitionEncoderParquetO3RewriteUnusedRatio and the PropertyKey constants).
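A minimal sketch of the suggested parsing and validation; RewriteThresholds and its methods are hypothetical, not QuestDB's actual property machinery:

```java
// Size values accept k/m/g suffixes and must be non-negative; the ratio
// must lie in [0, 1]. Overflow is caught via Math.multiplyExact.
public final class RewriteThresholds {
    static long parseSizeBytes(String raw) {
        String s = raw.trim().toLowerCase();
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty size value");
        }
        long mult = 1;
        char last = s.charAt(s.length() - 1);
        if (last == 'k' || last == 'm' || last == 'g') {
            mult = last == 'k' ? 1L << 10 : last == 'm' ? 1L << 20 : 1L << 30;
            s = s.substring(0, s.length() - 1);
        }
        long value = Math.multiplyExact(Long.parseLong(s), mult);
        if (value < 0) {
            throw new IllegalArgumentException("size must be non-negative: " + raw);
        }
        return value;
    }

    static double parseRatio(String raw) {
        double ratio = Double.parseDouble(raw.trim());
        if (ratio < 0.0 || ratio > 1.0) {
            throw new IllegalArgumentException("ratio must be in [0, 1]: " + raw);
        }
        return ratio;
    }
}
```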
In
`@core/src/test/java/io/questdb/test/cairo/fuzz/O3ParquetMergeStrategyFuzzTest.java`:
- Around line 210-225: The comment above the Parquet row-group size check is
incorrect: it says "exceeds 2x" but the assertion in
O3ParquetMergeStrategyFuzzTest (within the try block using
engine.getReader(walTable), reader.openPartition(i), PartitionDecoder decoder =
reader.getAndInitParquetPartitionDecoders(i), and meta.getRowGroupSize(rg))
enforces a 1.5x bound (rowGroupSize + rowGroupSize / 2). Update the comment to
reflect the actual invariant (e.g., "Verify that no row group in the Parquet
partition exceeds 1.5x the configured size.") so the comment and the assertion
are consistent.
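Spelled out, the invariant the assertion enforces is the integer-arithmetic 1.5x bound (class and method names assumed):

```java
public final class RowGroupSizeBound {
    // The fuzz test allows row groups up to 1.5x the configured size,
    // computed with integer arithmetic: rowGroupSize + rowGroupSize / 2.
    static long maxAllowedRows(long rowGroupSize) {
        return rowGroupSize + rowGroupSize / 2;
    }

    static boolean withinBound(long actualRows, long rowGroupSize) {
        return actualRows <= maxAllowedRows(rowGroupSize);
    }
}
```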
In `@core/src/test/java/io/questdb/test/cairo/O3ParquetMergeStrategyTest.java`:
- Around line 40-405: Wrap each test that calls allocateSortedTimestamps (e.g.,
testCustomSmallRowGroupThreshold, testMixedOverlapAndGap, testNoRowGroups,
testO3AfterAllRowGroups, testO3BeforeAllRowGroups,
testO3InGapBetweenLargeRowGroups, testO3InGapMergedIntoSmallNextRowGroup,
testO3InGapMergedIntoSmallPreviousRowGroup, testSingleRowGroupWithOverlap and
any others using allocateSortedTimestamps) with assertMemoryLeak(() -> { ... });
so native allocations from Unsafe.malloc are checked and freed; place the
existing test body inside the lambda and keep teardown (freeSortedTimestamps)
inside that lambda. Ensure tests that do not allocate native memory remain
unchanged.
In
`@core/src/test/java/io/questdb/test/griffin/AlterTableConvertPartitionTest.java`:
- Around line 172-182: Replace the two assertSql(...) calls in
AlterTableConvertPartitionTest with assertQueryNoLeakCheck(...) so the test uses
the leak-check-free assertion helper; specifically, swap the assertSql that
checks "count\n11\n" with assertQueryNoLeakCheck(expectedString, "SELECT count()
FROM x", sqlExecutionContextOrFactory) and likewise replace the assertSql that
checks "id\tdesignated_ts\n100\t..." for "SELECT id, designated_ts FROM x WHERE
id = 100" with assertQueryNoLeakCheck(expectedString, "SELECT id, designated_ts
FROM x WHERE id = 100", sqlExecutionContextOrFactory) (preserve the exact
expected strings and timestampType conditional), using the same context/factory
argument pattern used elsewhere in this test class.
- Around line 167-182: The current INSERT in AlterTableConvertPartitionTest (the
execute(...) call that writes '2024-06-15T00:00:00.000000Z') does not produce an
O3 path because that timestamp is after the generated range; change the inserted
timestamp in the execute("INSERT INTO x(id, designated_ts) VALUES (...)") call
to a value strictly less than the table's current max timestamp (e.g., a date
earlier than the generated partition range) so the write becomes out-of-order
and triggers the O3 merge path, keep drainWalQueue() and the subsequent
assertSql checks unchanged but adjust the expected designated_ts string
construction (the TestTimestampType branch) if you pick a different timestamp
format.
In `@core/src/test/java/io/questdb/test/griffin/ParquetWriteTest.java`:
- Around line 96-112: In ParquetWriteTest.java replace the test assertions that
call assertSql(...) with assertQueryNoLeakCheck(...), keeping the same
expected-result string and SQL query arguments (e.g., the block asserting
"SELECT * FROM x") so the semantics remain identical; update the three
occurrences (the shown block and the other two blocks referenced) to call
assertQueryNoLeakCheck(expected, "SELECT ...") instead of assertSql to comply
with the test helper requirement.
- Around line 42-243: Add assertions that explicitly verify a REWRITE happened
by checking rewrite-specific state after the second O3/convert operations in
ParquetWriteTest (methods testRewriteAbsoluteUnusedBytesThreshold,
testRewriteSingleRowGroup, testRewriteUnusedBytesRatioThreshold): for example,
obtain the partition metadata (via PartitionDecoder.Metadata) and assert
getUnusedBytes() was reset or that the partition's version/path changed after
drainWalQueue() following the operation expected to trigger a REWRITE; place
these checks right after the second drainWalQueue() (before the final assertSql)
so the test fails if only UPDATE-path behavior occurred.
In `@core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java`:
- Around line 751-759: Move the two overrides
getPartitionEncoderParquetO3RewriteUnusedMaxBytes() and
getPartitionEncoderParquetO3RewriteUnusedRatio() so they appear before
getPartitionEncoderParquetRowGroupSize() in DefaultCairoConfiguration to restore
alphabetical grouping of instance methods; ensure their signatures and return
values remain unchanged and that method ordering follows the class's existing
alphabetical grouping convention.
In `@core/src/main/java/io/questdb/cairo/TableWriter.java`:
- Around line 7321-7324: Rename boolean locals to follow the is/has prefix rule:
change partitionIsReadOnly to isPartitionReadOnly and isParquet to
isPartitionParquet; similarly rename lastPartitionIsParquet to
isLastPartitionParquet, canFastCommit to isFastCommitPossible, and
canFastCommitNew to isFastCommitNewPossible (or another is/has-prefixed name you
prefer). Update all declarations and every reference/usages in the TableWriter
class (e.g., inside methods that call txWriter.isPartitionReadOnlyByRawIndex,
txWriter.isPartitionParquetByRawIndex, and the logic around fast commit) so
compilation and semantics remain the same.
In `@core/src/main/java/io/questdb/PropServerConfiguration.java`:
- Around line 3850-3858: Move the two new getters
getPartitionEncoderParquetO3RewriteUnusedMaxBytes() and
getPartitionEncoderParquetO3RewriteUnusedRatio() so they appear before
getPartitionEncoderParquetRowGroupSize() within the PropCairoConfiguration
member block; this keeps instance getters alphabetically ordered as required by
the class member sorting convention.
⛔ Files ignored due to path filters (5)
- `core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdbr.dylib` is excluded by `!**/*.dylib`
- `core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdbr.dylib` is excluded by `!**/*.dylib`
- `core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdbr.so` is excluded by `!**/*.so`
- `core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so` is excluded by `!**/*.so`
- `core/src/main/resources/io/questdb/bin/windows-x86-64/questdbr.dll` is excluded by `!**/*.dll`
📒 Files selected for processing (34)
- benchmarks/src/main/java/org/questdb/LineTCPSender03MultiTableMain.java
- claude_pr.md
- core/rust/qdbr/parquet2/src/metadata/column_chunk_metadata.rs
- core/rust/qdbr/parquet2/src/metadata/row_metadata.rs
- core/rust/qdbr/parquet2/src/read/metadata.rs
- core/rust/qdbr/parquet2/src/read/mod.rs
- core/rust/qdbr/parquet2/src/write/file.rs
- core/rust/qdbr/parquet2/src/write/footer_cache.rs
- core/rust/qdbr/parquet2/src/write/mod.rs
- core/rust/qdbr/src/parquet/qdb_metadata.rs
- core/rust/qdbr/src/parquet_read/decode.rs
- core/rust/qdbr/src/parquet_read/jni.rs
- core/rust/qdbr/src/parquet_read/meta.rs
- core/rust/qdbr/src/parquet_read/mod.rs
- core/rust/qdbr/src/parquet_read/slicer/mod.rs
- core/rust/qdbr/src/parquet_write/decimal.rs
- core/rust/qdbr/src/parquet_write/file.rs
- core/rust/qdbr/src/parquet_write/jni.rs
- core/rust/qdbr/src/parquet_write/update.rs
- core/src/main/java/io/questdb/PropServerConfiguration.java
- core/src/main/java/io/questdb/PropertyKey.java
- core/src/main/java/io/questdb/cairo/CairoConfiguration.java
- core/src/main/java/io/questdb/cairo/CairoConfigurationWrapper.java
- core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java
- core/src/main/java/io/questdb/cairo/O3ParquetMergeStrategy.java
- core/src/main/java/io/questdb/cairo/O3PartitionJob.java
- core/src/main/java/io/questdb/cairo/TableWriter.java
- core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java
- core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionUpdater.java
- core/src/test/java/io/questdb/test/cairo/O3ParquetMergeStrategyTest.java
- core/src/test/java/io/questdb/test/cairo/fuzz/O3ParquetMergeStrategyFuzzTest.java
- core/src/test/java/io/questdb/test/griffin/AlterTableConvertPartitionTest.java
- core/src/test/java/io/questdb/test/griffin/ParquetWriteTest.java
- core/src/test/java/io/questdb/test/griffin/engine/table/parquet/PartitionUpdaterTest.java
Resolved review threads:
- benchmarks/src/main/java/org/questdb/LineTCPSender03MultiTableMain.java (outdated)
- core/src/test/java/io/questdb/test/cairo/parquet/O3ParquetMergeStrategyTest.java
- core/src/test/java/io/questdb/test/griffin/AlterTableConvertPartitionTest.java (two threads)
- core/src/test/java/io/questdb/test/cairo/parquet/ParquetWriteTest.java (two threads, outdated)
|
@ideoma — code review findings (Critical + Moderate):

Critical

C1. O3 merge crashes or reads garbage when parquet statistics are disabled

```java
partitionDecoder.readRowGroupStats(rowGroupStatBuffers, parquetColumns, rg);
final long rgMin = rowGroupStatBuffers.getMinValueLong(0);
final long rgMax = rowGroupStatBuffers.getMaxValueLong(0);
```

When row-group statistics are absent, these min/max reads return garbage. Suggested fix: use the safe accessor variants.

Moderate

M1. Rust: negative offset/length values are not validated. Suggested fix: add non-negative checks.

M2. Rust: `debug_assert!` compiles out in release builds, so indexing an empty `partitions` slice can panic:

```rust
debug_assert!(!partitions.is_empty(), "partitions cannot be empty");
let num_columns = partitions[0].columns.len();
```

Suggested fix: replace the assert with a checked access that returns an error.

M3. Missing test: multi-chunk MERGE output splitting.

M4. Missing test: COPY_O3 integration path (gap-only O3). No deterministic integration test inserts O3 data entirely between two existing row groups (producing only COPY_O3 actions, zero MERGE actions). The strategy computation is covered by unit tests, but not the end-to-end path.

M5. Missing test: rewrite failure handling; only the happy path is covered.

M6. No NULL timestamp test in merge strategy. No test in the strategy suite exercises NULL designated timestamp values.
|
Thanks for the thorough review. All items addressed:

C1 — Fixed (383ad59). Two changes, plus a regression test.

M1 — Fixed (75d6cc6). Added the missing validation.

M2 — Fixed (75d6cc6). Replaced the `debug_assert!` with a checked access.

M3 — Test added (6bb247a).

M4 — No change. The strategy computation is already exercised by unit tests.

M5 — Test added (a53a49d).

M6 — No change. The designated timestamp column cannot be NULL in QuestDB — this is enforced at ingestion time.
|
[PR Coverage check] 😍 pass: 2221 / 2502 (88.77%)
|
Summary
- An `unused_bytes` counter tracked in parquet metadata drives the rewrite decision.
- The `O3ParquetMergeStrategy` class now computes merge/copy/split actions up-front using min/max timestamp overlap detection. Row groups that exceed the configured size get split into multiple output groups.
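The rewrite decision can be sketched as a pure predicate. This is an illustration only — the names are invented, and the real check lives in `O3PartitionJob`, reading its thresholds from configuration:

```java
// Hedged sketch of the rewrite-vs-update decision. All names are
// illustrative; the thresholds mirror the documented defaults
// (ratio 0.5, hard cap 1 GB).
final class RewriteDecision {
    // rewrite when the file has a single row group, when the unused-bytes
    // ratio exceeds the configured fraction, or when the absolute unused
    // byte count passes the hard cap
    static boolean shouldRewrite(
            int rowGroupCount,
            long unusedBytes,
            long fileSize,
            double unusedRatio,
            long unusedMaxBytes
    ) {
        if (rowGroupCount <= 1) {
            return true;
        }
        if (fileSize > 0 && (double) unusedBytes / fileSize > unusedRatio) {
            return true;
        }
        return unusedBytes > unusedMaxBytes;
    }
}
```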
Parquet file rewrite mode
O3PartitionJob.processParquetPartition()now evaluates whether to rewrite or update in-place. Rewrite triggers when the file has a single row group, whenunused_bytes / file_sizeexceeds a configurable ratio (default 0.5), or when absolute unused bytes exceeds a threshold (default 1 GB). In rewrite mode, thePartitionUpdaterreceives separate reader/writer file descriptors — the reader points to the old file, the writer to a fresh file. Untouched row groups are raw-copied (byte-range memcpy with adjusted thrift offsets, no decode/re-encode). After commit, the old partition directory is queued for removal.New config properties:
cairo.partition.encoder.parquet.o3.rewrite.unused.ratio(default0.5)cairo.partition.encoder.parquet.o3.rewrite.unused.max.bytes(default1g)Merge strategy with row group splitting
O3ParquetMergeStrategyreplaces the old iterative merge loop. It reads min/max stats for all row groups, detects true timestamp overlaps, identifies O3 data in gaps between row groups, and outputs a sequence ofMergeActions:MERGE— overlapping O3 data merged with a row group sliceCOPY_ROW_GROUP_SLICE— row group (or sub-range) copied without modificationCOPY_O3— new row group created from O3 data aloneSmall row groups (< 4096 rows) adjacent to a gap absorb the gap's O3 data to avoid proliferation of tiny row groups. Merged row groups that exceed the configured
rowGroupSizeget split into multiple output groups.Unused bytes tracking
ParquetUpdater(Rust) accumulates the compressed size of replaced row groups plus the old footer into anunused_bytescounter, serialized into the parquet file's QDB metadata.PartitionDecoderexposes this value to Java viagetUnusedBytes().Last partition Parquet support for WAL tables
TableWriternow allowsCONVERT PARTITION TO PARQUETon the last partition of WAL tables. Key guards:openLastPartitionAndSetAppendPosition()andinitLastPartition()skip native file operations when the last partition is ParquetprocessWalCommit()does not throw when the last partition is closed and Parquet; setsneedFullCommit = trueso all WAL data goes through O3 instead of LAGcanFastCommit/canFastCommitNew) is disabled for Parquet last partitionsprocessO3Block()setsappend = falsefor Parquet partitions to avoid mapping native column filesfinishO3Commit()skipsopenPartitionandsetAppendPositionfor Parquet last partitionsRust: new
ParquetUpdateroperationscopy_row_group()— raw-copies a row group from reader to writer, adjusting thrift metadata offsetsslice_row_group()— decodes a row sub-range, extracts symbol tables, re-encodesinsert_row_group()— inserts a new row group at a specific positionParquetFilegainsinsert(),write_raw_row_group(), andensure_started()methodsPerformance
TSBS run, writing to parquet:
Writing to native format:
Note that the speed variation is marginal; the difference is within the measurement error.
The biggest benefit is that parquet writing uses 1/3 of the IO throughput. In this run, parquet writes 150MB/s while native writes 500MB/s.