Skip to content

Commit 0f588c5

Browse files
committed
cql_server::connection do_process_on_shard: rebounce msg if needed
Re-bounce the msg to another shard if needed, e.g. in the case of tablet migration. Fixes #15465 Signed-off-by: Benny Halevy <[email protected]>
1 parent a78ee4d commit 0f588c5

2 files changed

Lines changed: 20 additions & 30 deletions

File tree

transport/server.cc

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -962,26 +962,21 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
962962
template<typename Process>
963963
requires std::same_as<std::invoke_result_t<Process, service::client_state&, distributed<cql3::query_processor>&, request_reader,
964964
uint16_t, cql_protocol_version_type, service_permit, tracing::trace_state_ptr, bool, cql3::computed_function_values>, future<cql_server::process_fn_return_type>>
965-
future<cql_server::result_with_foreign_response_ptr>
965+
future<cql_server::process_fn_return_type>
966966
cql_server::connection::process_on_shard(::shared_ptr<messages::result_message::bounce_to_shard> bounce_msg, uint16_t stream, fragmented_temporary_buffer::istream is,
967967
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state, Process process_fn) {
968-
return _server.container().invoke_on(*bounce_msg->move_to_shard(), _server._config.bounce_request_smp_service_group,
969-
[this, is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit), process_fn,
970-
gt = tracing::global_trace_state_ptr(std::move(trace_state)),
971-
cached_vals = std::move(bounce_msg->take_cached_pk_function_calls())] (cql_server& server) {
972-
service::client_state client_state = cs.get();
973-
return do_with(bytes_ostream(), std::move(client_state), std::move(cached_vals),
974-
[this, &server, is = std::move(is), stream, process_fn,
975-
trace_state = tracing::trace_state_ptr(gt)] (bytes_ostream& linearization_buffer,
976-
service::client_state& client_state,
977-
cql3::computed_function_values& cached_vals) mutable {
978-
request_reader in(is, linearization_buffer);
979-
return process_fn(client_state, server._query_processor, in, stream, _version,
980-
/* FIXME */empty_service_permit(), std::move(trace_state), false, std::move(cached_vals)).then([] (auto msg) {
981-
// result here has to be foreign ptr
982-
return std::get<cql_server::result_with_foreign_response_ptr>(std::move(msg));
983-
});
984-
});
968+
unsigned shard = *bounce_msg->move_to_shard();
969+
auto sg = _server._config.bounce_request_smp_service_group;
970+
auto gcs = cs.move_to_other_shard();
971+
auto gt = tracing::global_trace_state_ptr(std::move(trace_state));
972+
auto cached_vals = std::move(bounce_msg->take_cached_pk_function_calls());
973+
co_return co_await _server.container().invoke_on(shard, sg, [&] (cql_server& server) -> future<process_fn_return_type> {
974+
bytes_ostream linearization_buffer;
975+
request_reader in(is, linearization_buffer);
976+
auto client_state = gcs.get();
977+
auto trace_state = gt.get();
978+
co_return co_await process_fn(client_state, server._query_processor, in, stream, _version,
979+
std::move(permit), std::move(trace_state), false, std::move(cached_vals));
985980
});
986981
}
987982

@@ -997,17 +992,12 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
997992
tracing::trace_state_ptr trace_state, Process process_fn) {
998993
fragmented_temporary_buffer::istream is = in.get_stream();
999994

1000-
return process_fn(client_state, _server._query_processor, in, stream,
1001-
_version, permit, trace_state, true, {})
1002-
.then([stream, &client_state, this, is, permit, process_fn, trace_state]
1003-
(cql_server::process_fn_return_type msg) mutable {
1004-
auto* bounce_msg = std::get_if<shared_ptr<messages::result_message::bounce_to_shard>>(&msg);
1005-
if (bounce_msg) {
1006-
return process_on_shard(*bounce_msg, stream, is, client_state, std::move(permit), trace_state, process_fn);
1007-
}
1008-
auto ptr = std::get<cql_server::result_with_foreign_response_ptr>(std::move(msg));
1009-
return make_ready_future<cql_server::result_with_foreign_response_ptr>(std::move(ptr));
1010-
});
995+
auto msg = co_await process_fn(client_state, _server._query_processor, in, stream,
996+
_version, permit, trace_state, true, {});
997+
while (auto* bounce_msg = std::get_if<shared_ptr<messages::result_message::bounce_to_shard>>(&msg)) {
998+
msg = co_await process_on_shard(*bounce_msg, stream, is, client_state, /* FIXME */empty_service_permit(), trace_state, process_fn);
999+
}
1000+
co_return std::get<cql_server::result_with_foreign_response_ptr>(std::move(msg));
10111001
}
10121002

10131003
static future<cql_server::process_fn_return_type>

transport/server.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ private:
274274
template<typename Process>
275275
requires std::same_as<std::invoke_result_t<Process, service::client_state&, distributed<cql3::query_processor>&, request_reader,
276276
uint16_t, cql_protocol_version_type, service_permit, tracing::trace_state_ptr, bool, cql3::computed_function_values>, future<process_fn_return_type>>
277-
future<result_with_foreign_response_ptr>
277+
future<process_fn_return_type>
278278
process_on_shard(::shared_ptr<messages::result_message::bounce_to_shard> bounce_msg, uint16_t stream, fragmented_temporary_buffer::istream is, service::client_state& cs,
279279
service_permit permit, tracing::trace_state_ptr trace_state, Process process_fn);
280280

0 commit comments

Comments
 (0)