Skip to content

Commit a085974

Browse files
dmitri-dhtuch
authored andcommitted
Fixes #2943 - implements shared grpc connection per multiple EDS subscriptions. (#5026)
Fixes #2943 by implementing shared grpc connection per multiple EDS subscriptions. Signed-off-by: Dmitri Dolguikh <[email protected]>
1 parent 9ba0a43 commit a085974

18 files changed

+401
-24
lines changed

source/common/config/BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,19 @@ envoy_cc_library(
169169
envoy_cc_library(
170170
name = "grpc_subscription_lib",
171171
hdrs = ["grpc_subscription_impl.h"],
172+
deps = [
173+
":grpc_managed_mux_subscription_lib",
174+
":grpc_mux_lib",
175+
"//include/envoy/config:subscription_interface",
176+
"//include/envoy/event:dispatcher_interface",
177+
"//include/envoy/grpc:async_client_interface",
178+
"@envoy_api//envoy/api/v2/core:base_cc",
179+
],
180+
)
181+
182+
envoy_cc_library(
183+
name = "grpc_managed_mux_subscription_lib",
184+
hdrs = ["grpc_managed_mux_subscription_impl.h"],
172185
deps = [
173186
":grpc_mux_lib",
174187
":grpc_mux_subscription_lib",
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include "envoy/api/v2/core/base.pb.h"
4+
#include "envoy/config/subscription.h"
5+
#include "envoy/event/dispatcher.h"
6+
#include "envoy/grpc/async_client.h"
7+
8+
#include "common/config/grpc_mux_impl.h"
9+
#include "common/config/grpc_mux_subscription_impl.h"
10+
#include "common/config/utility.h"
11+
12+
namespace Envoy {
13+
namespace Config {
14+
15+
template <class ResourceType>
16+
class GrpcManagedMuxSubscriptionImpl : public Config::Subscription<ResourceType> {
17+
public:
18+
GrpcManagedMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats)
19+
: grpc_mux_(grpc_mux), grpc_mux_subscription_(grpc_mux_, stats) {}
20+
21+
// Config::Subscription
22+
void start(const std::vector<std::string>& resources,
23+
Config::SubscriptionCallbacks<ResourceType>& callbacks) override {
24+
// Subscribe first, so we get failure callbacks if grpc_mux_.start() fails.
25+
grpc_mux_subscription_.start(resources, callbacks);
26+
grpc_mux_.start();
27+
}
28+
29+
void updateResources(const std::vector<std::string>& resources) override {
30+
grpc_mux_subscription_.updateResources(resources);
31+
}
32+
33+
GrpcMux& grpcMux() { return grpc_mux_; }
34+
35+
private:
36+
GrpcMux& grpc_mux_;
37+
GrpcMuxSubscriptionImpl<ResourceType> grpc_mux_subscription_;
38+
};
39+
40+
} // namespace Config
41+
} // namespace Envoy

source/common/config/grpc_subscription_impl.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
#include "envoy/event/dispatcher.h"
66
#include "envoy/grpc/async_client.h"
77

8+
#include "common/config/grpc_managed_mux_subscription_impl.h"
89
#include "common/config/grpc_mux_impl.h"
9-
#include "common/config/grpc_mux_subscription_impl.h"
1010
#include "common/config/utility.h"
1111

1212
namespace Envoy {
@@ -21,25 +21,23 @@ class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> {
2121
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings)
2222
: grpc_mux_(local_info, std::move(async_client), dispatcher, service_method, random, scope,
2323
rate_limit_settings),
24-
grpc_mux_subscription_(grpc_mux_, stats) {}
24+
grpc_managed_mux_subscription_(grpc_mux_, stats) {}
2525

2626
// Config::Subscription
2727
void start(const std::vector<std::string>& resources,
2828
Config::SubscriptionCallbacks<ResourceType>& callbacks) override {
29-
// Subscribe first, so we get failure callbacks if grpc_mux_.start() fails.
30-
grpc_mux_subscription_.start(resources, callbacks);
31-
grpc_mux_.start();
29+
grpc_managed_mux_subscription_.start(resources, callbacks);
3230
}
3331

3432
void updateResources(const std::vector<std::string>& resources) override {
35-
grpc_mux_subscription_.updateResources(resources);
33+
grpc_managed_mux_subscription_.updateResources(resources);
3634
}
3735

3836
GrpcMuxImpl& grpcMux() { return grpc_mux_; }
3937

4038
private:
4139
GrpcMuxImpl grpc_mux_;
42-
GrpcMuxSubscriptionImpl<ResourceType> grpc_mux_subscription_;
40+
GrpcManagedMuxSubscriptionImpl<ResourceType> grpc_managed_mux_subscription_;
4341
};
4442

4543
} // namespace Config

source/common/upstream/BUILD

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ envoy_cc_library(
2828
],
2929
)
3030

31+
envoy_cc_library(
32+
name = "eds_subscription_factory_lib",
33+
srcs = ["eds_subscription_factory.cc"],
34+
hdrs = ["eds_subscription_factory.h"],
35+
deps = [
36+
"//include/envoy/config:subscription_interface",
37+
"//include/envoy/event:dispatcher_interface",
38+
"//include/envoy/local_info:local_info_interface",
39+
"//source/common/config:grpc_managed_mux_subscription_lib",
40+
"//source/common/config:grpc_mux_lib",
41+
"//source/common/config:subscription_factory_lib",
42+
"//source/common/config:utility_lib",
43+
],
44+
)
45+
3146
envoy_cc_library(
3247
name = "cds_subscription_lib",
3348
srcs = ["cds_subscription.cc"],
@@ -83,6 +98,7 @@ envoy_cc_library(
8398
"//source/common/protobuf:utility_lib",
8499
"//source/common/router:shadow_writer_lib",
85100
"//source/common/tcp:conn_pool_lib",
101+
"//source/common/upstream:eds_subscription_factory_lib",
86102
"//source/common/upstream:upstream_lib",
87103
"@envoy_api//envoy/admin/v2alpha:config_dump_cc",
88104
"@envoy_api//envoy/api/v2/core:base_cc",
@@ -296,6 +312,7 @@ envoy_cc_library(
296312
"//source/common/network:resolver_lib",
297313
"//source/common/network:utility_lib",
298314
"//source/common/protobuf:utility_lib",
315+
"//source/common/upstream:eds_subscription_factory_lib",
299316
"@envoy_api//envoy/api/v2:eds_cc",
300317
"@envoy_api//envoy/api/v2/core:base_cc",
301318
"@envoy_api//envoy/api/v2/endpoint:endpoint_cc",
@@ -401,6 +418,7 @@ envoy_cc_library(
401418
"//source/common/config:well_known_names",
402419
"//source/common/stats:isolated_store_lib",
403420
"//source/common/stats:stats_lib",
421+
"//source/common/upstream:eds_subscription_factory_lib",
404422
"//source/server:init_manager_lib",
405423
"//source/server:transport_socket_config_lib",
406424
"@envoy_api//envoy/api/v2/core:base_cc",

source/common/upstream/cluster_manager_impl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1210,7 +1210,8 @@ ClusterSharedPtr ProdClusterManagerFactory::clusterFromProto(
12101210
bool added_via_api) {
12111211
return ClusterImplBase::create(cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_,
12121212
runtime_, random_, main_thread_dispatcher_, log_manager,
1213-
local_info_, outlier_event_logger, added_via_api);
1213+
local_info_, outlier_event_logger, added_via_api,
1214+
eds_subscription_factory_);
12141215
}
12151216

12161217
CdsApiPtr ProdClusterManagerFactory::createCds(

source/common/upstream/cluster_manager_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "common/config/grpc_mux_impl.h"
2424
#include "common/http/async_client_impl.h"
25+
#include "common/upstream/eds_subscription_factory.h"
2526
#include "common/upstream/load_stats_reporter.h"
2627
#include "common/upstream/upstream_impl.h"
2728

@@ -82,6 +83,7 @@ class ProdClusterManagerFactory : public ClusterManagerFactory {
8283
Ssl::ContextManager& ssl_context_manager_;
8384
const LocalInfo::LocalInfo& local_info_;
8485
Secret::SecretManager& secret_manager_;
86+
Upstream::EdsSubscriptionFactory eds_subscription_factory_;
8587
};
8688

8789
/**

source/common/upstream/eds.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,22 @@ namespace Upstream {
2222
EdsClusterImpl::EdsClusterImpl(
2323
const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime,
2424
Server::Configuration::TransportSocketFactoryContext& factory_context,
25-
Stats::ScopePtr&& stats_scope, bool added_via_api)
25+
Stats::ScopePtr&& stats_scope, bool added_via_api,
26+
EdsSubscriptionFactory& eds_subscription_factory)
2627
: BaseDynamicClusterImpl(cluster, runtime, factory_context, std::move(stats_scope),
2728
added_via_api),
2829
cm_(factory_context.clusterManager()), local_info_(factory_context.localInfo()),
2930
cluster_name_(cluster.eds_cluster_config().service_name().empty()
3031
? cluster.name()
31-
: cluster.eds_cluster_config().service_name()) {
32+
: cluster.eds_cluster_config().service_name()),
33+
eds_subscription_factory_(eds_subscription_factory) {
3234
Config::Utility::checkLocalInfo("eds", local_info_);
3335

3436
const auto& eds_config = cluster.eds_cluster_config().eds_config();
3537
Event::Dispatcher& dispatcher = factory_context.dispatcher();
3638
Runtime::RandomGenerator& random = factory_context.random();
3739
Upstream::ClusterManager& cm = factory_context.clusterManager();
38-
subscription_ = Config::SubscriptionFactory::subscriptionFromConfigSource<
39-
envoy::api::v2::ClusterLoadAssignment>(
40+
subscription_ = eds_subscription_factory_.subscriptionFromConfigSource(
4041
eds_config, local_info_, dispatcher, cm, random, info_->statsScope(),
4142
[this, &eds_config, &cm, &dispatcher,
4243
&random]() -> Config::Subscription<envoy::api::v2::ClusterLoadAssignment>* {

source/common/upstream/eds.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "envoy/stats/scope.h"
99
#include "envoy/upstream/locality.h"
1010

11+
#include "common/upstream/eds_subscription_factory.h"
1112
#include "common/upstream/upstream_impl.h"
1213

1314
namespace Envoy {
@@ -21,7 +22,8 @@ class EdsClusterImpl : public BaseDynamicClusterImpl,
2122
public:
2223
EdsClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime,
2324
Server::Configuration::TransportSocketFactoryContext& factory_context,
24-
Stats::ScopePtr&& stats_scope, bool added_via_api);
25+
Stats::ScopePtr&& stats_scope, bool added_via_api,
26+
EdsSubscriptionFactory& eds_subscription_factory);
2527

2628
// Upstream::Cluster
2729
InitializePhase initializePhase() const override { return InitializePhase::Secondary; }
@@ -51,6 +53,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl,
5153
const std::string cluster_name_;
5254
std::vector<LocalityWeightsMap> locality_weights_map_;
5355
HostMap all_hosts_;
56+
EdsSubscriptionFactory& eds_subscription_factory_;
5457
};
5558

5659
} // namespace Upstream
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#include "common/upstream/eds_subscription_factory.h"
2+
3+
namespace Envoy {
4+
namespace Upstream {
5+
Config::GrpcMux& EdsSubscriptionFactory::getOrCreateMux(
6+
const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
7+
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
8+
Runtime::RandomGenerator& random, const ::envoy::api::v2::core::ApiConfigSource& config_source,
9+
Stats::Scope& scope, const Config::RateLimitSettings& rate_limit_settings) {
10+
const uint64_t mux_key = MessageUtil::hash(config_source.grpc_services(0));
11+
if (muxes_.find(mux_key) == muxes_.end()) {
12+
muxes_.emplace(std::make_pair(
13+
mux_key,
14+
std::make_unique<Config::GrpcMuxImpl>(local_info, std::move(async_client), dispatcher,
15+
service_method, random, scope, rate_limit_settings)));
16+
}
17+
return *(muxes_.at(mux_key));
18+
}
19+
20+
std::unique_ptr<Config::Subscription<envoy::api::v2::ClusterLoadAssignment>>
21+
EdsSubscriptionFactory::subscriptionFromConfigSource(
22+
const envoy::api::v2::core::ConfigSource& config, const LocalInfo::LocalInfo& local_info,
23+
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
24+
Stats::Scope& scope,
25+
std::function<Config::Subscription<envoy::api::v2::ClusterLoadAssignment>*()>
26+
rest_legacy_constructor,
27+
const std::string& rest_method, const std::string& grpc_method) {
28+
if (config.config_source_specifier_case() ==
29+
envoy::api::v2::core::ConfigSource::kApiConfigSource &&
30+
config.api_config_source().api_type() == envoy::api::v2::core::ApiConfigSource::GRPC) {
31+
const envoy::api::v2::core::ApiConfigSource& api_config_source = config.api_config_source();
32+
33+
Config::GrpcMux& mux_to_use = getOrCreateMux(
34+
local_info,
35+
Config::Utility::factoryForGrpcApiConfigSource(cm.grpcAsyncClientManager(),
36+
api_config_source, scope)
37+
->create(),
38+
dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method),
39+
random, api_config_source, scope,
40+
Config::Utility::parseRateLimitSettings(api_config_source));
41+
42+
Config::SubscriptionStats stats = Config::Utility::generateStats(scope);
43+
return std::make_unique<
44+
Config::GrpcManagedMuxSubscriptionImpl<envoy::api::v2::ClusterLoadAssignment>>(mux_to_use,
45+
stats);
46+
}
47+
48+
return Config::SubscriptionFactory::subscriptionFromConfigSource<
49+
envoy::api::v2::ClusterLoadAssignment>(config, local_info, dispatcher, cm, random, scope,
50+
std::move(rest_legacy_constructor), rest_method,
51+
grpc_method);
52+
}
53+
} // namespace Upstream
54+
} // namespace Envoy
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#pragma once
2+
3+
#include "envoy/api/v2/core/base.pb.h"
4+
#include "envoy/config/subscription.h"
5+
#include "envoy/stats/scope.h"
6+
7+
#include "common/config/grpc_managed_mux_subscription_impl.h"
8+
#include "common/config/grpc_mux_impl.h"
9+
#include "common/config/subscription_factory.h"
10+
#include "common/config/utility.h"
11+
#include "common/protobuf/protobuf.h"
12+
13+
/**
14+
* EdsSubscriptionFactory is used for instantiation of EDS subscriptions so as to minimize the
15+
* number of open grpc connections used by thses subscriptions. This is done by sharing a grpc
16+
* multiplexer between subscriptions handled by the same config server. Please see
17+
* https://github.com/envoyproxy/envoy/issues/2943 for additional information and related issues.
18+
*
19+
* TODO (dmitri-d): This implementation should be generalized to cover RDS.
20+
*/
21+
22+
namespace Envoy {
23+
namespace Upstream {
24+
class EdsSubscriptionFactory {
25+
public:
26+
std::unique_ptr<Config::Subscription<envoy::api::v2::ClusterLoadAssignment>>
27+
subscriptionFromConfigSource(
28+
const envoy::api::v2::core::ConfigSource& config, const LocalInfo::LocalInfo& local_info,
29+
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
30+
Stats::Scope& scope,
31+
std::function<Config::Subscription<envoy::api::v2::ClusterLoadAssignment>*()>
32+
rest_legacy_constructor,
33+
const std::string& rest_method, const std::string& grpc_method);
34+
35+
protected:
36+
Config::GrpcMux& getOrCreateMux(const LocalInfo::LocalInfo& local_info,
37+
Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher,
38+
const Protobuf::MethodDescriptor& service_method,
39+
Runtime::RandomGenerator& random,
40+
const ::envoy::api::v2::core::ApiConfigSource& config_source,
41+
Stats::Scope& scope,
42+
const Config::RateLimitSettings& rate_limit_settings);
43+
44+
private:
45+
std::unordered_map<uint64_t, Config::GrpcMuxPtr> muxes_;
46+
};
47+
} // namespace Upstream
48+
} // namespace Envoy

0 commit comments

Comments
 (0)