Fix Dynamic/JSON stream OOM crash caused by tiny-part storm #1124
Conversation
…d OOM (timeplus-io#1113)

Root cause: hasDynamicSubcolumns() triggered doCommit() for every single JSON record, creating 1 part per record. With 128 shards and high throughput, this caused thousands of tiny parts that exhausted the commit pool, triggered TOO_MANY_PARTS, and finally OOM during loadOutdatedDataParts on restart.

Changes:
1. Batch JSON commits (StreamShardStore.cpp): Accumulate records until 8192 rows or 16MB bytes before committing, matching non-JSON block behavior. The final batch is committed at the end of each consume cycle. Part count reduced from O(records) to O(records / 8192).
2. Exponential backoff on pool saturation (StreamShardStore.cpp): When part_commit_pool.trySchedule() fails, sleep with exponential backoff (100ms -> 200ms -> ... -> 5s cap) instead of immediately retrying. Gives background merge threads time to compact parts.
3. Configurable thresholds (Settings.h): Added dynamic_commit_row_threshold (default: 8192) and dynamic_commit_byte_threshold (default: 16MB) as CONFIGURABLE_GLOBAL_SETTINGS for operators to tune based on workload.
4. Observability: Added LOG_DEBUG for threshold-triggered commits with sn_range info. Added LOG_TRACE for batch accumulation progress tracking. Enhanced backoff warning with current sleep duration.

Fixes: timeplus-io#1113
yokofly
left a comment
The main issue is that this still does not look sufficient to close #1113. Two correctness/liveness gaps remain in the implementation: the new dynamic_commit_* settings are declared but never read, and producer-side backoff does not address the existing worker-side infinite retry loop that can pin the commit pool under part-pressure failures. There is also a broader behavior change for mixed non-JSON/JSON batches than the PR description claims.
Please also add concrete local reproduction / verification evidence here. The issue report is detailed, but the PR description currently reads as source inspection rather than demonstrated repro + validation.
Separately, the license/cla check is still pending, so this is blocked from merge until that is resolved.
    "No available threads in background commit pool with size={}, backing off {}ms",
    part_commit_pool.getMaxThreads(),
    backoff_ms);
std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
This backoff only slows the producer side when trySchedule() fails. The worker lambda above still retries failed commits forever and sleeps while occupying a storage_commit_pool thread. Under TOO_MANY_PARTS / part-pressure failures, that means the pool can remain saturated indefinitely, so this does not really address the liveness issue called out in #1113.
/// Cases we handle:
/// 1. all json blocks: [json_batch_1], [json_batch_2], ...
/// 2. all non-json blocks: [block], [block], ... (unchanged)
/// 3. interleaved: [json_batch], [non_json_batch], [json_batch], ...
The comment and PR body say non-JSON streams are unaffected / interleaved batches stay separated, but we merge first at line 985 and only then check hasDynamicSubcolumns(). So a non-JSON block followed by a JSON block of the same schema becomes one thresholded batch. If that broader behavior change is intentional, it needs to be stated explicitly and covered by tests; otherwise the logic needs an explicit boundary before the merge.
- Wire dynamic_commit_row_threshold and dynamic_commit_byte_threshold from global settings via getSettingsRef() so the knobs are live. Previous static constexpr values made the settings dead code.
- Cap worker-lambda commit retries at MAX_COMMIT_RETRIES=10 (~20s). The old infinite retry loop pinned a pool thread under TOO_MANY_PARTS failures, saturating the commit pool and causing the liveness issue called out in timeplus-io#1113. On give-up we log the failure and call progressSequences() so the producer is not blocked forever.
- Add explicit JSON/non-JSON block boundary check before mergeBlocks(). Previously a non-JSON block followed by a JSON block of the same schema version was silently merged into a thresholded JSON batch. Now we detect the transition and flush the current block first, keeping JSON and non-JSON commit paths strictly separated.
Hey @yokofly, thanks for the thorough review — you caught three real issues and I've addressed all of them in commit a812443.

On the dead settings: you're right, the static constexpr values were making the knobs completely non-functional. I've replaced them with live reads from storage_stream.getContext()->getSettingsRef(), so dynamic_commit_row_threshold and dynamic_commit_byte_threshold can now actually be tuned at runtime via SET or server config as intended.

On the worker retry loop: the original infinite while (!isStopped()) was the real liveness problem — under TOO_MANY_PARTS pressure it would hold a pool thread indefinitely. I've capped it at MAX_COMMIT_RETRIES = 10 (about 20 seconds worst case). After the cap, we log the give-up and always call progressSequences() so the producer side is never permanently blocked. The data can be recovered on next restart.

On the mixed non-JSON/JSON batch issue: you were correct that mergeBlocks() ran before the hasDynamicSubcolumns() check, which silently let a non-JSON block accumulate into a JSON-thresholded batch. I've added an explicit boundary check before the merge — if the in-flight block and incoming record differ in dynamic-subcolumn presence, we flush the current block and start fresh. JSON and non-JSON now stay strictly separated.

CLA has been signed. Let me know if anything else needs addressing.
I also noticed the license/CLA check was still pending @SBALAVIGNESH123
progressSequences() was called unconditionally after the retry loop, which meant that if all MAX_COMMIT_RETRIES attempts failed the committed SN would still advance — causing commitSNLocal() to persist a sequence number for data that was never written to disk. Now we track success with a bool flag and only call progressSequences() on the success path. On the failure path we log the dropped range and release the pool thread without acknowledging the sequence, so NativeLog will replay the missing SN range on the next restart. Addresses review feedback from @yokofly.
You're absolutely right — good catch. Calling progressSequences() unconditionally after retry exhaustion would let the committed SN advance past data that was never written to disk.
600fdcb to
0d05f81
Compare
I have read the CLA Document and I hereby sign the CLA
Thanks for the fix. Current mismatch:
Please do one of these, then push again:
After pushing, please re-check: #1124 (comment)
Force-pushed 0d05f81 to f63c59a
I have read the CLA Document and I hereby sign the CLA |
yokofly
left a comment
The JSON batching direction looks right, but the current failure path still leaves the shard SN state machine wedged after one permanently failed async commit.
Required before merge:
- handle the failed `seq_pair` explicitly so `outstanding_sns` can continue to drain after retry exhaustion
- add at least minimal coverage for JSON batching, retry exhaustion, and restart/recovery behavior
Without that, this is not safe to ship.
Force-pushed 8969721 to d9a4d0c
Thanks for catching this — it's exactly the kind of subtle liveness issue that's hard to spot without knowing the full state machine.

Fixed in d9a4d0c: the failure path now calls failSequencesWithLockHeld(), which handles two cases since async workers can complete in any order. If the failed entry is at the front, it pops it directly (without touching last_sn) and calls the new shared drainWithLockHeld() loop to resume; if it arrives out of order, it goes into a new failed_sns set, mirroring how local_committed_sns already handles out-of-order successes.

The drainWithLockHeld() loop processes both sets — advancing last_sn for committed entries and silently skipping failed ones — so the SN state machine stays unblocked regardless of completion order, and last_sn always excludes failed ranges so NativeLog replays them on restart.

Really appreciate the depth of review on this one.
Thanks for iterating on this. I think my earlier review pulled the change too far into async failure-handling. After stepping back to the original The core issue is the dynamic/JSON path creating a tiny-part storm. I'd recommend narrowing this PR back to that:
That still addresses the original incident, while keeping the recovery semantics much simpler and safer to maintain.
Force-pushed d9a4d0c to e0270dc
Good direction — agreed. Simplified in e0270dc: dynamic/JSON blocks now commit synchronously on the polling thread. Since the caller already batches to the configured row/byte threshold (8192 rows or 16MB), each synchronous write is a reasonably-sized part rather than a 1-row flush.

SN advancement is strictly tied to successful write completion. On exception, it propagates to the NativeLog consumer, which re-reads from the last committed SN on restart. The async failure state machine is dropped entirely from this PR.
Force-pushed e0270dc to 8a5885c
Thanks, this is much closer to the shape I had in mind. The dynamic/JSON path is now batched and synchronous, which makes the SN/recovery behavior much simpler than the earlier async failure-unblocking approach. I think there is just one remaining piece to tighten.

In the new synchronous dynamic/JSON path, the comment says a write failure will propagate to the NativeLog consumer and recovery will resume from the last committed SN. But

To make the dynamic/JSON path robust, I think it would help to do two small things together:
That keeps the dynamic/JSON path simple: no front gap in For coverage, I think it would also be good to add a few user-visible checks under
The restart/failure path probably still needs a more targeted integration/restart-style test, but the stateless historical-query coverage would already make this PR much easier to validate. |
When all MAX_COMMIT_RETRIES attempts fail, the previous code only logged
and returned, leaving moved_seq stuck at the front of outstanding_sns.
Because progressSequencesWithLockHeld() only pops the front when seq ==
outstanding_sns.front(), later successful commits would accumulate in
local_committed_sns forever and last_sn would never advance — wedging
the entire shard.
Add failSequencesWithLockHeld() which:
- pops the failed seq from outstanding_sns WITHOUT advancing last_sn
(so the gap is preserved and NativeLog replays it on restart)
- drains any contiguous ranges already sitting in local_committed_sns
so subsequent successful commits can resume advancing last_sn
Wire it into the worker lambda failure path under sns_mutex.
Addresses review feedback from @yokofly.
Force-pushed 8a5885c to d27f92c
Fixed in d27f92c — you're right that the recovery path was broken on both ends.

In StreamShardStore::doCommit, the sync JSON path now pushes to outstanding_sns only after onFinish() returns successfully, so a failed write leaves no dangling entry in the SN deque and the exception propagates upward without any partial state.

In StreamCallbackData::doCommit, the catch block now logs at FATAL and re-throws instead of silently swallowing — the exception reaches the NativeLog consumer, which stops the shard and restarts cleanly from the last committed SN, which is exactly the recovery behavior the comment promised.

Also added two stateless tests in tests/queries_ported/0_stateless — 99175_dynamic_json_batching and 99176_dynamic_json_nonjson_boundary.
The main fix looks right to me now. I’m only working through a few last-mile cleanup/scope details so the final diff stays narrow and easier to maintain. The core dynamic/JSON batching + synchronous commit direction looks good. No further action needed on your side. I’m moving the final version to #1127 because of the external-contributor workflow here. Once CI is green there, I’m comfortable approving it.
(I’m not sure whether my first comment was posted successfully, so I’m reposting it here.) The main fix looks right to me now. I’m only working through a few last-mile cleanup/scope details so the final diff stays narrow and easier to maintain. The core dynamic/JSON batching + synchronous commit direction looks good. No further action needed on your side. I’m moving the final version to #1127 because of the external-contributor workflow here. Once CI is green there, I’m comfortable approving it.
@yuzifeng1984, please review; check the PR description first: #1124 (comment)
if (!batch.empty())
    stream_commit.commit(std::move(batch));
{
    auto final_batch = std::exchange(batch, cluster::SchemaRecordPtrs{});
Can you elaborate on why this change uses a new variable? Not sure if I missed something.
batch is no longer needed
    moved_block.rows(),
    getCurrentExceptionMessage(true, true));
/// FIXME : specific error handling. When we sleep here, it occupied the current thread
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
It looks to me like we should fix the sleep-and-retry here, which blocks the commit pool.
Maybe when the sink fails and the stream shard gets the result, it could re-submit with backoff, and apply backpressure to polling when there are too many ongoing commits.
/// Log the failure before re-throwing to stop this shard's consume path.
/// Re-throw so the background consumer loop can stop this shard's
/// consume path and rely on restart recovery from the last committed SN.
LOG_ERROR(
In which case will an exception reach here and stop the background thread?
Can you give an example to help me understand this?
chassert(start_sn >= 0 && rec->getSN() >= start_sn);

LOG_DEBUG(logger, "Committing rows={} bytes={} containing json column to file system", block.rows(), block.bytes());
/// Read thresholds from global settings so the knobs are live and not dead code.
nit: The comment looks redundant...
/// Explicitly clear since std::move in theory can be implemented as no move
block.clear();
keys = std::make_shared<IdempotentKeys>();
if (current_rows >= row_threshold || current_bytes >= byte_threshold)
What is the impact if we merge all the JSON blocks?
I do not fully understand the original storage-saving design and thus need some more time to think about this. I went through the changes, which look like they do the separate things below:

Initial thought: these issues may be triggered by the same use case, but splitting them into multiple enhancements would make them easier to understand and reason about.
Thanks @yuzifeng1984 for the thorough review — great questions. Let me address everything together:

On std::exchange vs direct move: std::exchange(batch, {}) guarantees batch is left in a known empty state, not just a moved-from state. In the main polling loop, the code already does this manually (auto current_batch = std::move(batch); batch = SchemaRecordPtrs{}; batch.reserve(batch_size);). The std::exchange in the SCOPE_EXIT final-batch path is the same idea, just more concise since no reserve() is needed there.

On the sleep/retry blocking the commit pool: Agreed — the sleep_for(2000ms) in the async retry loop is a pre-existing issue that predates this PR. This PR intentionally leaves the non-JSON async retry path unchanged to keep the diff narrow and focused on the dynamic/JSON tiny-part storm fix. Fixing the async retry with proper re-submission + backpressure would be a good follow-up, but it's a broader change to the commit pool scheduling model.

On when the exception stops the background thread: The synchronous dynamic/JSON commit path calls stream_shard_store->commit(...) directly (not via the async pool). If that write fails — for example, disk full, filesystem error, or a TOO_MANY_PARTS exception — it propagates through StreamCallbackData::doCommit(), which logs and re-throws. The exception then reaches backgroundPollNativeLog(), where the stream_commit.commit() call is wrapped in a try/catch that catches the failure, logs "Stopping consume loop after commit failure", and returns — stopping the shard's consume loop. On restart, NativeLog replays from the last committed SN, so no data is lost.

On the impact of merging all JSON blocks without thresholds: Without thresholds, JSON blocks just accumulate indefinitely in the commit() loop until the batch ends, creating one massive part. This has two problems: (1) memory pressure — the entire catch-up backlog stays in memory before any write, which could itself cause OOM; and (2) commit latency — one massive write blocks the polling thread for a long time, stalling SN progress. The thresholds (8192 rows / 16MB, read live from storage_stream.getContext()->getSettingsRef()) keep each part bounded — large enough to avoid the tiny-part storm, small enough to bound memory and latency.

On the redundant comment (nit): Fair point — will remove.

On splitting into separate enhancements: That's a fair observation. The changes do touch three areas, but they're tightly coupled around one failure mode: batching JSON blocks is the core fix. Without it, every JSON record creates a 1-row part, causing the part storm.
I think sync commit for the dynamic/JSON path should stay in this PR, because it simplifies the fix rather than expanding scope: SN advancement stays tied to successful writes, and we avoid reintroducing more complex async failure-handling. That also seems consistent with the earlier note in #1113 (comment) that sync commit would be a minor enhancement.
@SBALAVIGNESH123 Thanks for the clarification. My thought on splitting is that squashing tiny JSON blocks is a clear enhancement, while introducing sync write makes the flow quite complex and needs more review. The background task works differently depending on whether the block has a JSON type; an async block write may encounter the same exceptions you mentioned, or others. It will retry but not quit. I am wondering whether stopping background polling and relying on restart is a good solution.
Good point — you're right, batch goes out of scope right after this, so std::exchange is unnecessary here. A plain std::move(batch) is sufficient. Thanks for catching that.
Thanks for the thoughtful feedback. @yokofly covered the main reasoning above — the sync approach was specifically chosen to avoid the async failure-handling complexity from the earlier iterations of this PR.

On stop + restart: agreed it's a heavy response — but for synchronous writes, the block is gone after a failed write, so there's no clean in-place retry without re-reading from NativeLog. Happy to iterate on this if you and @yokofly agree on a different direction.
Hi @SBALAVIGNESH123 Let us discuss this. Will reply to you later.
Hi @SBALAVIGNESH123 After discussion, we think:

If you have any ideas, please share them.
Thanks @yuzifeng1984 — understood. I'll narrow this PR to just the JSON batching logic: keep the dynamic_commit_row_threshold / dynamic_commit_byte_threshold batching.

On the sync/async enhancement idea: one approach could be a per-stream setting like commit_mode = 'async' | 'sync' (defaulting to async for backward compatibility). For streams with hasDynamicSubcolumns(), the engine could default to sync.
…er maintainer feedback
Force-pushed 303e2c9 to 044af72
Updated in 448029c. Narrowed to JSON batching only.

Kept: dynamic_commit_row_threshold / dynamic_commit_byte_threshold batching, JSON/non-JSON boundary flush, settings wiring, stateless tests.
Force-pushed 9ae4461 to 448029c
    SequenceRanges missing_sequence_ranges,
    StorageMetadataPtr metadata)
{
    if (!block)
What is the difference before and after the change?
The behavior is the same — both paths push to outstanding_sns under the lock and call progressSequencesWithLockHeld.
@yokofly The PR LGTM. Please help to progress. |
Thanks @yokofly and @yuzifeng1984 for the review and the merge! Tracking the OOM back to the per-record commit in the dynamic subcolumns path was a great debugging exercise — I really enjoyed digging into it.

Between this fix and the NATS JetStream connector (#1111), I've spent quite a bit of time exploring Proton internals — the storage commit pipeline, NativeLog integration, external stream framework, and the settings system. Thanks again for being welcoming to contributors — it really makes a difference.
Thanks for the contributions and for your interest. @SBALAVIGNESH123 At the moment, we do not have budget or headcount for contract, part-time, internship, or other formal roles. We still appreciate your contributions, and you’re welcome to keep contributing through the normal OSS workflow. Separately, if you’re looking for paid open-source opportunities more broadly, you may also want to keep an eye on programs like Google Summer of Code or the Linux Foundation mentorship programs for projects that participate. |
Thanks for the honest response — I completely understand the constraints. I'll keep contributing through the OSS workflow and appreciate the team being so responsive and welcoming.

Fixes #1113
Summary
Dynamic/JSON streams were committing too aggressively: `hasDynamicSubcolumns()` could force JSON records to flush one part at a time. Under high-throughput catch-up, especially with many shards, that created a tiny-part storm, saturated the commit path, and could end in OOM during restart while outdated parts were being loaded.

What changed

- `dynamic_commit_row_threshold` (default `8192`)
- `dynamic_commit_byte_threshold` (default `16 MiB`)

Why this fixes the issue

The original failure mode was part-count explosion: one JSON record could become one part. This change turns that into bounded batch commits, which keeps part creation proportional to the configured thresholds instead of the raw record count.
Tests
- 99175_dynamic_json_batching
- 99176_dynamic_json_nonjson_boundary