Skip to content

Commit 046e989

Browse files
maximebedardmattklein123
authored andcommitted
redis: prefixed routing (#5658)
Signed-off-by: Maxime Bedard <[email protected]>
1 parent 78ad883 commit 046e989

File tree

26 files changed

+660
-78
lines changed

26 files changed

+660
-78
lines changed

DEPRECATED.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ A logged warning is expected for each deprecated item that is in deprecation win
1111
* Use of `enabled` in `CorsPolicy`, found in
1212
[route.proto](https://github.com/envoyproxy/envoy/blob/master/api/envoy/api/v2/route/route.proto).
1313
Set the `filter_enabled` field instead.
14+
* Use of google.protobuf.Struct for extension opaque configs is deprecated. Use google.protobuf.Any instead or pack
15+
google.protobuf.Struct in google.protobuf.Any.
16+
* Use of `cluster`, found in [redis-proxy.proto](https://github.com/envoyproxy/envoy/blob/master/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto) is deprecated. Set a `PrefixRoutes.catch_all_cluster` instead.
1417

1518
## Version 1.9.0 (Dec 20, 2018)
1619

api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@ message RedisProxy {
2222
// Name of cluster from cluster manager. See the :ref:`configuration section
2323
// <arch_overview_redis_configuration>` of the architecture overview for recommendations on
2424
// configuring the backing cluster.
25-
string cluster = 2 [(validate.rules).string.min_bytes = 1];
25+
//
26+
// .. attention::
27+
//
28+
// This field is deprecated. Use a :ref:`catch-all
29+
// cluster<envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.PrefixRoutes.catch_all_cluster>`
30+
// instead.
31+
string cluster = 2 [deprecated = true];
2632

2733
// Redis connection pool settings.
2834
message ConnPoolSettings {
@@ -48,10 +54,63 @@ message RedisProxy {
4854
bool enable_hashtagging = 2;
4955
}
5056

51-
// Network settings for the connection pool to the upstream cluster.
57+
// Network settings for the connection pool to the upstream clusters.
5258
ConnPoolSettings settings = 3 [(validate.rules).message.required = true];
5359

5460
// Indicates that latency stat should be computed in microseconds. By default it is computed in
5561
// milliseconds.
5662
bool latency_in_micros = 4;
63+
64+
message PrefixRoutes {
65+
message Route {
66+
// String prefix that must match the beginning of the keys. Envoy will always favor the
67+
// longest match.
68+
string prefix = 1 [(validate.rules).string.min_bytes = 1];
69+
70+
// Indicates if the prefix needs to be removed from the key when forwarded.
71+
bool remove_prefix = 2;
72+
73+
// Upstream cluster to forward the command to.
74+
string cluster = 3 [(validate.rules).string.min_bytes = 1];
75+
}
76+
77+
// List of prefix routes.
78+
repeated Route routes = 1 [(gogoproto.nullable) = false];
79+
80+
// Indicates that prefix matching should be case insensitive.
81+
bool case_insensitive = 2;
82+
83+
// Optional catch-all route to forward commands that doesn't match any of the routes. The
84+
// catch-all route becomes required when no routes are specified.
85+
string catch_all_cluster = 3;
86+
}
87+
88+
// List of **unique** prefixes used to separate keys from different workloads to different
89+
// clusters. Envoy will always favor the longest match first in case of overlap. A catch-all
90+
// cluster can be used to forward commands when there is no match. Time complexity of the
91+
// lookups are in O(min(longest key prefix, key length)).
92+
//
93+
// Example:
94+
//
95+
// .. code-block:: yaml
96+
//
97+
// prefix_routes:
98+
// routes:
99+
// - prefix: "ab"
100+
// cluster: "cluster_a"
101+
// - prefix: "abc"
102+
// cluster: "cluster_b"
103+
//
104+
// When using the above routes, the following prefixes would be sent to:
105+
//
106+
// * 'get abc:users' would retrive the key 'abc:users' from cluster_b.
107+
// * 'get ab:users' would retrive the key 'ab:users' from cluster_a.
108+
// * 'get z:users' would return a NoUpstreamHost error. A :ref:`catch-all
109+
// cluster<envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.PrefixRoutes.catch_all_cluster>`
110+
// would have retrieved the key from that cluster instead.
111+
//
112+
// See the :ref:`configuration section
113+
// <arch_overview_redis_configuration>` of the architecture overview for recommendations on
114+
// configuring the backing clusters.
115+
PrefixRoutes prefix_routes = 5 [(gogoproto.nullable) = false];
57116
}

docs/root/intro/arch_overview/redis.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ In this mode, the goals of Envoy are to maintain availability and partition tole
88
over consistency. This is the key point when comparing Envoy to `Redis Cluster
99
<https://redis.io/topics/cluster-spec>`_. Envoy is designed as a best-effort cache,
1010
meaning that it will not try to reconcile inconsistent data or keep a globally consistent
11-
view of cluster membership.
11+
view of cluster membership. It also supports routing commands from different workload to
12+
different to different upstream clusters based on their access patterns, eviction, or isolation
13+
requirements.
1214

1315
The Redis project offers a thorough reference on partitioning as it relates to Redis. See
1416
"`Partitioning: how to split data among multiple Redis instances
@@ -22,6 +24,7 @@ The Redis project offers a thorough reference on partitioning as it relates to R
2224
* Detailed command statistics.
2325
* Active and passive healthchecking.
2426
* Hash tagging.
27+
* Prefix routing.
2528

2629
**Planned future enhancements**:
2730

docs/root/intro/version_history.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Version history
5151
* ratelimit: removed deprecated rate limit configuration from bootstrap.
5252
* redis: added :ref:`hashtagging <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.enable_hashtagging>` to guarantee a given key's upstream.
5353
* redis: added :ref:`latency stats <config_network_filters_redis_proxy_per_command_stats>` for commands.
54+
* redis: added :ref:`prefix routing <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.prefix_routes>` to enable routing commands based on their key's prefix to different upstream.
5455
* redis: added :ref:`success and error stats <config_network_filters_redis_proxy_per_command_stats>` for commands.
5556
* redis: migrate hash function for host selection to `MurmurHash2 <https://sites.google.com/site/murmurhash>`_ from std::hash. MurmurHash2 is compatible with std::hash in GNU libstdc++ 3.4.20 or above. This is typically the case when compiled on Linux and not macOS.
5657
* redis: added :ref:`latency_in_micros <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.latency_in_micros>` to specify the redis commands stats time unit in microseconds.

source/common/common/utility.h

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,8 +568,11 @@ template <class Value> struct TrieLookupTable {
568568
* Adds an entry to the Trie at the given Key.
569569
* @param key the key used to add the entry.
570570
* @param value the value to be associated with the key.
571+
* @param overwrite_existing will overwrite the value when the value for a given key already
572+
* exists.
573+
* @return false when a value already exists for the given key.
571574
*/
572-
void add(const char* key, Value value) {
575+
bool add(const char* key, Value value, bool overwrite_existing = true) {
573576
TrieEntry<Value>* current = &root_;
574577
while (uint8_t c = *key) {
575578
if (!current->entries_[c]) {
@@ -578,7 +581,11 @@ template <class Value> struct TrieLookupTable {
578581
current = current->entries_[c].get();
579582
key++;
580583
}
584+
if (current->value_ && !overwrite_existing) {
585+
return false;
586+
}
581587
current->value_ = value;
588+
return true;
582589
}
583590

584591
/**
@@ -599,6 +606,31 @@ template <class Value> struct TrieLookupTable {
599606
return current->value_;
600607
}
601608

609+
/**
610+
* Finds the entry associated with the longest prefix. Complexity is O(min(longest key prefix, key
611+
* length))
612+
* @param key the key used to find.
613+
* @return the value matching the longest prefix based on the key.
614+
*/
615+
Value findLongestPrefix(const char* key) const {
616+
const TrieEntry<Value>* current = &root_;
617+
const TrieEntry<Value>* result = nullptr;
618+
while (uint8_t c = *key) {
619+
if (current->value_) {
620+
result = current;
621+
}
622+
623+
// https://github.com/facebook/mcrouter/blob/master/mcrouter/lib/fbi/cpp/Trie-inl.h#L126-L143
624+
current = current->entries_[c].get();
625+
if (current == nullptr) {
626+
return result ? result->value_ : nullptr;
627+
}
628+
629+
key++;
630+
}
631+
return current ? current->value_ : result->value_;
632+
}
633+
602634
TrieEntry<Value> root_;
603635
};
604636

source/extensions/filters/network/redis_proxy/BUILD

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,22 @@ envoy_cc_library(
3030
],
3131
)
3232

33+
envoy_cc_library(
34+
name = "router_interface",
35+
hdrs = ["router.h"],
36+
deps = [
37+
":conn_pool_interface",
38+
"@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc",
39+
],
40+
)
41+
3342
envoy_cc_library(
3443
name = "command_splitter_lib",
3544
srcs = ["command_splitter_impl.cc"],
3645
hdrs = ["command_splitter_impl.h"],
3746
deps = [
3847
":command_splitter_interface",
39-
":conn_pool_interface",
48+
":router_interface",
4049
"//include/envoy/stats:stats_macros",
4150
"//include/envoy/stats:timespan",
4251
"//source/common/common:assert_lib",
@@ -54,7 +63,6 @@ envoy_cc_library(
5463
hdrs = ["conn_pool_impl.h"],
5564
deps = [
5665
":conn_pool_interface",
57-
"//include/envoy/router:router_interface",
5866
"//include/envoy/thread_local:thread_local_interface",
5967
"//include/envoy/upstream:cluster_manager_interface",
6068
"//source/common/buffer:buffer_lib",
@@ -73,6 +81,7 @@ envoy_cc_library(
7381
hdrs = ["proxy_filter.h"],
7482
deps = [
7583
":command_splitter_interface",
84+
":router_interface",
7685
"//include/envoy/network:drain_decision_interface",
7786
"//include/envoy/network:filter_interface",
7887
"//include/envoy/upstream:cluster_manager_interface",
@@ -95,7 +104,21 @@ envoy_cc_library(
95104
"//source/extensions/filters/network/common:factory_base_lib",
96105
"//source/extensions/filters/network/common/redis:codec_lib",
97106
"//source/extensions/filters/network/redis_proxy:command_splitter_lib",
98-
"//source/extensions/filters/network/redis_proxy:conn_pool_lib",
99107
"//source/extensions/filters/network/redis_proxy:proxy_filter_lib",
108+
"//source/extensions/filters/network/redis_proxy:router_lib",
109+
],
110+
)
111+
112+
envoy_cc_library(
113+
name = "router_lib",
114+
srcs = ["router_impl.cc"],
115+
hdrs = ["router_impl.h"],
116+
deps = [
117+
":router_interface",
118+
"//include/envoy/thread_local:thread_local_interface",
119+
"//include/envoy/upstream:cluster_manager_interface",
120+
"//source/common/common:to_lower_table_lib",
121+
"//source/extensions/filters/network/redis_proxy:conn_pool_lib",
122+
"@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc",
100123
],
101124
)

source/extensions/filters/network/redis_proxy/command_splitter_impl.cc

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ void SingleServerRequest::cancel() {
5959
handle_ = nullptr;
6060
}
6161

62-
SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool,
62+
SplitRequestPtr SimpleRequest::create(Router& router,
6363
const Common::Redis::RespValue& incoming_request,
6464
SplitCallbacks& callbacks, CommandStats& command_stats,
6565
TimeSource& time_source, bool latency_in_micros) {
6666
std::unique_ptr<SimpleRequest> request_ptr{
6767
new SimpleRequest(callbacks, command_stats, time_source, latency_in_micros)};
6868

69-
request_ptr->handle_ = conn_pool.makeRequest(incoming_request.asArray()[1].asString(),
70-
incoming_request, *request_ptr);
69+
request_ptr->handle_ =
70+
router.makeRequest(incoming_request.asArray()[1].asString(), incoming_request, *request_ptr);
7171
if (!request_ptr->handle_) {
7272
request_ptr->callbacks_.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
7373
return nullptr;
@@ -76,7 +76,7 @@ SplitRequestPtr SimpleRequest::create(ConnPool::Instance& conn_pool,
7676
return std::move(request_ptr);
7777
}
7878

79-
SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool,
79+
SplitRequestPtr EvalRequest::create(Router& router,
8080
const Common::Redis::RespValue& incoming_request,
8181
SplitCallbacks& callbacks, CommandStats& command_stats,
8282
TimeSource& time_source, bool latency_in_micros) {
@@ -91,8 +91,8 @@ SplitRequestPtr EvalRequest::create(ConnPool::Instance& conn_pool,
9191

9292
std::unique_ptr<EvalRequest> request_ptr{
9393
new EvalRequest(callbacks, command_stats, time_source, latency_in_micros)};
94-
request_ptr->handle_ = conn_pool.makeRequest(incoming_request.asArray()[3].asString(),
95-
incoming_request, *request_ptr);
94+
request_ptr->handle_ =
95+
router.makeRequest(incoming_request.asArray()[3].asString(), incoming_request, *request_ptr);
9696
if (!request_ptr->handle_) {
9797
command_stats.error_.inc();
9898
request_ptr->callbacks_.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
@@ -123,7 +123,7 @@ void FragmentedRequest::onChildFailure(uint32_t index) {
123123
onChildResponse(Utility::makeError(Response::get().UpstreamFailure), index);
124124
}
125125

126-
SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool,
126+
SplitRequestPtr MGETRequest::create(Router& router,
127127
const Common::Redis::RespValue& incoming_request,
128128
SplitCallbacks& callbacks, CommandStats& command_stats,
129129
TimeSource& time_source, bool latency_in_micros) {
@@ -152,8 +152,8 @@ SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool,
152152

153153
single_mget.asArray()[1].asString() = incoming_request.asArray()[i].asString();
154154
ENVOY_LOG(debug, "redis: parallel get: '{}'", single_mget.toString());
155-
pending_request.handle_ = conn_pool.makeRequest(incoming_request.asArray()[i].asString(),
156-
single_mget, pending_request);
155+
pending_request.handle_ =
156+
router.makeRequest(incoming_request.asArray()[i].asString(), single_mget, pending_request);
157157
if (!pending_request.handle_) {
158158
pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
159159
}
@@ -195,7 +195,7 @@ void MGETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
195195
}
196196
}
197197

198-
SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool,
198+
SplitRequestPtr MSETRequest::create(Router& router,
199199
const Common::Redis::RespValue& incoming_request,
200200
SplitCallbacks& callbacks, CommandStats& command_stats,
201201
TimeSource& time_source, bool latency_in_micros) {
@@ -231,8 +231,8 @@ SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool,
231231
single_mset.asArray()[2].asString() = incoming_request.asArray()[i + 1].asString();
232232

233233
ENVOY_LOG(debug, "redis: parallel set: '{}'", single_mset.toString());
234-
pending_request.handle_ = conn_pool.makeRequest(incoming_request.asArray()[i].asString(),
235-
single_mset, pending_request);
234+
pending_request.handle_ =
235+
router.makeRequest(incoming_request.asArray()[i].asString(), single_mset, pending_request);
236236
if (!pending_request.handle_) {
237237
pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
238238
}
@@ -270,7 +270,7 @@ void MSETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
270270
}
271271
}
272272

273-
SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool,
273+
SplitRequestPtr SplitKeysSumResultRequest::create(Router& router,
274274
const Common::Redis::RespValue& incoming_request,
275275
SplitCallbacks& callbacks,
276276
CommandStats& command_stats,
@@ -299,8 +299,8 @@ SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool,
299299
single_fragment.asArray()[1].asString() = incoming_request.asArray()[i].asString();
300300
ENVOY_LOG(debug, "redis: parallel {}: '{}'", incoming_request.asArray()[0].asString(),
301301
single_fragment.toString());
302-
pending_request.handle_ = conn_pool.makeRequest(incoming_request.asArray()[i].asString(),
303-
single_fragment, pending_request);
302+
pending_request.handle_ = router.makeRequest(incoming_request.asArray()[i].asString(),
303+
single_fragment, pending_request);
304304
if (!pending_request.handle_) {
305305
pending_request.onResponse(Utility::makeError(Response::get().NoUpstreamHost));
306306
}
@@ -337,12 +337,11 @@ void SplitKeysSumResultRequest::onChildResponse(Common::Redis::RespValuePtr&& va
337337
}
338338
}
339339

340-
InstanceImpl::InstanceImpl(ConnPool::InstancePtr&& conn_pool, Stats::Scope& scope,
341-
const std::string& stat_prefix, TimeSource& time_source,
342-
bool latency_in_micros)
343-
: conn_pool_(std::move(conn_pool)), simple_command_handler_(*conn_pool_),
344-
eval_command_handler_(*conn_pool_), mget_handler_(*conn_pool_), mset_handler_(*conn_pool_),
345-
split_keys_sum_result_handler_(*conn_pool_),
340+
InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::string& stat_prefix,
341+
TimeSource& time_source, bool latency_in_micros)
342+
: router_(std::move(router)), simple_command_handler_(*router_),
343+
eval_command_handler_(*router_), mget_handler_(*router_), mset_handler_(*router_),
344+
split_keys_sum_result_handler_(*router_),
346345
stats_{ALL_COMMAND_SPLITTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
347346
latency_in_micros_(latency_in_micros), time_source_(time_source) {
348347
for (const std::string& command : Common::Redis::SupportedCommands::simpleCommands()) {

0 commit comments

Comments
 (0)