alternator/streams: keep disabled streams usable and purge on re-enable #29413
ScyllaPiotr wants to merge 4 commits into scylladb:master
Conversation
Force-pushed from c898ba6 to 719517c
Docs Preview 📖: the docs preview for this pull request is available here. Note: this preview will be available for 30 days and will be deleted automatically after that period. You can manually trigger a new build by committing changes.
```python
# through the old ARN. In Alternator, the old stream data is purged
# immediately upon re-enabling (a documented discrepancy).
# Reproduces issue #7239.
def test_streams_reenable(dynamodb, dynamodbstreams, scylla_only):
```
I think this test should be xfail, not "scylla_only". This is a feature that DynamoDB has and Scylla doesn't - it's not a Scylla extension, it's a Scylla deficiency.
You're right.
I wanted the final assert to be pytest.xfail for the non-AWS case, but that's not possible, so I will split it into two tests instead.
🟢 CI State: SUCCESS
Build Details:
Force-pushed from d1b01c7 to d5b7208
🟢 CI State: SUCCESS
Build Details:
@scylladb/releng - FYI there is an "iwyu / Analyze #includes in source files (pull_request)" failure below which I don't understand, and which probably has nothing to do with this PR.
@nyh We don't maintain this workflow, I am not sure what it is about.
@avikivity @mykaul we need to either have one person in charge of all these github workflows - or at least maintain a document on who's responsible for which. When one of these bots breaks, it kills everyone's work, and I have no idea whom to ask. Two more options we can have:
Force-pushed from d5b7208 to 3d29d91
annastuchlik left a comment:
The docs look good.
How is it different from code modules? There's no one person in charge of any code module / file / whatnot.
I was as surprised as you were to see the new license. I don't know how it happened.
🔴 CI State: FAILURE
Failed Tests (3/67720):
📄 Elasticsearch analysis for test_random_failures[stop_during_gossip_shadow_round-restart_coordinator_node]
Recent 10 Failures
Build Details:
Note: To re-trigger CI for this PR, comment:
🟢 CI State: SUCCESS
Build Details:
Force-pushed from 3d29d91 to 72743db
Previously, disabling Alternator Streams would create a blank cdc::options with only enabled=false, which meant also losing access to the stored Streams data (including preimage and postimage).

Now, when a stream is disabled:
- The existing CDC options are preserved (only 'enabled' is flipped to false), so StreamViewType remains available.
- DescribeStream enumerates all shards with EndingSequenceNumber set, indicating they are closed.
- GetRecords omits NextShardIterator for disabled streams.
- DescribeTable (supplement_table_stream_info) reports the stream ARN and StreamEnabled: false when the CDC log table still exists.
- ListStreams uses get_base_table instead of is_log_for_some_table so that disabled streams whose log table still exists are listed.

When a stream is re-enabled on an Alternator table that has an existing (disabled) CDC log table, the old log table is dropped and a fresh one is created with a new UUID, producing a new StreamArn. This is Alternator-specific behavior; CQL CDC tables continue to reuse the existing log table. The old stream data is lost immediately upon re-enable; DynamoDB keeps it readable for 24 hours.

Tests:
- test_streams_closed_read, test_streams_disabled_stream: remove xfail now that disabled streams are usable.
- test_streams_disable_data_survives: new test verifying that stream data remains readable after disabling.
- test_streams_reenable: new test verifying that re-enabling produces a new ARN and the old data is still readable via the old ARN (xfail on Scylla, which currently purges old data on re-enable).

Fixes scylladb#7239
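The disabled-stream read semantics in this commit message can be modeled with a short illustrative sketch (plain Python over dicts shaped like DescribeStream output; this is not Scylla code, and the helper names are made up):

```python
# Illustrative model of the disabled-stream semantics, not Scylla code.
# Shard dicts are shaped like DynamoDB Streams DescribeStream output.

def shard_is_closed(shard):
    # A closed shard has EndingSequenceNumber set in its sequence range.
    return 'EndingSequenceNumber' in shard.get('SequenceNumberRange', {})

def next_shard_iterator(stream_enabled, shard, iterator):
    # GetRecords omits NextShardIterator when the stream is disabled
    # (all shards closed), signaling the consumer to stop polling.
    if not stream_enabled or shard_is_closed(shard):
        return None
    return iterator

closed = {'ShardId': 'shard-0',
          'SequenceNumberRange': {'StartingSequenceNumber': '1',
                                  'EndingSequenceNumber': '9'}}
open_ = {'ShardId': 'shard-1',
         'SequenceNumberRange': {'StartingSequenceNumber': '1'}}
```

A consumer that stops when `next_shard_iterator()` returns None can thus drain a disabled stream cleanly instead of polling forever.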
Both disable_stream and wait_for_active_stream used time.process_time() for their timeouts, but process_time measures CPU time, not wall-clock time. Since these loops spend most of their time sleeping and waiting on API calls, the timeouts could last far longer than intended. Use time.time() instead to enforce actual wall-clock deadlines.
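A minimal demonstration of why this matters: time.process_time() counts CPU time, which barely advances while a loop sleeps, so a process_time deadline may effectively never expire:

```python
import time

start_cpu = time.process_time()
start_wall = time.time()
time.sleep(0.2)  # stands in for a polling loop's sleep/API wait
cpu_elapsed = time.process_time() - start_cpu
wall_elapsed = time.time() - start_wall
# cpu_elapsed stays near zero: a "60 second" process_time timeout
# would tolerate far more than 60 wall-clock seconds of sleeping.
```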
Force-pushed from 72743db to 71ee33e
🔴 CI State: FAILURE
Failed Tests (1/8942):
📄 Elasticsearch analysis for test_deferred_stream_enablement_on_tablets
Recent 10 Failures
Build Details:
Note: To re-trigger CI for this PR, comment:
@ScyllaPiotr if I understand correctly, the flaky test …
…n disable

When disabling Alternator Streams, the disable path copied existing CDC options and only flipped enabled to false, preserving enable_requested and tablet_merge_blocked. This meant that:
1. Disabling during the ENABLING state left enable_requested=true, so the topology coordinator would finalize the enablement anyway.
2. tablet_merge_blocked=true persisted after disable, blocking tablet merges forever.

Clear both flags when disabling streams.

Fixes the Phase 4 failure in test_deferred_stream_enablement_on_tablets.
The bug this test caught was related to this patch. Fixed, pushed.
@nyh I know you can't merge, but please review if possible. |
Do you know if this is intentional in CQL - that if you disable CDC and then re-enable it, the old data remains behind, and you get a mix of old data and new data with a time gap in the middle? Does anybody actually want this behavior? CC @piodul.
I see. I hope we have a test showing this discrepancy (we don't need to fix it, we may never want to fix it, but we must know it exists).
Pull request overview
This PR updates Alternator Streams behavior so that disabling a stream keeps existing stream data readable (while stopping new records), and re-enabling forces creation of a fresh stream (new StreamArn) while purging old data. It also updates tests and documentation to reflect these semantics and the remaining DynamoDB-compatibility differences.
Changes:
- Keep disabled streams listed/usable for reads by preserving CDC options and treating shards as closed.
- On re-enable, drop the old CDC log table first so the subsequent enable creates a new log table/UUID (new StreamArn).
- Update Alternator Streams tests and compatibility documentation for the new disable/re-enable behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| test/alternator/test_streams.py | Fixes stream-disable wait timing; adds new tests for post-disable readability and re-enable behavior. |
| docs/alternator/compatibility.md | Documents how disabled streams remain readable and how re-enable purges old data (differs from DynamoDB). |
| alternator/streams.cc | Lists disabled streams whose log tables still exist; describes disabled streams as closed; adjusts GetRecords behavior and CDC option handling on disable. |
| alternator/executor.hh | Extends add_stream_options() API to accept existing CDC options when disabling. |
| alternator/executor.cc | Passes existing CDC options on disable; on re-enable drops the old CDC log table before creating a new one. |
```cpp
if (!base->cdc_options().enabled()) {
    // Stream is disabled -- all shards are closed (#7239).
    // Don't return NextShardIterator.
} else if (shard.time < ts && ts < high_ts) {
    // The DynamoDB documentation states that when a shard is
    // closed, reading it until the end has NextShardIterator
    // "set to null". Our test test_streams_closed_read
```
In this end-of-shard decision logic, the subsequent else-branch (when the shard is still open) constructs a next_iter but then returns the old iter as NextShardIterator. That leaves next_iter unused (will fail under -Werror) and prevents advancing the iterator threshold, risking callers repeatedly querying the same empty range. Ensure the returned NextShardIterator actually uses the advanced iterator value.
I didn't understand what the AI meant here, but please investigate this.
```diff
         print('disabled stream on {}'.format(table.name))
         return
-    time.sleep(0.5)
+    time.sleep(0.1)
```
In disable_stream(), the polling loop now sleeps only 0.1s between ListStreams/DescribeStream calls. This can exceed DynamoDB’s documented DescribeStream rate limits (and will scale with the number of historic streams), making the tests more likely to be throttled/flaky. Consider restoring a slower poll interval (e.g., 0.5s) or adding backoff/jitter while still using time.time() for the timeout.
Suggested change:
```diff
-    time.sleep(0.1)
+    time.sleep(0.5)
```
I think this is fine.
I didn't realize that Copilot is right: DescribeStream indeed has a 10-per-second limit (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html). But with a 0.1s sleep plus the unavoidable network lag, I think 0.1 is about right. Not to mention that if this request does get throttled, the SDK will just retry it, so no harm done. On Alternator, lower sleeps between retries can make tests faster.
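For reference, a bounded wall-clock poll with capped exponential backoff and jitter (the alternative Copilot hinted at) could be sketched like this; `poll_until` and its defaults are hypothetical, not names from this test file:

```python
import random
import time

def poll_until(check, timeout=60, base_sleep=0.1, max_sleep=0.5):
    # Wall-clock deadline (time.time(), not time.process_time()).
    deadline = time.time() + timeout
    sleep = base_sleep
    while time.time() < deadline:
        if check():
            return True
        # Jittered sleep avoids synchronized retries; the capped
        # exponential backoff bounds the request rate.
        time.sleep(sleep + random.uniform(0, sleep / 2))
        sleep = min(sleep * 2, max_sleep)
    return False
```

Usage would be something like `poll_until(lambda: stream_state(arn) == 'DISABLED', timeout=60)`, where `stream_state` is a hypothetical helper wrapping DescribeStream.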
```python
desc = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
nrecords = 0
for shard in desc['Shards']:
    iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
        ShardId=shard['ShardId'], ShardIteratorType='TRIM_HORIZON')['ShardIterator']
    response = dynamodbstreams.get_records(ShardIterator=iter)
    if 'Records' in response:
        nrecords += len(response['Records'])
```
_count_stream_records() claims to count “all shards” and “total number of records”, but it only reads the first DescribeStream page and only a single GetRecords page per shard (ignoring NextShardIterator). This can undercount and make the new assertions unreliable. Consider iterating DescribeStream pages (LastEvaluatedShardId) and paging GetRecords until NextShardIterator is absent to actually consume the full stream (or rename/comment it to reflect what it really does).
Suggested change:
```python
shards = []
last_evaluated_shard_id = None
while True:
    kwargs = {'StreamArn': arn}
    if last_evaluated_shard_id:
        kwargs['ExclusiveStartShardId'] = last_evaluated_shard_id
    desc = dynamodbstreams.describe_stream(**kwargs)['StreamDescription']
    shards.extend(desc.get('Shards', []))
    last_evaluated_shard_id = desc.get('LastEvaluatedShardId')
    if not last_evaluated_shard_id:
        break
nrecords = 0
for shard in shards:
    shard_iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
        ShardId=shard['ShardId'], ShardIteratorType='TRIM_HORIZON')['ShardIterator']
    while shard_iter:
        response = dynamodbstreams.get_records(ShardIterator=shard_iter)
        nrecords += len(response.get('Records', []))
        shard_iter = response.get('NextShardIterator')
```
Copilot is right.
One option is to fix this function as copilot suggested. The other option, if you only care whether the count is zero or not, is to rename this function stream_is_empty() or something and return true or false. But even with stream_is_empty(), you should look at all shards, not just the first page.
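A sketch of that stream_is_empty() variant, assuming a client object with the three boto3 DynamoDB Streams methods: it pages through every DescribeStream page (LastEvaluatedShardId) and every GetRecords page (NextShardIterator), returning False as soon as any record is found. The function name and early-exit design are illustrative:

```python
def stream_is_empty(streams, arn):
    # `streams` is anything with describe_stream / get_shard_iterator /
    # get_records, e.g. a boto3 DynamoDB Streams client.
    last_shard = None
    while True:
        kwargs = {'StreamArn': arn}
        if last_shard:
            kwargs['ExclusiveStartShardId'] = last_shard
        desc = streams.describe_stream(**kwargs)['StreamDescription']
        for shard in desc.get('Shards', []):
            it = streams.get_shard_iterator(
                StreamArn=arn, ShardId=shard['ShardId'],
                ShardIteratorType='TRIM_HORIZON')['ShardIterator']
            # Page through the shard; a closed shard eventually stops
            # returning NextShardIterator.
            while it:
                resp = streams.get_records(ShardIterator=it)
                if resp.get('Records'):
                    return False  # found a record: not empty
                it = resp.get('NextShardIterator')
        last_shard = desc.get('LastEvaluatedShardId')
        if not last_shard:
            return True
```

The early exit makes the emptiness check cheap on non-empty streams while still scanning all shard pages when the stream really is empty.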
```python
table.update_item(Key={'p': p, 'c': random_string()},
    UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})

assert _count_stream_records(dynamodbstreams, arn1) > 0
```
The new tests assert that stream records are immediately visible right after update_item(). Alternator streams are asynchronous (and this file already uses retry loops/timeouts elsewhere), so these immediate “> 0” assertions are likely to be flaky on both AWS and Scylla. Consider adding a bounded retry loop (like fetch_and_compare_events()/other tests above) to wait until at least one record becomes visible before asserting on counts.
Suggested change:
```diff
-assert _count_stream_records(dynamodbstreams, arn1) > 0
+deadline = time.time() + 10
+count = 0
+while time.time() < deadline:
+    count = _count_stream_records(dynamodbstreams, arn1)
+    if count > 0:
+        break
+    time.sleep(0.1)
+assert count > 0
```
```cpp
    group0_guard = co_await mm.start_group0_operation();
    tab = get_table(p.local(), request);
    builder = schema_builder(tab);
    add_stream_options(*stream_specification, builder, p.local(), tab->cdc_options());
}
```
After dropping the old CDC log table, the code reloads tab/builder and re-applies add_stream_options(), but it does not re-run the follow-up steps that were applied to the original builder when enabling streams (e.g., validate_cdc_log_name_length() and, on tablet tables, defer_enabling_streams_block_tablet_merges()). Because builder was reconstructed, those adjustments are lost and re-enable behavior can diverge from first-time enablement on tablet tables. Consider re-applying the same post-processing after the second add_stream_options() call (or refactor to avoid duplicating the enable flow).
I think my proposal to just do "continue" will do the right thing for free.
nyh left a comment:
Thanks, looks mostly good, but I have a couple of comments and requests (and Copilot did too); some of them are actually potential bugs that should be fixed, others are less serious nitpicks.
```cpp
        format("alternator-executor: drop old CDC log for {}", tab->cf_name()));
    co_await mm.wait_for_schema_agreement(
        p.local().local_db(), db::timeout_clock::now() + 10s, nullptr);
    group0_guard = co_await mm.start_group0_operation();
```
I think this code is not concurrency-safe:
Between dropping the old guard (the std::move above) and starting a new one here, things might have changed. Some parallel operation might have re-enabled CDC or deleted the table entirely or I don't know what.
I think there's an easy solution: you just need to call "continue" here to retry from the beginning, and not try to assume that re-capturing the guard will work. In the "good" case, after the "continue" you will reach this code again, and it will notice that the log table no longer exists, so it will have nothing to do.
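The suggested retry-from-the-top pattern, sketched generically in Python (the real code is C++ coroutine logic in alternator/executor.cc; every name here is illustrative):

```python
def run_with_retries(read_state, needs_cleanup, cleanup, apply, max_attempts=10):
    # Optimistic-concurrency loop: re-read state under a fresh guard on
    # every attempt instead of assuming nothing changed while the guard
    # was dropped for an intermediate schema change.
    for _ in range(max_attempts):
        state = read_state()        # take guard, read current schema
        if needs_cleanup(state):
            cleanup(state)          # e.g. drop the old CDC log table
            continue                # guard was released: retry from scratch
        return apply(state)         # guard still held: apply the change
    raise RuntimeError('too many concurrent schema changes')
```

On the retry, the cleanup predicate is false (the old log table is gone), so the loop applies the change under a guard that was never released.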
```cpp
// When disabling, preserve the existing CDC options (preimage,
// postimage, ttl, etc.) so that DescribeStream can still report the
// correct StreamViewType on a disabled stream. Only flip enabled to
// false. See issue #7239.
```
Nitpick: I don't think we need to reference issue #7239 in every comment. The explanation that we need to keep the old options so DescribeStream can show them is good enough - nobody should go to issue #7239 for more information; you won't find additional options there.
```cpp
stream_arn arn(cf.schema(), cdc::get_base_table(db.real_database(), *cf.schema()));
// Show stream info when CDC is enabled, or when it was disabled but the
// log table still exists (the stream is usable for reads). See #7239.
auto db = sp.data_dictionary();
```
The comment says "when CDC is enabled or ...", but I don't see the "when CDC is enabled" (opts.enabled()) in the new code. I am worried that because of this, UpdateTable may not add the right stream info in the response because the log table doesn't exist yet (?). I don't remember if we have a test for this - what UpdateTable adding a stream actually returns.
```python
# Test that after disabling and re-enabling a stream on a table, the old
# stream data remains readable through the old ARN. In DynamoDB, it
# remains readable for 24 hours. In Scylla, it is currently purged upon
# re-enabling (issue #7239).
```
You will be closing #7239, so please either create a new issue about this last remaining problem and mention that issue (in the comment and xfail), or don't mention any issue. I think it's confusing to list a closed issue as an xfail reason.
```markdown
stream purges the old data immediately and produces a new StreamArn.
In contrast, DynamoDB keeps the old stream and its data readable for
24 hours through the old StreamArn even after re-enabling.
<https://github.com/scylladb/scylla/issues/7239>
```
You'll be closing this issue so please don't leave a link to it as a closed issue.
You should create a new issue for the last remaining point.
Very sadly, this new link will be to a JIRA issue that users who can see compatibility.md will not actually be able to see. What a colossal mistake it was to outlaw GitHub issues :-(
CC @dani-tweig - this was one of the reasons I didn't want to drop the public github issues.
```diff
     # them (the current one) may remain DISABLING for some time.
-    exp = time.process_time() + 60
-    while time.process_time() < exp:
+    exp = time.time() + 60
```
yikes. good catch. How did I ever write this code like this? :-( Thanks for fixing it.
In any case, in the working case the timeout will never be reached which is why we never noticed this problem.
```cpp
cdc::options opts = existing_cdc_opts;
opts.enabled(false);
opts.enable_requested(false);
opts.tablet_merge_blocked(false);
```
Please merge this fix with the first patch, that introduced this bug.
When an Alternator stream is disabled, the data should continue to be accessible so that consumers can finish reading. When the stream is later re-enabled, a new StreamArn is produced and only then the old data is purged.
On disable, the existing CDC options (including preimage and postimage) are preserved so that DescribeStream can still report StreamViewType. All stream APIs continue to work on the disabled stream, with all shards reported as closed (EndingSequenceNumber set). No new CDC records are written; existing data expires via TTL after 24 hours.
On re-enable, the old CDC log table is dropped as a separate Raft group0 schema change and a fresh one is created with a new UUID, giving a new StreamArn. This is Alternator-specific; CQL CDC keeps reusing the log table. Re-enabling is the only way to immediately purge old stream data.
Old stream data is removed immediately upon re-enable (a discrepancy with DynamoDB, which keeps it readable for 24 hours through the old StreamArn).
Tests updated to cover the new disable and re-enable behavior.
Fixes #7239
Fixes SCYLLADB-523