Skip to content

Commit 7d0f4c1

Browse files
committed
test: tablets: Add test for failed streaming being fenced away
1 parent 083a027 commit 7d0f4c1

3 files changed

Lines changed: 126 additions & 7 deletions

File tree

streaming/stream_session.cc

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "consumer.hh"
3939
#include "readers/generating_v2.hh"
4040
#include "service/topology_guard.hh"
41+
#include "utils/error_injection.hh"
4142

4243
namespace streaming {
4344

@@ -127,18 +128,27 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
127128
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
128129
utils::fb_utilities::get_broadcast_address())));
129130
}
130-
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard] (schema_ptr s) mutable {
131-
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s] (reader_permit permit) mutable {
131+
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable {
132+
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, s, &as] (reader_permit permit) mutable {
132133
struct stream_mutation_fragments_cmd_status {
133134
bool got_cmd = false;
134135
bool got_end_of_stream = false;
135136
};
136137
auto cmd_status = make_lw_shared<stream_mutation_fragments_cmd_status>();
137138
auto offstrategy_update = make_lw_shared<offstrategy_trigger>(_db, cf_id, plan_id);
138139
auto guard = service::topology_guard(s->table(), topo_guard);
139-
auto get_next_mutation_fragment = [guard = std::move(guard), &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
140+
141+
// Will log a message when streaming is done. Used to synchronize tests.
142+
lw_shared_ptr<std::any> log_done;
143+
if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) {
144+
log_done = make_lw_shared<std::any>(seastar::make_shared(seastar::defer([] {
145+
sslog.info("stream_mutation_fragments: done");
146+
})));
147+
}
148+
149+
auto get_next_mutation_fragment = [guard = std::move(guard), &as, &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
140150
guard.check();
141-
return source().then([&sm, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
151+
return source().then([&sm, &guard, &as, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
142152
if (opt) {
143153
auto cmd = std::get<1>(*opt);
144154
if (cmd) {
@@ -160,7 +170,19 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
160170
auto mf = fmf.unfreeze(*s, permit);
161171
sm.local().update_progress(plan_id, from.addr, progress_info::direction::IN, sz);
162172
offstrategy_update->update();
163-
return make_ready_future<mutation_fragment_opt>(std::move(mf));
173+
174+
return utils::get_local_injector().inject_with_handler("stream_mutation_fragments", [&guard, &as] (auto& handler) -> future<> {
175+
auto& guard_ = guard;
176+
auto& as_ = as;
177+
sslog.info("stream_mutation_fragments: waiting");
178+
while (!handler.poll_for_message()) {
179+
guard_.check();
180+
co_await sleep_abortable(std::chrono::milliseconds(5), as_);
181+
}
182+
sslog.info("stream_mutation_fragments: released");
183+
}).then([mf = std::move(mf)] () mutable {
184+
return mutation_fragment_opt(std::move(mf));
185+
});
164186
} else {
165187
// If the sender has sent stream_mutation_fragments_cmd it means it is
166188
// a node that understands the new protocol. It must send end_of_stream
@@ -185,7 +207,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
185207
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
186208
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard),
187209
std::move(op)
188-
).then_wrapped([s, plan_id, from, sink, estimated_partitions, sh_ptr = std::move(sharder_ptr)] (future<uint64_t> f) mutable {
210+
).then_wrapped([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future<uint64_t> f) mutable {
189211
int32_t status = 0;
190212
uint64_t received_partitions = 0;
191213
if (f.failed()) {

test/pylib/manager_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ async def get_host_id(self, server_id: ServerNum) -> HostID:
337337
raise Exception(f"Failed to get local host id address for server {server_id}") from exc
338338
return HostID(host_id)
339339

340+
async def get_table_id(self, keyspace: str, table: str):
    """Return the id of the given table as recorded in system_schema.tables.

    Args:
        keyspace: keyspace name of the table.
        table: table name.

    Returns:
        The table id (as returned by the driver for the `id` column).

    Raises:
        Exception: if no such table exists, matching the error style of
            get_host_id() above instead of leaking a bare IndexError.
    """
    rows = await self.cql.run_async(f"select id from system_schema.tables where keyspace_name = '{keyspace}' and table_name = '{table}'")
    if not rows:
        raise Exception(f"Failed to get id of table {keyspace}.{table}: no such table")
    return rows[0].id
343+
340344
async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.):
341345
"""Wait till a server sees a minimum given count of other servers"""
342346
if count < 1:

test/topology_experimental_raft/test_tablets.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
#
44
# SPDX-License-Identifier: AGPL-3.0-or-later
55
#
6+
from uuid import UUID
7+
8+
from cassandra.query import SimpleStatement, ConsistencyLevel
69

710
from test.pylib.manager_client import ManagerClient
8-
from test.pylib.rest_client import inject_error_one_shot
11+
from test.pylib.rest_client import inject_error_one_shot, HTTPError
912
from test.pylib.rest_client import inject_error
1013
from test.pylib.util import wait_for_cql_and_get_hosts
14+
from test.topology.conftest import skip_mode
1115
from test.topology.util import reconnect_driver
1216

1317
import pytest
@@ -29,6 +33,21 @@ async def inject_error_on(manager, error_name, servers):
2933
await asyncio.gather(*errs)
3034

3135

36+
async def get_tablet_replicas(manager, keyspace_name, table_name, token):
    """Return the replica list of the tablet owning `token`.

    Scans system.tablets rows (ordered by last_token) for the table and
    returns the `replicas` column of the first tablet whose last_token is
    >= token, or None if no such tablet is found.
    """
    table_id = await manager.get_table_id(keyspace_name, table_name)
    query = (f"SELECT last_token, replicas FROM system.tablets where "
             f"keyspace_name = '{keyspace_name}' and "
             f"table_id = {table_id}")
    tablet_rows = await manager.cql.run_async(query)
    return next((r.replicas for r in tablet_rows if r.last_token >= token), None)
44+
45+
46+
async def get_tablet_replica(manager, keyspace_name, table_name, token):
    """Return the first (host, shard) replica of the tablet owning `token`."""
    replica_list = await get_tablet_replicas(manager, keyspace_name, table_name, token)
    return replica_list[0]
49+
50+
3251
@pytest.mark.asyncio
3352
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
3453
"""Test that you can create a table and insert and query data"""
@@ -199,3 +218,77 @@ async def check():
199218
await check()
200219

201220
await cql.run_async("DROP KEYSPACE test;")
221+
222+
223+
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
    """Verify that tablet-migration streaming which failed mid-way is fenced away.

    A replica-side streaming writer left behind after its stream is broken
    (and the migration retried) must not be able to resurrect data that was
    deleted after the migration completed.
    """
    logger.info("Bootstrapping cluster")
    extra_cmdline = [
        '--logger-log-level', 'storage_service=trace',
    ]
    nodes = [await manager.server_add(cmdline=extra_cmdline)]

    # Tablet placement must stay stable; this test drives migration by hand.
    await manager.api.disable_tablet_balancing(nodes[0].ip_addr)

    session = manager.get_cql()
    await session.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
                            "'replication_factor': 1, 'initial_tablets': 1};")
    await session.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")

    nodes.append(await manager.server_add(cmdline=extra_cmdline))

    pk = 7  # Arbitrary partition key.
    token = 0  # Any token works: the table has exactly one tablet.
    await session.run_async(f"INSERT INTO test.test (pk, c) VALUES ({pk}, 0)")
    result = await session.run_async("SELECT pk from test.test")
    assert len(list(result)) == 1

    src_replica = await get_tablet_replica(manager, 'test', 'test', token)

    host0 = await manager.get_host_id(nodes[0].server_id)
    host1 = await manager.get_host_id(nodes[1].server_id)
    target_shard = 0

    await manager.api.enable_injection(nodes[1].ip_addr, "stream_mutation_fragments", one_shot=True)
    node1_log = await manager.server_open_log(nodes[1].server_id)
    mark = await node1_log.mark()

    move_task = asyncio.create_task(
        manager.api.move_tablet(nodes[0].ip_addr, "test", "test", src_replica[0], src_replica[1], host1, target_shard, token))

    # Wait until the replica-side streaming writer has received writes from the
    # leaving replica but has not applied them yet. Once there, the writer blocks
    # until the message_injection() call further below. The spot where the writer
    # is parked must not hold on to the erm or the topology_guard, because that
    # would block the migration below and prevent the test from proceeding.
    await node1_log.wait_for('stream_mutation_fragments: waiting', from_mark=mark)
    mark = await node1_log.mark()

    # Break the stream so that streaming fails and is retried, leaving the
    # blocked replica-side writer behind.
    await manager.api.inject_disconnect(nodes[0].ip_addr, nodes[1].ip_addr)

    logger.info("Waiting for migration to finish")
    await move_task
    logger.info("Migration done")

    # Sanity check: the row survived the (retried) migration.
    result = await session.run_async("SELECT pk from test.test")
    assert len(list(result)) == 1

    await session.run_async("TRUNCATE test.test")
    result = await session.run_async("SELECT pk from test.test")
    assert len(list(result)) == 0

    # Unblock the abandoned streaming writer and wait for it to run to completion.
    await manager.api.message_injection(nodes[1].ip_addr, "stream_mutation_fragments")
    await node1_log.wait_for('stream_mutation_fragments: done', from_mark=mark)

    # The fenced-away writer must not have resurrected the truncated data.
    result = await session.run_async("SELECT pk from test.test")
    assert len(list(result)) == 0

    # Moving the tablet back to its original replica still works.
    await manager.api.move_tablet(nodes[0].ip_addr, "test", "test", host1, target_shard, src_replica[0], src_replica[1], token)
    result = await session.run_async("SELECT pk from test.test")
    assert len(list(result)) == 0

0 commit comments

Comments
 (0)