fix(sql): support more Parquet field type combinations in read_parquet #6069
bluestreak01 merged 16 commits into master from
Conversation
GitHub Actions - Rebuild Native Libraries seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.
Pull Request Overview
This PR enhances the Parquet file reading functionality by supporting more field types and gracefully handling unsupported column types. Instead of failing when encountering unsupported field types, the read_parquet() SQL function now skips them and logs the skipped columns, allowing it to read whatever data it can from the file.
- Added support for more Parquet field types by relaxing type matching constraints
- Implemented graceful handling of unsupported column types by skipping them during processing
- Added external/internal distinction for Parquet files to differentiate between external files and table partition files
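The skip-and-log behavior described above can be sketched as follows. This is an illustrative standalone sketch, not QuestDB's actual API: the `ColType`, `Field`, and `readable_fields` names are hypothetical, and a real implementation would emit a log line instead of collecting names.

```rust
// Hypothetical model of "read whatever data we can, skip the rest".
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColType {
    Int,
    Varchar,
    Unsupported, // e.g. a Parquet map column we cannot decode
}

struct Field {
    name: &'static str,
    col_type: ColType,
}

/// Returns the readable fields, recording the names of skipped columns
/// (real code would log them) instead of failing the whole read.
fn readable_fields<'a>(fields: &'a [Field], skipped: &mut Vec<String>) -> Vec<&'a Field> {
    fields
        .iter()
        .filter(|f| {
            if f.col_type == ColType::Unsupported {
                skipped.push(f.name.to_string());
                false
            } else {
                true
            }
        })
        .collect()
}

fn main() {
    let fields = [
        Field { name: "id", col_type: ColType::Int },
        Field { name: "tags", col_type: ColType::Unsupported },
        Field { name: "name", col_type: ColType::Varchar },
    ];
    let mut skipped = Vec::new();
    let readable = readable_fields(&fields, &mut skipped);
    println!("read {} columns, skipped {:?}", readable.len(), skipped);
}
```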
Reviewed Changes
Copilot reviewed 24 out of 30 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| core/src/test/resources/sqllogictest/test/parquet/map_duckdb.test | Test case verifying unsupported column types are properly skipped |
| core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java | Modified metadata handling to skip unsupported columns and log them |
| core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetRecordCursor.java | Updated cursor logic to handle unsupported columns in metadata comparison |
| core/src/main/java/io/questdb/cairo/sql/PageFrameAddressCache.java | Added external flag to distinguish between external and partition files |
| core/rust/qdbr/src/parquet_read/meta.rs | Enhanced type mapping to support more Parquet types and handle unsupported ones |
| core/rust/qdb-core/src/col_type.rs | Added Undefined column type for unsupported columns |
…types' into puzpuzpuz_unsupported_parquet_types
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetPageFrameCursor.java (1)
124-143: Resource leak when metadata changes — close/unmap fd/addr before throwing
If metadataHasChanged(...) is true, TableReferenceOutOfDateException is thrown after opening and mapping the file. Because this happens inside of(LPSZ), the caller may not have a handle to close this cursor yet, leaking fd and mmap. Wrap initialization in a try/fail block and clean up on error.
Apply this diff to make resource acquisition exception-safe:
```diff
     public void of(LPSZ path) {
-        // Reopen the file, it could have changed
-        this.fd = TableUtils.openRO(ff, path, LOG);
-        this.fileSize = ff.length(fd);
-        this.addr = TableUtils.mapRO(ff, fd, fileSize, MemoryTag.MMAP_PARQUET_PARTITION_DECODER);
-        decoder.of(addr, fileSize, MemoryTag.NATIVE_PARQUET_PARTITION_DECODER);
-        if (metadataHasChanged(metadata, decoder)) {
-            // We need to recompile the factory as the Parquet metadata has changed.
-            throw TableReferenceOutOfDateException.of(path);
-        }
-
-        columnIndexes.clear();
-        for (int i = 0, n = metadata.getColumnCount(); i < n; i++) {
-            columnIndexes.add(i);
-        }
-        this.rowCount = decoder.metadata().rowCount();
-        this.rowGroupCount = decoder.metadata().rowGroupCount();
-
-        toTop();
+        long newFd = -1;
+        long newAddr = 0;
+        long newFileSize = 0;
+        try {
+            // Reopen the file, it could have changed
+            newFd = TableUtils.openRO(ff, path, LOG);
+            newFileSize = ff.length(newFd);
+            newAddr = TableUtils.mapRO(ff, newFd, newFileSize, MemoryTag.MMAP_PARQUET_PARTITION_DECODER);
+            decoder.of(newAddr, newFileSize, MemoryTag.NATIVE_PARQUET_PARTITION_DECODER);
+            if (metadataHasChanged(metadata, decoder)) {
+                // Recompile: release newly opened resources before signaling to caller.
+                throw TableReferenceOutOfDateException.of(path);
+            }
+
+            // Commit newly opened resources to fields only after success
+            this.fd = newFd;
+            this.addr = newAddr;
+            this.fileSize = newFileSize;
+            newFd = -1; // prevent cleanup in finally
+            newAddr = 0;
+            newFileSize = 0;
+
+            columnIndexes.clear();
+            for (int i = 0, n = metadata.getColumnCount(); i < n; i++) {
+                columnIndexes.add(i);
+            }
+            this.rowCount = decoder.metadata().rowCount();
+            this.rowGroupCount = decoder.metadata().rowGroupCount();
+
+            toTop();
+        } finally {
+            // Cleanup only if we failed before committing to fields
+            if (newAddr != 0) {
+                ff.munmap(newAddr, newFileSize, MemoryTag.MMAP_PARQUET_PARTITION_DECODER);
+            }
+            if (newFd != -1) {
+                ff.close(newFd);
+            }
+        }
     }
```

Additionally, consider documenting in a Javadoc that external cursors may return null for symbol tables and bitmap indexes to set expectations for downstream consumers.
core/src/main/java/io/questdb/cairo/TableWriter.java (1)
1628-1656: Correct column indexing to avoid mismatched writes
The `copyToSansUnsupported(...)` call does not populate the `writerIndex` on each `TableColumnMetadata` (it defaults to -1; the 6-arg constructor calls the 11-arg overload with `writerIndex = -1`), so using `metadata.getColumnMetadata(i).getWriterIndex()` will always return -1. Instead, pull the true writer index directly from the Parquet decoder via `parquetMetadata.columnId(i)`.

Please update core/src/main/java/io/questdb/cairo/TableWriter.java (around lines 1630–1656) as follows:

- Map the filtered metadata index to the table's writer index before you call `getColumnNameTxn(...)` and before you add IDs to `parquetColumnIdsAndTypes`.
- Use that `writerColIndex` everywhere you previously used `i` for the writer's column ID.

```diff
@@ core/src/main/java/io/questdb/cairo/TableWriter.java
         for (int i = 0, n = metadata.getColumnCount(); i < n; i++) {
             final int columnType = metadata.getColumnType(i);
             final String columnName = metadata.getColumnName(i);
-            final long columnNameTxn = getColumnNameTxn(partitionTimestamp, i);
+            // map filtered metadata index → physical writer index
+            final int writerColIndex = parquetMetadata.columnId(i);
+            final long columnNameTxn = getColumnNameTxn(partitionTimestamp, writerColIndex);
-            parquetColumnIdsAndTypes.add(i);
+            parquetColumnIdsAndTypes.add(writerColIndex);
             parquetColumnIdsAndTypes.add(columnType);
```

With this change, both your transaction lookups (`getColumnNameTxn`) and the arrays tracking file descriptors/types will use the correct, absolute writer column index rather than the filtered Parquet-metadata index.
core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetFunctionFactory.java (1)
82-92: Add missing column type to skip log for unsupported columns
The current log in PartitionDecoder only emits the column name when skipping an unsupported column, without its type. To help users reconcile missing fields, include both name and type in the log.
Affected location:
- core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java, around line 255
Suggested change:
```diff
- LOG.info().$("unsupported column type, skipping [column=").$(columnName).I$();
+ LOG.info()
+     .$("unsupported column type, skipping [column=").$(columnName)
+     .$(", type=").$(ColumnType.nameOf(columnType))
+     .I$();
```
core/src/main/java/io/questdb/cairo/sql/PageFrameMemoryPool.java (1)
295-306: Guard against unmapped columns during final remapping to avoid accidental index 0 fallback.
If addressCache metadata ever contains a column without a toParquet mapping (e.g., future decoder changes), getQuick(...) may return the default zero, silently misrouting to column 0. A small guard makes this path robust with negligible cost.
Apply this diff:
```diff
         // Now do the final remapping.
         parquetColumns.clear();
         fromParquetColumnIndexes.clear();
         fromParquetColumnIndexes.setAll(parquetMetadata.columnCount(), -1);
-        for (int i = 0, n = addressCache.getColumnCount(); i < n; i++) {
-            final int columnIndex = addressCache.getColumnIndexes().getQuick(i);
-            final int parquetColumnIndex = toParquetColumnIndexes.getQuick(columnIndex);
-            final int columnType = addressCache.getColumnTypes().getQuick(i);
-            parquetColumns.add(parquetColumnIndex);
-            fromParquetColumnIndexes.setQuick(parquetColumnIndex, i);
-            parquetColumns.add(columnType);
-        }
+        final int parquetColumnCount = parquetMetadata.columnCount();
+        for (int i = 0, n = addressCache.getColumnCount(); i < n; i++) {
+            final int columnIndex = addressCache.getColumnIndexes().getQuick(i);
+            final int parquetColumnIndex = toParquetColumnIndexes.getQuick(columnIndex);
+            if (parquetColumnIndex < 0 || parquetColumnIndex >= parquetColumnCount) {
+                // Column is not present or was filtered out; leave zeros for this column.
+                continue;
+            }
+            final int columnType = addressCache.getColumnTypes().getQuick(i);
+            parquetColumns.add(parquetColumnIndex);
+            fromParquetColumnIndexes.setQuick(parquetColumnIndex, i);
+            parquetColumns.add(columnType);
+        }
```
core/rust/qdbr/src/parquet_read/decode.rs (2)
447-452: Wrong error sentinel type: Err(true) is not a ParquetError
encoding_error is declared as a bool and used as an error. This won't compile and masks the intent. Use a ParquetError sentinel via fmt_err! so all Err(...) arms are typed correctly.
Apply:
```diff
- let encoding_error = true;
+ // sentinel error used to bubble up and re-map to a richer Unsupported message below
+ let encoding_error = fmt_err!(Unsupported, "unsupported encoding sentinel");
```
1212-1227: Return type mismatch: decode_page() returns ParquetResult<()> but tries to return row_count
decoding_result must be mapped to Ok(()) on success. Returning Ok(row_count) breaks the signature.
Apply:
```diff
     match decoding_result {
-        Ok(row_count) => Ok(row_count),
+        Ok(()) => Ok(()),
         Err(_) => Err(fmt_err!(
             Unsupported,
             "encoding not supported, physical type: {:?}, \
             encoding {:?}, \
             logical type {:?}, \
             converted type: {:?}, \
             column type {:?}",
             page.descriptor.primitive_type.physical_type,
             page.encoding(),
             page.descriptor.primitive_type.logical_type,
             page.descriptor.primitive_type.converted_type,
             column_type,
         )),
     }
```
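As a standalone illustration of the signature issue, here is a minimal sketch with hypothetical names (not the actual decoder code): an inner step yields a row count, but the outer function's contract is a unit result, so the success value must be discarded rather than returned.

```rust
// Hypothetical stand-ins for ParquetResult and the decode functions.
fn decode_rows(rows: usize) -> Result<usize, String> {
    if rows == 0 {
        Err("empty page".to_string())
    } else {
        Ok(rows)
    }
}

// Contract is Result<(), _>: map the inner Ok(row_count) to Ok(()).
fn decode_page(rows: usize) -> Result<(), String> {
    match decode_rows(rows) {
        Ok(_row_count) => Ok(()), // drop the value to satisfy the unit return type
        Err(e) => Err(format!("encoding not supported: {e}")),
    }
}

fn main() {
    assert_eq!(decode_page(3), Ok(()));
    assert!(decode_page(0).is_err());
}
```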
♻️ Duplicate comments (1)
core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetRecordCursor.java (1)
100-125: Fix looks correct: metadata change detection now accounts for skipped/undefined columns
Iterating parquet metadata and skipping Undefined while comparing against metadata, plus allowing SYMBOL→VARCHAR remap, addresses the earlier false positives. The final check metadataIndex != metadata.getColumnCount() is correct.
🧹 Nitpick comments (22)
core/rust/qdbr/src/parquet_write/schema.rs (1)
264-269: Good catch-all for unsupported/undefined types; consider tightening the error message and add a smoke test.
Adding the default arm ensures Undefined/unknown tags don't silently slip through and fail later. Optional: include the tag name to make logs easier to scan during ops.
Apply this diff to enrich the message:
```diff
-        _ => Err(fmt_err!(
-            InvalidType,
-            "unexpected type {} for column {}",
-            column_type,
-            column_name,
-        )),
+        _ => Err(fmt_err!(
+            InvalidType,
+            "unexpected type {} ({}) for column {}",
+            column_type.tag().name(),
+            column_type,
+            column_name,
+        )),
```

Add a unit/integration test that attempts to write a column with an unsupported type (e.g., Undefined or non-Double array) and asserts the exact error text. Do you want me to draft it?
core/rust/qdb-core/src/col_type.rs (2)
32-32: Undefined tag introduction looks sane; clarify intended usage and update docs.
Defining `Undefined = 0` and exposing it via `VALUES` and `name()` is consistent. However, deserialization via `TryFrom<i32>` still rejects `v <= 0`, which means a `ColumnType` with code 0 cannot be constructed from a serialized value (only via `new`). If that's intentional (Undefined only as an internal/ephemeral tag), please add a brief comment to the enum explaining that `Undefined` is not a valid on-the-wire `ColumnType` code.

Proposed doc tweak:

```diff
 #[repr(u8)]
 #[derive(Debug, Copy, Clone, PartialEq)]
 pub enum ColumnTypeTag {
-    Undefined = 0,
+    /// Placeholder for unsupported/unknown types.
+    /// Note: ColumnType::try_from(i32) rejects code <= 0, so Undefined is not a valid serialized code.
+    Undefined = 0,
```

Also applies to: 60-85, 123-147
214-215: Update comment for `Option<ColumnType>` layout change
You switched the `code` field from `NonZeroI32` (which provides a niche for `Option<ColumnType>`) to a plain `i32`, so the existing comment on lines 213–215 is no longer accurate. Please update it to something like:

```diff
-    // Optimization so `Option<ColumnType>` is the same size as `ColumnType`.
+    // Note: With a plain `i32` code, `Option<ColumnType>` is now larger than `ColumnType` (no niche).
```

Audit any FFI or performance-critical code paths that might rely on the smaller `Option<ColumnType>` layout.
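For reference, the niche optimization at stake here can be observed directly with `std::mem::size_of`: `Option<NonZeroI32>` stays at 4 bytes because 0 is a forbidden bit pattern that can encode `None`, while `Option<i32>` needs extra space for the discriminant. This is a generic standard-library demonstration, not QuestDB code.

```rust
use std::mem::size_of;
use std::num::NonZeroI32;

fn main() {
    // NonZeroI32 is the same size as i32...
    assert_eq!(size_of::<NonZeroI32>(), 4);
    // ...and Option<NonZeroI32> is too: the all-zero pattern encodes None.
    assert_eq!(size_of::<Option<NonZeroI32>>(), 4);

    // A plain i32 has no forbidden pattern, so Option<i32> grows.
    assert_eq!(size_of::<i32>(), 4);
    assert!(size_of::<Option<i32>>() > 4); // typically 8: tag + padding
    println!("Option<NonZeroI32>: {} bytes, Option<i32>: {} bytes",
             size_of::<Option<NonZeroI32>>(), size_of::<Option<i32>>());
}
```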
This update should also be applied at lines 221, 226, 231, 242 and 308 in core/rust/qdb-core/src/col_type.rs.
core/rust/qdb-core/src/col_driver/mod.rs (1)
60-87: ✅ try_lookup_driver fully migrated; next up: add Array/Undefined tests
I ran a broad search and found no remaining references to the old `lookup_driver` API; everything now correctly uses `try_lookup_driver`. As suggested, please add the following tests to core/rust/qdb-core/src/col_driver/mod.rs (inside the existing `mod tests` block):
- Positive coverage for `Array`:

```diff
 fn test_lookup_driver() {
     let cases = vec![
 @@
         (ColumnTypeTag::Varchar.into_type(), "varchar"),
+        (ColumnTypeTag::Array.into_type(), "array"),
     ];
     for (col_type, exp_descr) in cases.iter().copied() {
         let driver = try_lookup_driver(col_type).unwrap();
         assert_eq!(driver.descr(), exp_descr);
     }
 }
```

- Negative coverage for `Undefined`:

```diff
+#[test]
+fn test_lookup_driver_undefined_errors() {
+    // Undefined via code 0 should error
+    let undefined = ColumnType::new(ColumnTypeTag::Undefined, 0);
+    assert!(try_lookup_driver(undefined).is_err());
+}
```

After adding these, we'll have full confidence in the renamed API and its behavior at the type boundary.
core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetPageFrameCursor.java (1)
80-90: Close order nit: unmap before freeing decoder (if decoder references mapped region)
Currently, close() frees the decoder before unmapping the file. If PartitionDecoder holds pointers into the mapped region and tries to touch them during free(), this may be safe or may read unmapped memory depending on implementation. Verify that decoder.free() does not access the mapping. If unsure, swap the order: unmap mapping first, then free decoder state.
Proposed order (only if safe per PartitionDecoder semantics):
```diff
-        Misc.free(decoder);
-        if (fd != -1) {
-            ff.close(fd);
-            fd = -1;
-        }
-        if (addr != 0) {
-            ff.munmap(addr, fileSize, MemoryTag.MMAP_PARQUET_PARTITION_DECODER);
-            addr = 0;
-        }
+        if (addr != 0) {
+            ff.munmap(addr, fileSize, MemoryTag.MMAP_PARQUET_PARTITION_DECODER);
+            addr = 0;
+        }
+        if (fd != -1) {
+            ff.close(fd);
+            fd = -1;
+        }
+        Misc.free(decoder);
```
core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetFunctionFactory.java (1)
86-93: Guard against "no supported columns" to avoid confusing empty schemas.
If a file contains only unsupported columns, the cursor will be created with an empty RecordMetadata. Consider failing fast with a clear error. Suggested patch:

```diff
         final GenericRecordMetadata metadata = new GenericRecordMetadata();
         // `read_parquet` function will request symbols to be converted to varchar
         decoder.metadata().copyToSansUnsupported(metadata, true);
+        if (metadata.getColumnCount() == 0) {
+            throw SqlException.$(argPos.getQuick(0),
+                    "no supported columns found in parquet file: ").put(filePath);
+        }
```
core/rust/qdbr/src/parquet_write/file.rs (2)
810-815: Return an error instead of panicking for unexpected primitive tags (and align Symbol branch).
The new catch-all InvalidType error is good. For consistency and to avoid accidental panics, consider removing the panic! on Symbol and returning an error as well. Suggested patch:

```diff
-        ColumnTypeTag::Symbol => {
-            panic!("Symbol type is encoded in column_chunk_to_pages()")
-        }
-        _ => Err(fmt_err!(
+        ColumnTypeTag::Symbol => Err(fmt_err!(
+            InvalidType,
+            "unexpected symbol type in primitive encoder for column {} (should be handled earlier)",
+            column.name,
+        )),
+        _ => Err(fmt_err!(
             InvalidType,
             "unexpected type {} for column {}",
             column.data_type,
             column.name,
         )),
```
283-334: Unwraps defeat the new recoverable error path.
Both code paths use `.expect("encoded_column")`. Any InvalidType you return upstream will still crash here. If the intent is to bubble errors to the caller, replace unwraps with propagation.

If you want, I can prepare a follow-up patch that:

- replaces the unwraps with `?`
- switches the parallel collection to collect `ParquetResult<_>` and transpose after join
- surfaces `ParquetError::InvalidType` to the writer API.
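A generic sketch of that follow-up, with illustrative names rather than the actual writer code: propagate errors with `?` instead of unwrapping, and collect a sequence of `Result` values into `Result<Vec<_>, _>` so the first failure surfaces to the caller.

```rust
// Hypothetical per-column encoder that can fail with an InvalidType-style error.
fn encode_column(v: i32) -> Result<i32, String> {
    if v < 0 {
        Err(format!("invalid type {v}"))
    } else {
        Ok(v * 2)
    }
}

// collect::<Result<Vec<_>, _>>() short-circuits on the first Err,
// replacing a panicking .expect(...) on each element.
fn encode_all(values: &[i32]) -> Result<Vec<i32>, String> {
    values.iter().map(|&v| encode_column(v)).collect()
}

fn main() -> Result<(), String> {
    let encoded = encode_all(&[1, 2, 3])?; // `?` propagates instead of unwrapping
    println!("encoded: {encoded:?}");
    Ok(())
}
```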
1-10: Good regression test for skipping unsupported columns.
This validates that scanning proceeds when a DuckDB map column is present. Consider adding a companion test for the SQL function under change (read_parquet) to ensure identical behavior and to cover both parallel and non-parallel read paths.
640-654: 2D shape: move shape[0] update after the loop — clearer and avoids redundant writes.
Behavior remains equivalent; minor improvement to readability and potentially fewer stores inside the loop.
653-673: 3D shape: same improvement applied — looks good.
694-699: 4D shape: same improvement applied — looks good.core/src/main/java/io/questdb/griffin/engine/table/TimeFrameRecordCursorImpl.java (2)
82-85: Optionally clear the frame address cache on close to release memory sooner.
Not critical, but clearing the frameAddressCache can help drop cached lists promptly for long-lived cursor instances.
Apply this diff:
```diff
     public void close() {
         Misc.free(frameMemoryPool);
+        frameAddressCache.clear();
         frameCursor = Misc.free(frameCursor);
     }
```
222-224: Minor typo in comment ("bulding" → "building").
Low-impact polish.
Apply this diff:
```diff
-    // TODO: bulding page frame cache assumes opening all partitions;
+    // TODO: building page frame cache assumes opening all partitions;
```
61-66: Optionally clear the address cache on close.
Similar to other cursors, clearing the cache can reclaim memory early if this cursor object is reused or lives longer than a single query.
Apply this diff:
```diff
     public void close() {
         Misc.free(frameMemoryPool);
         Misc.free(recordA);
         Misc.free(recordB);
+        frameAddressCache.clear();
         frameCursor = Misc.free(frameCursor);
     }
```
219-229: Initialize order: call clear() first, then set members (avoid subtle surprises).
Functionally correct as-is, but setting columnCount/types/indexes before clear() relies on clear() never touching them. Invert the order to future-proof and improve readability.
Apply this diff:
```diff
-    public void of(@Transient RecordMetadata metadata, @Transient IntList columnIndexes, boolean external) {
-        this.columnCount = metadata.getColumnCount();
-        columnTypes.clear();
-        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
-            columnTypes.add(metadata.getColumnType(columnIndex));
-        }
-        this.columnIndexes.clear();
-        this.columnIndexes.addAll(columnIndexes);
-        clear(); // also clears external flag
-        this.external = external;
-    }
+    public void of(@Transient RecordMetadata metadata, @Transient IntList columnIndexes, boolean external) {
+        // Reset frame-derived state and external flag first.
+        clear();
+        this.columnCount = metadata.getColumnCount();
+        columnTypes.clear();
+        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+            columnTypes.add(metadata.getColumnType(columnIndex));
+        }
+        this.columnIndexes.clear();
+        this.columnIndexes.addAll(columnIndexes);
+        this.external = external;
+    }
```
core/src/main/java/io/questdb/cairo/sql/PageFrameMemoryPool.java (1)
279-294: Mapping strategy correctly diverges for internal vs external parquet; one small clarity nit.
- Skipping undefined parquet columns is the right fix; nice.
- Using columnId for internal partitions and a running ordinal for external files matches the problem statement.
- Nit: metadataIndex is really “accepted/recognized column ordinal”. Consider renaming for readability.
Optional rename (local scope):
```diff
-            int metadataIndex = 0;
+            int acceptedOrdinal = 0;
 ...
                 if (!addressCache.isExternal()) {
                     // We can only trust column ids in case of partition files.
                     final int columnId = parquetMetadata.columnId(parquetIndex);
                     assert columnId > -1 : "negative column id value for " + parquetMetadata.columnName(parquetIndex);
                     toParquetColumnIndexes.extendAndSet(columnId, parquetIndex);
                 } else {
-                    toParquetColumnIndexes.extendAndSet(metadataIndex, parquetIndex);
+                    toParquetColumnIndexes.extendAndSet(acceptedOrdinal, parquetIndex);
                 }
-                metadataIndex++;
+                acceptedOrdinal++;
```
core/src/main/java/io/questdb/cairo/sql/PageFrameCursor.java (1)
87-91: Add a default isExternal() to PageFrameCursor to preserve binary compatibility
Providing a default implementation in the interface ensures existing downstream implementations continue to work without requiring every implementor to add an override. Table-backed cursors will inherit the false default, while Parquet-backed ones can still override to return true.

Key implementors of PageFrameCursor:

- ReadParquetPageFrameCursor (core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetPageFrameCursor.java): overrides to return true.
- SelectedPageFrameCursor (core/src/main/java/io/questdb/griffin/engine/table/SelectedRecordCursorFactory.java): delegates to the wrapped cursor (baseCursor.isExternal()).

Apply this change in PageFrameCursor.java:

```diff
--- a/core/src/main/java/io/questdb/cairo/sql/PageFrameCursor.java
+++ b/core/src/main/java/io/questdb/cairo/sql/PageFrameCursor.java
@@
-    /**
-     * Returns true if the cursor belongs to an external parquet file, false in case of table partition files.
-     */
-    boolean isExternal();
+    /**
+     * Returns true if the cursor belongs to an external parquet file, false in case of table partition files.
+     * Default implementation returns false for table-backed cursors.
+     */
+    default boolean isExternal() {
+        return false;
+    }
```

All existing implementations that explicitly override isExternal() will continue to function as before, and no downstream compilation breaks will occur.
core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java (2)
229-231: Outdated comment contradicts new behavior
The comment claims unsupported columns are included (as Undefined), but copyToSansUnsupported() below intentionally skips them. Update the comment to avoid confusion.
Apply:
```diff
-    /**
-     * Unsupported columns are included into the metadata, but with Undefined column type.
-     */
+    /**
+     * Note: low-level parquet metadata tracks all columns (unsupported are tagged as Undefined),
+     * but higher-level GenericRecordMetadata produced via copyToSansUnsupported() omits them.
+     */
```
247-275: Log more context for skipped columns (type code) and document symbol handling
The info log is helpful; include the type code to aid triage. Also, since treatSymbolsAsVarchar affects column types, add a short Javadoc to copyToSansUnsupported() describing that behavior.
Apply:
```diff
         if (ColumnType.isUndefined(columnType)) {
-            LOG.info().$("unsupported column type, skipping [column=").$(columnName).I$();
+            LOG.info().$("unsupported column type, skipping [column=").$(columnName)
+                    .$(", typeCode=").$(columnType).I$();
             continue;
         }
```

And add Javadoc:

```diff
+    /**
+     * Copies supported columns into the provided metadata, skipping any with Undefined type.
+     * If treatSymbolsAsVarchar is true, symbol columns are exposed as VARCHAR.
+     */
     public void copyToSansUnsupported(GenericRecordMetadata metadata, boolean treatSymbolsAsVarchar) {
```
core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetRecordCursor.java (2)
154-159: Error message loses the original cause
Wrapping any CairoException into a generic "Parquet file is likely corrupted" drops useful context (encoding, column, row group). Consider appending ex.getMessage() for diagnostics.
Apply:
```diff
         } catch (CairoException ex) {
-            throw CairoException.nonCritical().put("Error reading. Parquet file is likely corrupted");
+            throw CairoException.nonCritical()
+                    .put("Error reading parquet (likely corrupted or unsupported): ").put(ex.getMessage());
         }
```
219-222: Sanity check: buffer count must match metadata column count
Given the changed column selection, ensure RowGroupBuffers exposes exactly metadata.getColumnCount() buffers. Your earlier metadataHasChanged() check should guarantee this, but add an assert to catch regressions.
Apply:
```diff
         for (int i = 0, n = metadata.getColumnCount(); i < n; i++) {
             dataPtrs.add(rowGroupBuffers.getChunkDataPtr(i));
             auxPtrs.add(rowGroupBuffers.getChunkAuxPtr(i));
         }
+        assert dataPtrs.size() == metadata.getColumnCount() && auxPtrs.size() == metadata.getColumnCount();
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)

- core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdbr.dylib is excluded by !**/*.dylib
- core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdbr.dylib is excluded by !**/*.dylib
- core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdbr.so is excluded by !**/*.so
- core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so is excluded by !**/*.so
- core/src/main/resources/io/questdb/bin/windows-x86-64/questdbr.dll is excluded by !**/*.dll
- core/src/test/resources/sqllogictest/data/parquet-testing/map_duckdb.parquet is excluded by !**/*.parquet
📒 Files selected for processing (25)

- core/rust/qdb-core/src/col_driver/mod.rs (3 hunks)
- core/rust/qdb-core/src/col_type.rs (7 hunks)
- core/rust/qdbr/src/parquet_read/decode.rs (13 hunks)
- core/rust/qdbr/src/parquet_read/meta.rs (3 hunks)
- core/rust/qdbr/src/parquet_write/array.rs (4 hunks)
- core/rust/qdbr/src/parquet_write/file.rs (1 hunks)
- core/rust/qdbr/src/parquet_write/schema.rs (1 hunks)
- core/src/main/java/io/questdb/cairo/TableWriter.java (1 hunks)
- core/src/main/java/io/questdb/cairo/sql/PageFrameAddressCache.java (4 hunks)
- core/src/main/java/io/questdb/cairo/sql/PageFrameCursor.java (1 hunks)
- core/src/main/java/io/questdb/cairo/sql/PageFrameMemoryPool.java (1 hunks)
- core/src/main/java/io/questdb/cairo/sql/TablePageFrameCursor.java (0 hunks)
- core/src/main/java/io/questdb/cairo/sql/async/PageFrameSequence.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetFunctionFactory.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetPageFrameCursor.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetRecordCursor.java (4 hunks)
- core/src/main/java/io/questdb/griffin/engine/groupby/SampleByFirstLastRecordCursorFactory.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByNotKeyedVectorRecordCursorFactory.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByRecordCursorFactory.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/AbstractPageFrameRecordCursor.java (2 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/SelectedRecordCursorFactory.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/TablePageFrameCursor.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/TimeFrameRecordCursorImpl.java (1 hunks)
- core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java (4 hunks)
- core/src/test/resources/sqllogictest/test/parquet/map_duckdb.test (1 hunks)
💤 Files with no reviewable changes (1)
- core/src/main/java/io/questdb/cairo/sql/TablePageFrameCursor.java
🔇 Additional comments (34)
core/rust/qdb-core/src/col_type.rs (2)
164-189: Dual mapping of IPv4 (21 and 25) — intentional alias or oversight? Add comment/tests.
`TryFrom<u8>` maps both `21` and `25` to `IPv4`, while the enum value is `25`. If `21` is a legacy alias, keep it but document it and add tests; if not, drop it to avoid silently reinterpreting an unknown tag as IPv4. I recommend we verify and then either:
- Keep the alias with a comment and tests, or
- Remove the `21 =>` arm.

Here is a minimal diff to document the alias:
```diff
- 21 => Ok(ColumnTypeTag::IPv4),
+ // Accept legacy IPv4 tag code 21 for backward compatibility.
+ 21 => Ok(ColumnTypeTag::IPv4),
```

And add a unit test asserting that both 21 and 25 parse to IPv4.
Would you like me to add the tests?
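For illustration, here is a self-contained sketch of such a test; `ColumnTypeTag` below is a minimal stand-in with only the variant under discussion, not the real qdb-core enum:

```rust
use std::convert::TryFrom;

// Minimal stand-in for the qdb-core enum; only IPv4 matters for this test.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ColumnTypeTag {
    IPv4 = 25,
}

impl TryFrom<u8> for ColumnTypeTag {
    type Error = u8;

    fn try_from(v: u8) -> Result<Self, Self::Error> {
        match v {
            // Legacy IPv4 tag code, kept for backward compatibility.
            21 => Ok(ColumnTypeTag::IPv4),
            25 => Ok(ColumnTypeTag::IPv4),
            other => Err(other),
        }
    }
}

fn main() {
    // Both the legacy and the current tag codes must resolve to IPv4...
    assert_eq!(ColumnTypeTag::try_from(21), Ok(ColumnTypeTag::IPv4));
    assert_eq!(ColumnTypeTag::try_from(25), Ok(ColumnTypeTag::IPv4));
    // ...and an unknown tag must not be silently reinterpreted.
    assert!(ColumnTypeTag::try_from(22).is_err());
    println!("ok");
}
```

The real test would live next to the `TryFrom<u8>` impl in `col_type.rs` and use the actual enum.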
295-307: Deserialization rejects code <= 0 — verify this aligns with introducing Undefined.
`TryFrom<i32>` still rejects `v <= 0`, which prevents deserializing a code-0 `ColumnType`. If undefined/unsupported tags are only represented in sub-fields (e.g., array element type) and not as whole `ColumnType` codes, this is fine. Otherwise, we need a path to deserialize "undefined" safely.

Can you confirm there's no protocol or metadata path that emits a 0 code? If there is, I can propose a defensive decode that yields `Undefined` instead of an error.

core/rust/qdb-core/src/col_driver/mod.rs (1)
34-35: Import changes LGTM. Importing `CoreResult` and `fmt_err` aligns error handling with the rest of the module.

core/src/main/java/io/questdb/griffin/engine/table/SelectedRecordCursorFactory.java (1)
342-346: Propagating isExternal() is correct and needed for downstream caches. Forwarding `isExternal()` to the base cursor ensures address caches and consumers get consistent semantics. Looks good.

core/src/main/java/io/questdb/griffin/engine/table/TablePageFrameCursor.java (1)
31-33: Solid default: in-house cursors are non-external by default. Adding the doc and a default `isExternal() = false` creates a clear contract and won't surprise existing implementors.

Also applies to: 38-41
core/src/main/java/io/questdb/griffin/engine/groupby/SampleByFirstLastRecordCursorFactory.java (2)
712-724: Propagate external-frame awareness to address cache — looks correct. Passing frameCursor.isExternal() into PageFrameAddressCache.of(...) aligns this factory with the new API and ensures proper handling of external page frames such as Parquet. No functional concerns with this specific change.
523-526: Verify external frame invariants before using SampleByFirstLastRecordCursorFactory

I didn't see any `isExternal` guard around the call to `new SampleByFirstLastRecordCursorFactory(…)` in SqlCodeGenerator (around line 3991). That means a Parquet-backed frame—where `frame.getBitmapIndexReader(…)` and `frame.getSymbolTable(…)` return null—could reach this cursor factory and trigger NPEs at lines 523–526 of core/src/main/java/io/questdb/griffin/engine/groupby/SampleByFirstLastRecordCursorFactory.java.

Please confirm or enforce the invariant that external (Parquet) frames are never passed here. If that guarantee isn't already enforced upstream, either:

- Add an explicit `isExternal` check in SqlCodeGenerator before constructing this factory, or
- Guard against null from `getBitmapIndexReader()`/`getSymbolTable()` inside SampleByFirstLastRecordCursorFactory and throw a clear exception.

Locations to review:

- SampleByFirstLastRecordCursorFactory.java: lines 523–526
- SqlCodeGenerator.java: instantiation at ~line 3991
core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetPageFrameCursor.java (1)
102-106: Explicit external cursor signal — LGTM. Returning true from isExternal() is the right contract for Parquet-backed frames and unblocks downstream address-cache behavior.
core/src/main/java/io/questdb/cairo/sql/async/PageFrameSequence.java (2)
392-396: Pass-through of isExternal to address cache — approved. Updating PageFrameSequence to propagate frameCursor.isExternal() into PageFrameAddressCache.of(...) keeps the async path consistent with the synchronous sites. Good catch.
394-395: All `PageFrameAddressCache.of(...)` invocations include the required `isExternal` flag

I've checked every call site of `PageFrameAddressCache.of(...)` across table, time-frame, and group-by cursors—each one passes three arguments, including `frameCursor.isExternal()`. No changes are needed.

core/src/main/java/io/questdb/griffin/engine/functions/table/ReadParquetFunctionFactory.java (1)
86-92: Switch to metadata.copyToSansUnsupported() is the right call for graceful degradation. Passing `treatSymbolsAsVarchar = true` maintains previous behavior for symbol handling, while filtering out unsupported Parquet columns. Nice, focused change.

core/rust/qdbr/src/parquet_write/array.rs (1)
708-713: General case: using `max(1)` after resetting counts is fine and equivalent. Since `counts[dim]` is reset to `1` immediately before, `shape[dim] = shape[dim].max(1)` is equivalent to the previous `max(counts[dim])` and communicates intent.

core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByRecordCursorFactory.java (2)
382-386: Plumbing `isExternal` into PageFrameAddressCache.of(...) is correct and future-proof. This keeps the vectorized group-by path aligned with the new external-frame semantics.
382-386: All call sites updated to the 3-arg signature
I ran searches for any `PageFrameAddressCache.of(...)` calls with only two arguments and found no matches, confirming that every usage now includes the `isExternal` parameter. No further changes are needed here.
145-145: Correctly propagates external frame awareness to the address cache. Passing frameCursor.isExternal() into frameAddressCache.of(...) is the right move to align with the new PageFrameAddressCache API and parquet/table partition semantics.
core/src/main/java/io/questdb/griffin/engine/groupby/vect/GroupByNotKeyedVectorRecordCursorFactory.java (1)
247-247: Good: frameAddressCache initialized with isExternal flag. This ensures correct parquet column-id mapping for external files vs internal partitions. The subsequent per-worker frameMemoryPools.of(frameAddressCache) remains correct.
core/src/main/java/io/questdb/griffin/engine/table/AbstractPageFrameRecordCursor.java (2)
28-36: Explicit imports improve clarity and avoid wildcard collisions. Nice cleanup replacing the wildcard with explicit imports; helps readability and tooling.
107-111: Correct: address cache init now respects external frames. Using frameCursor.isExternal() in frameAddressCache.of(...) aligns with the new API and parquet behavior.
core/src/main/java/io/questdb/cairo/sql/PageFrameAddressCache.java (5)
39-44: Javadoc update is accurate and helpful. The distinction between native and parquet frames is well-articulated and fits the added functionality.
71-72: Introducing the external flag is a sensible design choice. This lightweight switch lets downstream components choose the correct mapping strategy.
145-146: Resetting the external flag in clear() makes lifecycle semantics explicit. This protects against stale state when reusing the cache.
211-214: Accessor looks good. isExternal() conveys state without exposing mutability.
219-229: All `frameAddressCache.of(...)` invocations include the new third `external` parameter; no missing call sites found.

I ran a repository-wide search and confirmed that every `frameAddressCache.of(...)` call now passes three arguments—metadata, the column indexes list, and the `external` flag—so the migration to the new signature is complete.

core/src/main/java/io/questdb/cairo/sql/PageFrameMemoryPool.java (1)
279-306: Review metadata alignment in external frames

- In the read_parquet path, Parquet metadata is pre-filtered by `PartitionDecoder.copyToSansUnsupported(metadata, true)`, so the `GenericRecordMetadata` passed into page-frame allocation only contains supported columns.
- The `PageFrameMemoryPool` branch for external frames relies on `addressCache.isExternal()` being set to `true` for read_parquet cursors—please confirm that the `ReadParquetPageFrameCursor` (or its factory) indeed constructs its `PageFrameAddressCache` with `isExternal == true`.
- I wasn't able to find any calls to a `PageFrameAddressCache.of(...)` factory; verify how the external flag is passed into the cache's constructor or builder in your cursor implementation.
- If any custom pipeline variant bypasses `copyToSansUnsupported` or mis-sets the external flag, gaps between metadata indexes and original Parquet column ordinals could occur.

core/src/main/java/io/questdb/cairo/sql/PageFrameCursor.java (1)
92-103: Good addition: default next() convenience method. Adding a default next() that delegates to next(0) simplifies callers and keeps implementations focused on next(long). No concerns.
core/src/main/java/io/questdb/griffin/engine/table/parquet/PartitionDecoder.java (1)
247-275: No remaining internal callers of copyTo(GenericRecordMetadata, boolean) – migration complete

Verified that within the `core` codebase there are no invocations of the old `public void copyTo(GenericRecordMetadata metadata, boolean treatSymbolsAsVarchar)` signature:

- `PartitionDecoder` now only exposes `copyToSansUnsupported(...)`
- Both `ReadParquetFunctionFactory` and `TableWriter` invoke the new API
- No other references to the old method were found via repository-wide search
Since the rename doesn’t break any internal call sites, you can safely remove the old signature. If you expose this API for downstream modules or external users, you may optionally add a deprecated delegating overload for one release cycle:
```diff
+ /**
+  * @deprecated use {@link #copyToSansUnsupported(GenericRecordMetadata, boolean)} instead.
+  */
+ @Deprecated
+ public void copyTo(GenericRecordMetadata metadata, boolean treatSymbolsAsVarchar) {
+     copyToSansUnsupported(metadata, treatSymbolsAsVarchar);
+ }
```
863-964: Nice: broader UTF-8/VARCHAR and dictionary support. Allowing DeltaLengthByteArray, DeltaByteArray, and dictionary-encoded varlen paths for String/Varchar significantly improves compatibility with real-world parquet producers. Looks good.
965-1033: Binary/Array dictionary and delta-length support. The added dictionary path for Binary and support for DeltaLengthByteArray in Binary/Array fill important gaps. Solid addition.
1076-1139: Double: dictionary and array branches are a welcome expansion. Handling dictionary-encoded doubles and array-of-double via both plain and dictionary paths resolves a major pain point reported in the linked issue. Good job.
1141-1210: Catch-all fixed-size types: tidy and correct. The consolidated fixed-size handling (Float/Boolean + dict) is clean and aligns with parquet encodings. No issues spotted.
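As background for readers unfamiliar with parquet dictionary encoding, the scheme these branches decode can be sketched as follows (an illustrative helper, not the qdbr decoder):

```rust
// A dictionary-encoded page stores each distinct value once, plus a
// per-row index stream pointing into that dictionary.
fn decode_dictionary(dict: &[f64], indices: &[usize]) -> Vec<f64> {
    indices.iter().map(|&i| dict[i]).collect()
}

fn main() {
    let dict = [1.5, 2.5, 3.5];
    let indices = [0, 2, 2, 1, 0];
    assert_eq!(decode_dictionary(&dict, &indices), vec![1.5, 3.5, 3.5, 2.5, 1.5]);
    println!("ok");
}
```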
core/rust/qdbr/src/parquet_read/meta.rs (4)
59-63: Name retention: good ownership pattern. Storing UTF-16 names per column and retaining ownership via name_vec while exposing a stable pointer is the right approach for FFI. LGTM.
136-144: Potential mismatch: Int64 Timestamp with milliseconds mapped to Date. Here, Int64+Timestamp(Milliseconds) → Date. In decode.rs, Int64+Timestamp(any unit) is decoded as Timestamp with unit normalization (ms→μs). Mapping to Date here can lead to inconsistent semantics (Date likely ms, Timestamp μs) and may break metadata/type checks and consumers expecting Timestamp.
Would you consider aligning this to Timestamp as well?
```diff
- (PhysicalType::Int64,
-     Some(Timestamp { unit: TimeUnit::Milliseconds, is_adjusted_to_utc: _ }),
-     _,
- ) => Some(ColumnType::new(ColumnTypeTag::Date, 0)),
+ (PhysicalType::Int64,
+     Some(Timestamp { unit: TimeUnit::Milliseconds, is_adjusted_to_utc: _ }),
+     _,
+ ) => Some(ColumnType::new(ColumnTypeTag::Timestamp, 0)),
```

If Date is intentional for ms, please confirm the end-to-end behavior (Java accessors, downsampling, and formatters) so we can document the rationale and ensure decode paths don't double-convert.
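For context, the unit normalization decode.rs performs for Int64 timestamps can be sketched as below (the helper name is illustrative, not the actual qdbr code); the ms-vs-μs difference is exactly why a Date mapping and a Timestamp mapping diverge:

```rust
// QuestDB timestamps are microsecond-resolution, so a parquet Int64
// timestamp in milliseconds must be scaled up on decode (ms -> us).
fn millis_to_micros(ts_ms: i64) -> i64 {
    ts_ms.saturating_mul(1_000)
}

fn main() {
    // 2021-01-01T00:00:00Z expressed in ms and in us.
    assert_eq!(millis_to_micros(1_609_459_200_000), 1_609_459_200_000_000);
    println!("ok");
}
```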
151-158: Compact Int16/Int8 mapping: looks correct. Consolidating Int32+Integer(Int16/Int8) to Short/Byte simplifies mapping and matches parquet logical typing. LGTM.
165-184: Broadened Double/Float/ByteArray handling: good coverage. Recognizing arrays via array_column_type and defaulting ByteArray to Binary unless UTF-8/String is present makes the mapping resilient to producers. No issues.
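The ByteArray fallback described here can be sketched as follows (the enums are simplified stand-ins for the parquet2/qdbr types, not the real definitions):

```rust
// Simplified stand-ins for the parquet logical type and QuestDB column tag.
#[derive(Debug, PartialEq)]
enum LogicalType { String, Json }

#[derive(Debug, PartialEq)]
enum ColumnTypeTag { Varchar, Binary }

// ByteArray maps to Varchar only when a UTF-8/String logical type is
// present; anything else (including no logical type) falls back to Binary.
fn map_byte_array(logical: Option<&LogicalType>) -> ColumnTypeTag {
    match logical {
        Some(LogicalType::String) => ColumnTypeTag::Varchar,
        _ => ColumnTypeTag::Binary,
    }
}

fn main() {
    assert_eq!(map_byte_array(Some(&LogicalType::String)), ColumnTypeTag::Varchar);
    assert_eq!(map_byte_array(Some(&LogicalType::Json)), ColumnTypeTag::Binary);
    assert_eq!(map_byte_array(None), ColumnTypeTag::Binary);
    println!("ok");
}
```

Defaulting to Binary rather than erroring is what keeps the mapping resilient to producers that omit or mislabel logical types.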
[PR Coverage check]😍 pass : 89 / 107 (83.18%) file detail
Fixes #5942
Some of the fields in the files from the bug report had the `Unknown` logical type, while we were only allowing `None` in the Rust code. So, the fix relaxes the checks.

Also, unsupported field types (think, columns) are now skipped by the `read_parquet()` SQL function. This way, the function reads whatever it can from the given file. Skipped columns are logged in the server logs.

Other than that, this cleans up `calculate_array_shape` based on recent suggestions from @RaphDal and optimizes the general case based on Sopel97's suggestion.
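The relaxed logical-type check can be sketched as follows (the types are simplified stand-ins, not the actual parquet2/qdbr definitions):

```rust
// Simplified stand-in for the parquet logical type; the real definitions
// live in the parquet crate used by qdbr.
#[derive(Debug, PartialEq)]
enum LogicalType { Unknown, String }

// Before the fix only `None` was accepted here; now `Some(Unknown)` is
// treated the same way, so fields tagged Unknown still decode by their
// physical type instead of failing the whole read.
fn accepts_plain_physical_type(logical: &Option<LogicalType>) -> bool {
    matches!(logical, None | Some(LogicalType::Unknown))
}

fn main() {
    assert!(accepts_plain_physical_type(&None));
    assert!(accepts_plain_physical_type(&Some(LogicalType::Unknown)));
    assert!(!accepts_plain_physical_type(&Some(LogicalType::String)));
    println!("ok");
}
```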