Skip to content

Commit d28dd49

Browse files
committed
lwt: Process lwt request on an owning shard
LWT is much more efficient if a request is processed on a shard that owns a token for the request. This is because otherwise the processing will bounce to an owning shard multiple times. The patch proposes a way to move the request to the correct shard before running lwt. It works by returning an error from the lwt code if the current shard is not the owning one; the error specifies the shard the request should be moved to. The error is processed by the transport code, which jumps to the correct shard and re-processes the incoming message there.
1 parent 2832f1d commit d28dd49

17 files changed

Lines changed: 281 additions & 60 deletions

auth/service.hh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <seastar/core/future.hh>
2929
#include <seastar/core/sstring.hh>
3030
#include <seastar/util/bool_class.hh>
31+
#include <seastar/core/sharded.hh>
3132

3233
#include "auth/authenticator.hh"
3334
#include "auth/authorizer.hh"
@@ -76,7 +77,9 @@ public:
7677
///
7778
/// All state associated with access-control is stored externally to any particular instance of this class.
7879
///
79-
class service final {
80+
/// peering_sharded_service inheritance is needed to be able to access shard local authentication service
81+
/// given an object from another shard. Used for bouncing lwt requests to correct shard.
82+
class service final : public seastar::peering_sharded_service<service> {
8083
permissions_cache_config _permissions_cache_config;
8184
std::unique_ptr<permissions_cache> _permissions_cache;
8285

cql3/statements/batch_statement.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,12 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
377377
throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional BATCH"));
378378
}
379379

380+
auto shard = service::storage_proxy::cas_shard(request->key()[0].start()->value().as_decorated_key().token());
381+
if (shard != engine().cpu_id()) {
382+
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
383+
make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard));
384+
}
385+
380386
return proxy.cas(schema, request, request->read_command(), request->key(),
381387
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
382388
cl_for_paxos, cl_for_commit, batch_timeout, cas_timeout).then([this, request] (bool is_applied) {

cql3/statements/modification_statement.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,12 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
373373
// modification in the list of CAS commands, since we're handling single-statement execution.
374374
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);
375375

376+
auto shard = service::storage_proxy::cas_shard(request->key()[0].start()->value().as_decorated_key().token());
377+
if (shard != engine().cpu_id()) {
378+
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
379+
make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard));
380+
}
381+
376382
return proxy.cas(s, request, request->read_command(), request->key(),
377383
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
378384
cl_for_paxos, cl_for_commit, statement_timeout, cas_timeout).then([this, request] (bool is_applied) {

cql3/statements/select_statement.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,14 @@ select_statement::do_execute(service::storage_proxy& proxy,
332332

333333
auto key_ranges = _restrictions->get_partition_key_ranges(options);
334334

335+
if (db::is_serial_consistency(options.get_consistency())) {
336+
unsigned shard = dht::shard_of(key_ranges[0].start()->value().as_decorated_key().token());
337+
if (engine().cpu_id() != shard) {
338+
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
339+
make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard));
340+
}
341+
}
342+
335343
if (!aggregate && !restrictions_need_filtering && (page_size <= 0
336344
|| !service::pager::query_pagers::may_need_paging(*_schema, page_size,
337345
*command, key_ranges))) {

service/client_state.hh

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,29 @@ public:
6868
UNINITIALIZED, AUTHENTICATION, READY
6969
};
7070

71+
// This class is used to move client_state between shards
72+
// It is created on a shard that owns client_state, then passed
73+
// to a target shard where client_state_for_another_shard::get()
74+
// can be called to obtain a shard local copy.
75+
class client_state_for_another_shard {
76+
private:
77+
const client_state* _cs;
78+
tracing::global_trace_state_ptr _trace_state;
79+
seastar::sharded<auth::service>* _auth_service;
80+
client_state_for_another_shard(const client_state* cs, tracing::global_trace_state_ptr gt,
81+
seastar::sharded<auth::service>* auth_service) : _cs(cs), _trace_state(gt), _auth_service(auth_service) {}
82+
friend client_state;
83+
public:
84+
client_state get() const {
85+
return client_state(_cs, _trace_state, _auth_service);
86+
}
87+
};
88+
private:
89+
client_state(const client_state* cs, tracing::global_trace_state_ptr gt, seastar::sharded<auth::service>* auth_service)
90+
: _keyspace(cs->_keyspace), _trace_state_ptr(gt), _user(cs->_user), _auth_state(cs->_auth_state),
91+
_is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address),
92+
_auth_service(auth_service ? &auth_service->local() : nullptr) {}
93+
friend client_state_for_another_shard;
7194
private:
7295
sstring _keyspace;
7396
tracing::trace_state_ptr _trace_state_ptr;
@@ -155,7 +178,8 @@ public:
155178
, _is_thrift(false)
156179
{}
157180

158-
client_state(client_state&) = delete;
181+
client_state(const client_state&) = delete;
182+
client_state(client_state&&) = default;
159183

160184
///
161185
/// `nullptr` for internal instances.
@@ -315,6 +339,10 @@ public:
315339
return _user;
316340
}
317341

342+
client_state_for_another_shard move_to_other_shard() {
343+
return client_state_for_another_shard(this, _trace_state_ptr, _auth_service ? &_auth_service->container() : nullptr);
344+
}
345+
318346
#if 0
319347
public static SemanticVersion[] getCQLSupportedVersion()
320348
{

service/query_state.hh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public:
4242
, _permit(std::move(permit))
4343
{ }
4444

45+
query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit)
46+
: _client_state(client_state)
47+
, _trace_state_ptr(std::move(trace_state_ptr))
48+
, _permit(std::move(permit))
49+
{ }
50+
4551
const tracing::trace_state_ptr& get_trace_state() const {
4652
return _trace_state_ptr;
4753
}

service/storage_proxy.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ sstring get_local_dc() {
137137
return get_dc(local_addr);
138138
}
139139

140+
unsigned storage_proxy::cas_shard(dht::token token) {
141+
return dht::shard_of(token);
142+
}
143+
140144
class mutation_holder {
141145
protected:
142146
size_t _size = 0;
@@ -3969,6 +3973,7 @@ storage_proxy::do_query(schema_ptr s,
39693973
}
39703974
}
39713975

3976+
// WARNING: the function should be called on a shard that owns the key that is being read
39723977
future<storage_proxy::coordinator_query_result>
39733978
storage_proxy::do_query_with_paxos(schema_ptr s,
39743979
lw_shared_ptr<query::read_command> cmd,
@@ -3983,6 +3988,9 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
39833988
auto cl_for_learn = cl == db::consistency_level::LOCAL_SERIAL ? db::consistency_level::LOCAL_QUORUM :
39843989
db::consistency_level::QUORUM;
39853990

3991+
if (cas_shard(partition_ranges[0].start()->value().as_decorated_key().token()) != engine().cpu_id()) {
3992+
throw std::logic_error("storage_proxy::do_query_with_paxos called on a wrong shard");
3993+
}
39863994
// All cas networking operations run with query provided timeout
39873995
db::timeout_clock::time_point timeout = query_options.timeout(*this);
39883996
// When to give up due to contention
@@ -4074,6 +4082,8 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
40744082
* Note that since we are performing a CAS rather than a simple update, we perform a read (of committed
40754083
* values) between the prepare and accept phases. This gives us a slightly longer window for another
40764084
* coordinator to come along and trump our own promise with a newer one but is otherwise safe.
4085+
*
4086+
* WARNING: the function should be called on a shard that owns the key cas() operates on
40774087
*/
40784088
future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> request, lw_shared_ptr<query::read_command> cmd,
40794089
dht::partition_range_vector&& partition_ranges, storage_proxy::coordinator_query_options query_options,
@@ -4086,6 +4096,10 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
40864096
db::validate_for_cas(cl_for_paxos);
40874097
db::validate_for_cas_commit(cl_for_commit, schema->ks_name());
40884098

4099+
if (cas_shard(partition_ranges[0].start()->value().as_decorated_key().token()) != engine().cpu_id()) {
4100+
throw std::logic_error("storage_proxy::cas called on a wrong shard");
4101+
}
4102+
40894103
shared_ptr<paxos_response_handler> handler;
40904104
try {
40914105
handler = seastar::make_shared<paxos_response_handler>(shared_from_this(),

service/storage_proxy.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,8 @@ public:
532532
return _stats;
533533
}
534534

535+
static unsigned cas_shard(dht::token token);
536+
535537
virtual void on_join_cluster(const gms::inet_address& endpoint) override;
536538
virtual void on_leave_cluster(const gms::inet_address& endpoint) override;
537539
virtual void on_up(const gms::inet_address& endpoint) override;

service/storage_service.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2236,6 +2236,9 @@ future<> storage_service::start_native_transport() {
22362236
cql_server_config.max_request_size = ss._service_memory_total;
22372237
cql_server_config.get_service_memory_limiter_semaphore = [ss = std::ref(get_storage_service())] () -> semaphore& { return ss.get().local()._service_memory_limiter; };
22382238
cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers();
2239+
smp_service_group_config cql_server_smp_service_group_config;
2240+
cql_server_smp_service_group_config.max_nonlocal_requests = 5000;
2241+
cql_server_config.bounce_request_smp_service_group = create_smp_service_group(cql_server_smp_service_group_config).get0();
22392242
seastar::net::inet_address ip = gms::inet_address::lookup(addr, family, preferred).get0();
22402243
cserver->start(std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), cql_server_config).get();
22412244
struct listen_cfg {

test/boost/cql_query_test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,6 +1417,7 @@ SEASTAR_TEST_CASE(test_functions) {
14171417
res.push_back(rw[0]);
14181418
}
14191419
}
1420+
virtual void visit(const result_message::bounce_to_shard& rows) override { throw "bad"; }
14201421
};
14211422
validator v;
14221423
msg->accept(v);

0 commit comments

Comments
 (0)