@@ -962,26 +962,21 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
962962template <typename Process>
963963requires std::same_as<std::invoke_result_t <Process, service::client_state&, distributed<cql3::query_processor>&, request_reader,
964964 uint16_t , cql_protocol_version_type, service_permit, tracing::trace_state_ptr, bool , cql3::computed_function_values>, future<cql_server::process_fn_return_type>>
965- future<cql_server::result_with_foreign_response_ptr >
965+ future<cql_server::process_fn_return_type >
966966cql_server::connection::process_on_shard (::shared_ptr<messages::result_message::bounce_to_shard> bounce_msg, uint16_t stream, fragmented_temporary_buffer::istream is,
967967 service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state, Process process_fn) {
968- return _server.container ().invoke_on (*bounce_msg->move_to_shard (), _server._config .bounce_request_smp_service_group ,
969- [this , is = std::move (is), cs = cs.move_to_other_shard (), stream, permit = std::move (permit), process_fn,
970- gt = tracing::global_trace_state_ptr (std::move (trace_state)),
971- cached_vals = std::move (bounce_msg->take_cached_pk_function_calls ())] (cql_server& server) {
972- service::client_state client_state = cs.get ();
973- return do_with (bytes_ostream (), std::move (client_state), std::move (cached_vals),
974- [this , &server, is = std::move (is), stream, process_fn,
975- trace_state = tracing::trace_state_ptr (gt)] (bytes_ostream& linearization_buffer,
976- service::client_state& client_state,
977- cql3::computed_function_values& cached_vals) mutable {
978- request_reader in (is, linearization_buffer);
979- return process_fn (client_state, server._query_processor , in, stream, _version,
980- /* FIXME */ empty_service_permit (), std::move (trace_state), false , std::move (cached_vals)).then ([] (auto msg) {
981- // result here has to be foreign ptr
982- return std::get<cql_server::result_with_foreign_response_ptr>(std::move (msg));
983- });
984- });
968+ unsigned shard = *bounce_msg->move_to_shard ();
969+ auto sg = _server._config .bounce_request_smp_service_group ;
970+ auto gcs = cs.move_to_other_shard ();
971+ auto gt = tracing::global_trace_state_ptr (std::move (trace_state));
972+ auto cached_vals = std::move (bounce_msg->take_cached_pk_function_calls ());
973+ co_return co_await _server.container ().invoke_on (shard, sg, [&] (cql_server& server) -> future<process_fn_return_type> {
974+ bytes_ostream linearization_buffer;
975+ request_reader in (is, linearization_buffer);
976+ auto client_state = gcs.get ();
977+ auto trace_state = gt.get ();
978+ co_return co_await process_fn (client_state, server._query_processor , in, stream, _version,
979+ std::move (permit), std::move (trace_state), false , std::move (cached_vals));
985980 });
986981}
987982
@@ -997,17 +992,12 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
997992 tracing::trace_state_ptr trace_state, Process process_fn) {
998993 fragmented_temporary_buffer::istream is = in.get_stream ();
999994
1000- return process_fn (client_state, _server._query_processor , in, stream,
1001- _version, permit, trace_state, true , {})
1002- .then ([stream, &client_state, this , is, permit, process_fn, trace_state]
1003- (cql_server::process_fn_return_type msg) mutable {
1004- auto * bounce_msg = std::get_if<shared_ptr<messages::result_message::bounce_to_shard>>(&msg);
1005- if (bounce_msg) {
1006- return process_on_shard (*bounce_msg, stream, is, client_state, std::move (permit), trace_state, process_fn);
1007- }
1008- auto ptr = std::get<cql_server::result_with_foreign_response_ptr>(std::move (msg));
1009- return make_ready_future<cql_server::result_with_foreign_response_ptr>(std::move (ptr));
1010- });
995+ auto msg = co_await process_fn (client_state, _server._query_processor , in, stream,
996+ _version, permit, trace_state, true , {});
997+ while (auto * bounce_msg = std::get_if<shared_ptr<messages::result_message::bounce_to_shard>>(&msg)) {
998+ msg = co_await process_on_shard (*bounce_msg, stream, is, client_state, /* FIXME */ empty_service_permit (), trace_state, process_fn);
999+ }
1000+ co_return std::get<cql_server::result_with_foreign_response_ptr>(std::move (msg));
10111001}
10121002
10131003static future<cql_server::process_fn_return_type>
0 commit comments