|
16 | 16 |
|
17 | 17 | #include "envoy/registry/registry.h" |
18 | 18 | #include "envoy/server/bootstrap_extension_config.h" |
| 19 | +#include "envoy/server/factory_context.h" |
| 20 | +#include "envoy/singleton/manager.h" |
| 21 | +#include "envoy/thread_local/thread_local.h" |
| 22 | +#include "source/common/common/non_copyable.h" |
| 23 | +#include "source/common/config/subscription_base.h" |
| 24 | +#include "source/common/grpc/common.h" |
| 25 | +#include "source/common/init/target_impl.h" |
| 26 | +#include "source/extensions/common/workload_discovery/discovery.pb.h" |
| 27 | +#include "source/extensions/common/workload_discovery/discovery.pb.validate.h" |
| 28 | +#include "source/extensions/common/workload_discovery/extension.pb.h" |
| 29 | +#include "source/extensions/common/workload_discovery/extension.pb.validate.h" |
19 | 30 |
|
20 | 31 | namespace Envoy::Extensions::Common::WorkloadDiscovery { |
21 | 32 |
|
| 33 | +namespace { |
| 34 | +Istio::Common::WorkloadMetadataObject convert(const istio::workload::Workload& workload) { |
| 35 | + auto workload_type = Istio::Common::WorkloadType::Deployment; |
| 36 | + switch (workload.workload_type()) { |
| 37 | + case istio::workload::WorkloadType::CRONJOB: |
| 38 | + workload_type = Istio::Common::WorkloadType::CronJob; |
| 39 | + break; |
| 40 | + case istio::workload::WorkloadType::JOB: |
| 41 | + workload_type = Istio::Common::WorkloadType::Job; |
| 42 | + break; |
| 43 | + case istio::workload::WorkloadType::POD: |
| 44 | + workload_type = Istio::Common::WorkloadType::Pod; |
| 45 | + break; |
| 46 | + default: |
| 47 | + break; |
| 48 | + } |
| 49 | + return Istio::Common::WorkloadMetadataObject( |
| 50 | + workload.name(), /* cluster_name */ "", workload.namespace_(), workload.workload_name(), |
| 51 | + workload.canonical_name(), workload.canonical_revision(), /* app_name */ "", |
| 52 | + /* app_version */ "", workload_type); |
| 53 | +} |
| 54 | +} // namespace |
| 55 | + |
| 56 | +class WorkloadMetadataProviderImpl : public WorkloadMetadataProvider, public Singleton::Instance { |
| 57 | +public: |
| 58 | + WorkloadMetadataProviderImpl(const envoy::config::core::v3::ConfigSource& config_source, |
| 59 | + Server::Configuration::ServerFactoryContext& factory_context) |
| 60 | + : config_source_(config_source), factory_context_(factory_context), |
| 61 | + tls_(factory_context.threadLocal()), |
| 62 | + scope_(factory_context.scope().createScope("workload_discovery")), |
| 63 | + stats_(generateStats(*scope_)), subscription_(*this) { |
| 64 | + tls_.set([](Event::Dispatcher&) { return std::make_shared<ThreadLocalProvider>(); }); |
| 65 | + // This is safe because the ADS mux is started in the cluster manager constructor prior to this |
| 66 | + // call. |
| 67 | + subscription_.start(); |
| 68 | + } |
| 69 | + |
| 70 | + std::optional<Istio::Common::WorkloadMetadataObject> |
| 71 | + GetMetadata(const Network::Address::InstanceConstSharedPtr& address) override { |
| 72 | + if (address && address->ip()) { |
| 73 | + if (const auto ipv4 = address->ip()->ipv4(); ipv4) { |
| 74 | + uint32_t value = ipv4->address(); |
| 75 | + std::array<uint8_t, 4> output; |
| 76 | + absl::little_endian::Store32(&output, value); |
| 77 | + return tls_->get(std::string(output.begin(), output.end())); |
| 78 | + } else if (const auto ipv6 = address->ip()->ipv6(); ipv6) { |
| 79 | + const uint64_t high = absl::Uint128High64(ipv6->address()); |
| 80 | + const uint64_t low = absl::Uint128Low64(ipv6->address()); |
| 81 | + std::array<uint8_t, 16> output; |
| 82 | + absl::little_endian::Store64(&output, high); |
| 83 | + absl::little_endian::Store64(&output[8], low); |
| 84 | + return tls_->get(std::string(output.begin(), output.end())); |
| 85 | + } |
| 86 | + } |
| 87 | + return {}; |
| 88 | + } |
| 89 | + |
| 90 | +private: |
| 91 | + using AddressIndex = absl::flat_hash_map<std::string, Istio::Common::WorkloadMetadataObject>; |
| 92 | + using AddressIndexSharedPtr = std::shared_ptr<AddressIndex>; |
| 93 | + using AddressVector = std::vector<std::string>; |
| 94 | + using AddressVectorSharedPtr = std::shared_ptr<AddressVector>; |
| 95 | + |
| 96 | + struct ThreadLocalProvider : public ThreadLocal::ThreadLocalObject { |
| 97 | + void reset(const AddressIndexSharedPtr& index) { address_index_ = *index; } |
| 98 | + void update(const AddressIndexSharedPtr& added, const AddressVectorSharedPtr& removed) { |
| 99 | + for (const auto& [address, workload] : *added) { |
| 100 | + address_index_.emplace(address, workload); |
| 101 | + } |
| 102 | + for (const auto& address : *removed) { |
| 103 | + address_index_.erase(address); |
| 104 | + } |
| 105 | + } |
| 106 | + size_t total() const { return address_index_.size(); } |
| 107 | + // Returns by-value since the flat map does not provide pointer stability. |
| 108 | + std::optional<Istio::Common::WorkloadMetadataObject> get(const std::string& address) { |
| 109 | + const auto it = address_index_.find(address); |
| 110 | + if (it != address_index_.end()) { |
| 111 | + return it->second; |
| 112 | + } |
| 113 | + return {}; |
| 114 | + } |
| 115 | + AddressIndex address_index_; |
| 116 | + }; |
| 117 | + class WorkloadSubscription : Config::SubscriptionBase<istio::workload::Workload> { |
| 118 | + public: |
| 119 | + WorkloadSubscription(WorkloadMetadataProviderImpl& parent) |
| 120 | + : Config::SubscriptionBase<istio::workload::Workload>( |
| 121 | + parent.factory_context_.messageValidationVisitor(), "address"), |
| 122 | + parent_(parent) { |
| 123 | + subscription_ = parent.factory_context_.clusterManager() |
| 124 | + .subscriptionFactory() |
| 125 | + .subscriptionFromConfigSource( |
| 126 | + parent.config_source_, Grpc::Common::typeUrl(getResourceName()), |
| 127 | + *parent.scope_, *this, resource_decoder_, {}); |
| 128 | + } |
| 129 | + void start() { subscription_->start({}); } |
| 130 | + |
| 131 | + private: |
| 132 | + // Config::SubscriptionCallbacks |
| 133 | + void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources, |
| 134 | + const std::string&) override { |
| 135 | + AddressIndexSharedPtr index = std::make_shared<AddressIndex>(); |
| 136 | + for (const auto& resource : resources) { |
| 137 | + const auto& workload = |
| 138 | + dynamic_cast<const istio::workload::Workload&>(resource.get().resource()); |
| 139 | + index->emplace(workload.address(), convert(workload)); |
| 140 | + } |
| 141 | + parent_.reset(index); |
| 142 | + } |
| 143 | + void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources, |
| 144 | + const Protobuf::RepeatedPtrField<std::string>& removed_resources, |
| 145 | + const std::string&) override { |
| 146 | + AddressIndexSharedPtr added = std::make_shared<AddressIndex>(); |
| 147 | + for (const auto& resource : added_resources) { |
| 148 | + const auto& workload = |
| 149 | + dynamic_cast<const istio::workload::Workload&>(resource.get().resource()); |
| 150 | + added->emplace(workload.address(), convert(workload)); |
| 151 | + } |
| 152 | + AddressVectorSharedPtr removed = std::make_shared<AddressVector>(); |
| 153 | + removed->reserve(removed_resources.size()); |
| 154 | + for (const auto& resource : removed_resources) { |
| 155 | + removed->push_back(resource); |
| 156 | + } |
| 157 | + parent_.update(added, removed); |
| 158 | + } |
| 159 | + void onConfigUpdateFailed(Config::ConfigUpdateFailureReason, const EnvoyException*) override { |
| 160 | + // Do nothing - feature is automatically disabled. |
| 161 | + // TODO: Potential issue with the expiration of the metadata. |
| 162 | + } |
| 163 | + WorkloadMetadataProviderImpl& parent_; |
| 164 | + Config::SubscriptionPtr subscription_; |
| 165 | + }; |
| 166 | + |
| 167 | + void reset(AddressIndexSharedPtr index) { |
| 168 | + tls_.runOnAllThreads([index](OptRef<ThreadLocalProvider> tls) { tls->reset(index); }); |
| 169 | + stats_.total_.set(tls_->total()); |
| 170 | + } |
| 171 | + |
| 172 | + void update(AddressIndexSharedPtr added, AddressVectorSharedPtr removed) { |
| 173 | + tls_.runOnAllThreads( |
| 174 | + [added, removed](OptRef<ThreadLocalProvider> tls) { tls->update(added, removed); }); |
| 175 | + stats_.total_.set(tls_->total()); |
| 176 | + } |
| 177 | + |
| 178 | + WorkloadDiscoveryStats generateStats(Stats::Scope& scope) { |
| 179 | + return WorkloadDiscoveryStats{WORKLOAD_DISCOVERY_STATS(POOL_GAUGE(scope))}; |
| 180 | + } |
| 181 | + |
| 182 | + const envoy::config::core::v3::ConfigSource config_source_; |
| 183 | + Server::Configuration::ServerFactoryContext& factory_context_; |
| 184 | + ThreadLocal::TypedSlot<ThreadLocalProvider> tls_; |
| 185 | + Stats::ScopeSharedPtr scope_; |
| 186 | + WorkloadDiscoveryStats stats_; |
| 187 | + WorkloadSubscription subscription_; |
| 188 | +}; |
| 189 | + |
22 | 190 | SINGLETON_MANAGER_REGISTRATION(WorkloadMetadataProvider) |
23 | 191 |
|
24 | 192 | class WorkloadDiscoveryExtension : public Server::BootstrapExtension { |
25 | | - public: |
26 | | - WorkloadDiscoveryExtension( |
27 | | - Server::Configuration::ServerFactoryContext& factory_context, |
28 | | - const istio::workload_discovery::v1::BootstrapExtension& config) |
| 193 | +public: |
| 194 | + WorkloadDiscoveryExtension(Server::Configuration::ServerFactoryContext& factory_context, |
| 195 | + const istio::workload::BootstrapExtension& config) |
29 | 196 | : factory_context_(factory_context), config_(config) {} |
30 | 197 |
|
31 | 198 | // Server::Configuration::BootstrapExtension |
32 | 199 | void onServerInitialized() override { |
33 | | - provider_ = |
34 | | - factory_context_.singletonManager().getTyped<WorkloadMetadataProvider>( |
35 | | - SINGLETON_MANAGER_REGISTERED_NAME(WorkloadMetadataProvider), [&] { |
36 | | - return std::make_shared<WorkloadMetadataProvider>( |
37 | | - config_.config_source(), factory_context_); |
38 | | - }); |
39 | | - /* Example: |
40 | | - provider_->fetch("127.0.0.1", [](const WorkloadRecordSharedPtr& record) { |
41 | | - std::cout << "1: " << (record->metadata_.has_value() ? |
42 | | - record->metadata_->baggage() : "(none)") |
43 | | - << std::endl; |
44 | | - }); |
45 | | - provider_->fetch("127.0.0.2", [](const WorkloadRecordSharedPtr& record) { |
46 | | - std::cout << "2: " << (record->metadata_.has_value() ? |
47 | | - record->metadata_->baggage() : "(none)") |
48 | | - << std::endl; |
49 | | - }); |
50 | | - */ |
| 200 | + provider_ = factory_context_.singletonManager().getTyped<WorkloadMetadataProvider>( |
| 201 | + SINGLETON_MANAGER_REGISTERED_NAME(WorkloadMetadataProvider), [&] { |
| 202 | + return std::make_shared<WorkloadMetadataProviderImpl>(config_.config_source(), |
| 203 | + factory_context_); |
| 204 | + }); |
51 | 205 | } |
52 | 206 |
|
53 | | - private: |
| 207 | +private: |
54 | 208 | Server::Configuration::ServerFactoryContext& factory_context_; |
55 | | - const istio::workload_discovery::v1::BootstrapExtension config_; |
| 209 | + const istio::workload::BootstrapExtension config_; |
56 | 210 | WorkloadMetadataProviderSharedPtr provider_; |
57 | 211 | }; |
58 | 212 |
|
59 | | -class WorkloadDiscoveryFactory |
60 | | - : public Server::Configuration::BootstrapExtensionFactory { |
61 | | - public: |
| 213 | +class WorkloadDiscoveryFactory : public Server::Configuration::BootstrapExtensionFactory { |
| 214 | +public: |
62 | 215 | // Server::Configuration::BootstrapExtensionFactory |
63 | | - Server::BootstrapExtensionPtr createBootstrapExtension( |
64 | | - const Protobuf::Message& config, |
65 | | - Server::Configuration::ServerFactoryContext& context) override { |
66 | | - const auto& message = MessageUtil::downcastAndValidate< |
67 | | - const istio::workload_discovery::v1::BootstrapExtension&>( |
68 | | - config, context.messageValidationVisitor()); |
| 216 | + Server::BootstrapExtensionPtr |
| 217 | + createBootstrapExtension(const Protobuf::Message& config, |
| 218 | + Server::Configuration::ServerFactoryContext& context) override { |
| 219 | + const auto& message = |
| 220 | + MessageUtil::downcastAndValidate<const istio::workload::BootstrapExtension&>( |
| 221 | + config, context.messageValidationVisitor()); |
69 | 222 | return std::make_unique<WorkloadDiscoveryExtension>(context, message); |
70 | 223 | } |
71 | 224 | ProtobufTypes::MessagePtr createEmptyConfigProto() override { |
72 | | - return std::make_unique< |
73 | | - istio::workload_discovery::v1::BootstrapExtension>(); |
| 225 | + return std::make_unique<istio::workload::BootstrapExtension>(); |
74 | 226 | } |
75 | | - std::string name() const override { |
76 | | - return "envoy.bootstrap.workload_discovery"; |
77 | | - }; |
| 227 | + std::string name() const override { return "envoy.bootstrap.workload_discovery"; }; |
78 | 228 | }; |
79 | 229 |
|
80 | | -REGISTER_FACTORY(WorkloadDiscoveryFactory, |
81 | | - Server::Configuration::BootstrapExtensionFactory); |
| 230 | +REGISTER_FACTORY(WorkloadDiscoveryFactory, Server::Configuration::BootstrapExtensionFactory); |
| 231 | + |
| 232 | +WorkloadMetadataProviderSharedPtr |
| 233 | +GetProvider(Server::Configuration::ServerFactoryContext& context) { |
| 234 | + return context.singletonManager().getTyped<WorkloadMetadataProvider>( |
| 235 | + SINGLETON_MANAGER_REGISTERED_NAME(WorkloadMetadataProvider)); |
| 236 | +} |
82 | 237 |
|
83 | | -} // namespace Envoy::Extensions::Common::WorkloadDiscovery |
| 238 | +} // namespace Envoy::Extensions::Common::WorkloadDiscovery |
0 commit comments