fix(http): prevent parquet export corruption under concurrent connections #6723
bluestreak01 merged 16 commits into master
Conversation
Concurrent parquet exports could produce corrupted or truncated files when multiple clients exported simultaneously through the same HTTP worker. Affected clients would receive invalid parquet data that failed to open in downstream tools.

Implementation details for reviewers: ExportQueryProcessor is instantiated per-worker, but stored per-connection state (currentContext, serialParquetExporter) in per-processor fields. When a worker parked connection A via PeerIsSlowToReadException and began serving connection B, these fields were overwritten. On resume, the onWrite callback would use the wrong connection context, writing parquet chunks to the wrong response.

Solution: move both fields to per-connection ExportQueryProcessorState.
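The bug pattern can be sketched as follows. This is a simplified illustration with stand-in classes, not the actual QuestDB types: a processor shared across connections on one worker must not cache per-connection state in its own fields, or a parked/resumed connection picks up another connection's state.

```java
import java.util.HashMap;
import java.util.Map;

public class PerConnectionStateSketch {
    static final class Ctx {
        final String id;
        Ctx(String id) { this.id = id; }
    }

    // Buggy shape: per-connection state cached in the per-worker processor.
    static final class BuggyProcessor {
        Ctx currentContext; // clobbered when the worker switches connections

        void onRequest(Ctx ctx) { currentContext = ctx; }
        String onWrite() { return currentContext.id; } // may use the wrong connection
    }

    // Fixed shape: state keyed by connection, mirroring the move into
    // a per-connection state object such as ExportQueryProcessorState.
    static final class FixedProcessor {
        final Map<Ctx, String> stateByConnection = new HashMap<>();

        void onRequest(Ctx ctx) { stateByConnection.put(ctx, "exporter-for-" + ctx.id); }
        String onWrite(Ctx ctx) { return stateByConnection.get(ctx); }
    }

    public static void main(String[] args) {
        Ctx a = new Ctx("A");
        Ctx b = new Ctx("B");

        BuggyProcessor buggy = new BuggyProcessor();
        buggy.onRequest(a); // connection A parks (peer slow to read)
        buggy.onRequest(b); // worker serves B; A's context is overwritten
        System.out.println(buggy.onWrite()); // resuming A writes with B's context: prints B

        FixedProcessor fixed = new FixedProcessor();
        fixed.onRequest(a);
        fixed.onRequest(b);
        System.out.println(fixed.onWrite(a)); // prints exporter-for-A
    }
}
```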
Walkthrough: refactors the HTTP parquet export flow to use per-request state-managed exporters and callbacks (moving wiring into ExportQueryProcessorState), migrates per-connection circuit-breaker/SQL-context creation into HttpConnectionContext, updates multiple processors to obtain contexts on demand, and adds a concurrent parquet export test.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Hello @coderabbitai, please review this. Focus on concurrency control, and consider the lifecycle of the connection, the HTTP connection context, HTTP request processors, and their states.
✅ Core Fix: Correctly Resolves Parquet Export Corruption
The solution properly addresses the root cause: moving both fields to per-connection ExportQueryProcessorState gives each connection its own exporter and context.
Test validation: the test properly validates the fix by spawning 16 concurrent threads with small buffers (1024 bytes + a random amount), forcing connection parking and worker context switches. Each thread validates parquet file integrity and that the row count matches.
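The shape of such a test can be sketched as follows. This is a hedged illustration, not the actual ConcurrentParquetExportTest: N concurrent clients each request an export and independently validate the payload they received, so any cross-connection mixing surfaces as a failed integrity check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentExportSketch {
    static final int THREADS = 16;
    static final int ROWS = 1000;

    // stand-in for "export and return the rows this client received"
    static int[] export(int clientId) {
        int[] rows = new int[ROWS];
        for (int i = 0; i < ROWS; i++) rows[i] = clientId; // every row tagged with the client
        return rows;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger failures = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < THREADS; t++) {
            final int clientId = t;
            Thread th = new Thread(() -> {
                int[] rows = export(clientId);
                // integrity check: correct row count AND no rows from another connection
                if (rows.length != ROWS) failures.incrementAndGet();
                for (int r : rows) {
                    if (r != clientId) { failures.incrementAndGet(); break; }
                }
            });
            threads.add(th);
            th.start();
        }
        for (Thread th : threads) th.join(120_000);
        System.out.println("failures=" + failures.get()); // prints failures=0
    }
}
```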
There is another concurrency control issue in master. Example:

Reminder: HTTP processor instances are bound to a worker (thread), so a stored processor reference belongs to that worker. Eventually, the socket is ready to accept WRITEs, so the I/O dispatcher emits an event which can be handled by ANY I/O worker. Say Worker1 picks this event up. It calls the stored processor reference, which was meant to be used by Worker0.

Current solution:

To be continued
…reference to resume processor. Reasoning: HTTP request processors are worker-local. If we store a reference while running on Worker0 and later resume on Worker1, then Worker1 will invoke a processor meant to be used by Worker0. If Worker0 concurrently handles another socket event, then both Worker0 and Worker1 will use the same processor instance. This violates the contract of HTTP request processors: they are NOT thread-safe.
@coderabbitai: please do another review round. I did lifecycle changes. Note: processors are single-threaded, so mutable errorSink or decimal256 fields are OK as long as they are used as scratchpads and are not expected to survive across request/resume boundaries.
✅ Core Architecture: Handler ID Resolution System
Problem solved: previously, a parked connection stored a direct reference to a worker-local processor, which another worker could later invoke. Solution implemented:
This ensures Worker1 always accesses its own processor instance, never Worker0's.

✅ Per-Connection Resource Isolation
The move of the circuit breaker and SQL execution context from processor level to per-connection is critical. HttpConnectionContext now owns these resources, all processors were changed to obtain them on demand, and the processor-level fields were removed. This prevents the configuration conflicts you mentioned (e.g., Worker0 and Worker1 both configuring the same circuit breaker instance).

✅ Lifecycle Management
Initialization:
Cleanup:
✅ Documentation
The extensive JavaDoc added to
🎯 Scratchpad Fields Clarification
Thank you for clarifying that processors are guaranteed single-threaded. You're absolutely correct:
My previous concern about these fields was based on misunderstanding the threading guarantees. The real issue was the cross-worker processor access, which you've now solved.

🔍 Minor Observations
📋 Summary
The lifecycle changes successfully solve the cross-worker concurrency issue through handler ID indirection and per-connection resource ownership. The architecture now correctly enforces:
The implementation is sound and the test coverage validates the concurrent scenario. Excellent work on addressing the root cause!
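The handler-ID indirection at the heart of these changes can be sketched as follows (simplified names, not QuestDB's actual API): a parked connection remembers only a stable handler ID, and whichever worker resumes it resolves that ID against its own processor table.

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerIdSketch {
    // Each worker owns its OWN processor instances, indexed by a handler ID
    // that is stable across workers. A parked connection stores only the ID.
    static final class Worker {
        private final List<String> processorsById = new ArrayList<>();

        Worker(String... processors) {
            for (String p : processors) processorsById.add(p);
        }

        String resolveProcessorById(int handlerId) {
            return processorsById.get(handlerId);
        }
    }

    public static void main(String[] args) {
        Worker worker0 = new Worker("export@worker0", "json@worker0");
        Worker worker1 = new Worker("export@worker1", "json@worker1");

        // connection parks while on worker0, remembering only the handler ID
        int parkedHandlerId = 0;

        // worker1 picks up the resume event and resolves against its own table,
        // so it never touches worker0's (non-thread-safe) instance
        System.out.println(worker1.resolveProcessorById(parkedHandlerId)); // export@worker1
        System.out.println(worker0.resolveProcessorById(parkedHandlerId)); // export@worker0
    }
}
```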
Actionable comments posted: 2
Caution: some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
core/src/test/java/io/questdb/test/cutlass/http/HttpQueryTestBuilder.java (1)
98-98: ⚠️ Potential issue | 🟡 Minor
Remove the unused sqlExecutionContexts field. The field is declared at line 98 but is not referenced anywhere in the codebase. It should be removed.
🤖 Fix all issues with AI agents
In `@core/src/main/java/io/questdb/cutlass/http/HttpServer.java`:
- Around line 425-428: resolveProcessorById currently calls
handlersByIdList.getQuick(handlerId).getProcessor(header) without validating
handlerId or the returned entry; add bounds and null checks to avoid OOB/NPE:
first verify handlerId >= 0 && handlerId < handlersByIdList.size(), then fetch
the entry (using getQuick or get) and check it for null before calling
getProcessor(header); if validation fails, return null (or an appropriate
no-op/error value consistent with surrounding code) instead of dereferencing a
possibly invalid entry.
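The bounds-and-null-checked lookup described above could look like the following sketch. The surrounding HttpServer types are stand-ins here (a plain List and a minimal Handler interface), not the real fields.

```java
import java.util.ArrayList;
import java.util.List;

public class DefensiveResolveSketch {
    interface Handler {
        String getProcessor(String header);
    }

    static final List<Handler> handlersByIdList = new ArrayList<>();

    static String resolveProcessorById(int handlerId, String header) {
        // bounds check first: a stale or corrupted ID must not cause an OOB access
        if (handlerId < 0 || handlerId >= handlersByIdList.size()) {
            return null;
        }
        Handler entry = handlersByIdList.get(handlerId);
        // the slot may be empty; avoid dereferencing null
        return entry == null ? null : entry.getProcessor(header);
    }

    public static void main(String[] args) {
        handlersByIdList.add(header -> "export-processor");
        handlersByIdList.add(null);

        System.out.println(resolveProcessorById(0, "GET /exp"));  // export-processor
        System.out.println(resolveProcessorById(1, "GET /exp"));  // null
        System.out.println(resolveProcessorById(5, "GET /exp"));  // null
        System.out.println(resolveProcessorById(-1, "GET /exp")); // null
    }
}
```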
In
`@core/src/main/java/io/questdb/cutlass/http/processors/ExportQueryProcessor.java`:
- Around line 1054-1067: The current parsing of timeout in ExportQueryProcessor
overrides the per-format default when the provided timeout ≤ 0; update the logic
to preserve the format-specific default: compute a defaultTimeout based on
exportModel.isParquetFormat() (use configuration.getExportTimeout()) or for CSV
retrieve NetworkSqlExecutionCircuitBreaker via
context.getOrCreateCircuitBreaker(engine) and use
circuitBreaker.getDefaultMaxTime(), then when parsing
request.getUrlParam(URL_PARAM_TIMEOUT) with Numbers.parseLong(...) set
state.timeout = parsed*1000 and if parsed <= 0 set state.timeout =
defaultTimeout; this ensures ExportQueryProcessor keeps the parquet vs CSV
defaults (use the same symbols: exportModel.isParquetFormat(),
configuration.getExportTimeout(), context.getOrCreateCircuitBreaker(engine),
NetworkSqlExecutionCircuitBreaker.getDefaultMaxTime(), request.getUrlParam,
Numbers.parseLong).
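The default-preserving timeout logic suggested above can be sketched as follows. Symbols mirror the review comment, but the surrounding QuestDB types (configuration, circuit breaker, export model) are replaced with plain parameters for illustration.

```java
public class TimeoutDefaultSketch {
    // Resolve the effective timeout in millis. In the real code, the inputs would
    // come from exportModel.isParquetFormat(), configuration.getExportTimeout(),
    // and the NetworkSqlExecutionCircuitBreaker's default max time.
    static long resolveTimeoutMillis(
            String urlParamTimeout,           // "timeout" URL param value, in seconds
            boolean parquetFormat,
            long exportTimeoutMillis,
            long circuitBreakerDefaultMillis
    ) {
        long defaultTimeout = parquetFormat ? exportTimeoutMillis : circuitBreakerDefaultMillis;
        if (urlParamTimeout == null) {
            return defaultTimeout;
        }
        long parsedSeconds;
        try {
            parsedSeconds = Long.parseLong(urlParamTimeout);
        } catch (NumberFormatException e) {
            return defaultTimeout;
        }
        // key point: a non-positive value falls back to the per-format default
        // instead of overriding it
        return parsedSeconds <= 0 ? defaultTimeout : parsedSeconds * 1000;
    }

    public static void main(String[] args) {
        System.out.println(resolveTimeoutMillis("30", true, 600_000, 60_000));  // 30000
        System.out.println(resolveTimeoutMillis("0", true, 600_000, 60_000));   // 600000
        System.out.println(resolveTimeoutMillis("-5", false, 600_000, 60_000)); // 60000
        System.out.println(resolveTimeoutMillis(null, false, 600_000, 60_000)); // 60000
    }
}
```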
🧹 Nitpick comments (2)
core/src/test/java/io/questdb/test/cutlass/http/ConcurrentParquetExportTest.java (1)
114-116: Consider a parallel join with total timeout.
The sequential join(120_000) per thread means a worst case of 32 minutes if all threads hang. A common pattern is to track remaining time across joins:

```java
long deadline = System.currentTimeMillis() + 120_000;
for (Thread thread : threads) {
    long remaining = deadline - System.currentTimeMillis();
    if (remaining > 0) {
        thread.join(remaining);
    }
}
```

This is a minor suggestion since the test should complete quickly under normal conditions.
core/src/main/java/io/questdb/cutlass/http/processors/ExportQueryProcessorState.java (1)
138-145: Verify HTTPSerialParquetExporter lifecycle cleanup.
close() nulls the exporter without freeing it. If it owns native buffers/threads or implements Closeable, this can leak; please confirm and free it via Misc.free(...).

♻️ Suggested cleanup (if Closeable):

```diff
- serialParquetExporter = null;
+ serialParquetExporter = Misc.free(serialParquetExporter);
```

```bash
#!/bin/bash
# Inspect whether HTTPSerialParquetExporter implements Closeable or owns resources.
rg -n "class HTTPSerialParquetExporter" -g '*.java' -C2
rg -n "HTTPSerialParquetExporter" -g '*.java' -C2
```
also: prevent a potential double-close when a handler is used as a default request processor
[PR Coverage check] 😍 pass: 143 / 148 (96.62%)
@jerrinot Was this other issue affecting other types of queries not related to parquet? E.g. I have a case where queries checking TTL values seem to have mixed the results of different tables (run from different connections).

EDIT: the behavior I observed was 100% unrelated.
hello @freddyrios, what's your conclusion? is it a QuestDB bug or something in your clients? |
@jerrinot 100% on our side. It was a bug in our code; we were caught by surprise by ttlValue changing to different units than the ones used to set it, e.g. if one sets days to 21, it reports it in weeks, so it returns 3. The maxing out of connections and timeouts was due to using more parallelization than we should have for the operations that had the bug, combined with those operations being heavier than intended.