Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ class ClusterManager {
*/
virtual ThreadLocalCluster* getThreadLocalCluster(absl::string_view cluster) PURE;

/**
* @return std::shared_ptr<FutureCluster> the FutureCluster supplies ready check.
*/
virtual std::shared_ptr<FutureCluster>
futureThreadLocalCluster(absl::string_view cluster_name) PURE;

/**
* Remove a cluster via API. Only clusters added via addOrUpdateCluster() can
* be removed in this manner. Statically defined clusters present when Envoy starts cannot be
Expand Down
42 changes: 42 additions & 0 deletions include/envoy/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,47 @@ class ThreadLocalCluster {

using ThreadLocalClusterOptRef = absl::optional<std::reference_wrapper<ThreadLocalCluster>>;

class FutureCluster;

// Used by FutureCluster user to operate FutureCluster. This class does not own FutureCluster.
class FutureClusterHandle {
public:
virtual ~FutureClusterHandle() = default;
virtual void cancel() PURE;
// FutureCluster& getCluster() { return future_cluster_; }

// private:
// FutureCluster& future_cluster_;
};
class FutureCluster {
public:
using Handle = FutureClusterHandle;
using ResumeCb = std::function<void(FutureCluster&)>;
FutureCluster(absl::string_view cluster_name, ClusterManager& cluster_manager)
: cluster_manager_(cluster_manager), cluster_name_(cluster_name) {}
virtual ~FutureCluster() = default;
virtual bool isReady() PURE;

// Obtain the underlying cluster. This can be called only if the future is ready. Notes that a
// ready future cluster doesn't always mean the thread local cluster is legit. The returned value
// is ONLY safe to use in the context of the owning call. See
// ClusterManager::getTThreadLocalCluster().
ThreadLocalCluster* getThreadLocalCluster();

absl::string_view getClusterName() { return cluster_name_; }

virtual std::unique_ptr<Handle> await(Event::Dispatcher& dispatcher, ResumeCb cb) PURE;
// UNREFERENCED_PARAMETER(dispatcher);
// ASSERT(cb_ == nullptr);
// cb_ = cb;
// return std::make_unique<FutureClusterHandle>(*this);
// }

private:
ResumeCb cb_;
ClusterManager& cluster_manager_;
std::string cluster_name_;
};

} // namespace Upstream
} // namespace Envoy
38 changes: 32 additions & 6 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager
}

Filter::~Filter() {
// This filter is destroyed while the future cluster is pending. Destroy the handle_ to prevent
// the future cluster callback.
if (future_cluster_handle_ != nullptr) {
future_cluster_handle_ = nullptr;
// TODO(lambdai): Add a new ResponseFlag and a metric.
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
}
for (const auto& access_log : config_->accessLogs()) {
access_log->log(nullptr, nullptr, nullptr, getStreamInfo());
}
Expand Down Expand Up @@ -387,9 +394,22 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {

const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;

Upstream::ThreadLocalCluster* thread_local_cluster =
cluster_manager_.getThreadLocalCluster(cluster_name);
std::shared_ptr<Upstream::FutureCluster> future =
cluster_manager_.futureThreadLocalCluster(cluster_name);
if (future->isReady()) {
onClusterReady(*future);
} else {
future_cluster_handle_ =
future->await(read_callbacks_->connection().dispatcher(),
[this](Upstream::FutureCluster& f) { onClusterReady(f); });
}
return Network::FilterStatus::StopIteration;
}

void Filter::onClusterReady(Upstream::FutureCluster& future) {
future_cluster_handle_ = nullptr;
Upstream::ThreadLocalCluster* thread_local_cluster = future.getThreadLocalCluster();
absl::string_view cluster_name = future.getClusterName();
if (thread_local_cluster) {
ENVOY_CONN_LOG(debug, "Creating connection to cluster {}", read_callbacks_->connection(),
cluster_name);
Expand All @@ -398,7 +418,7 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
config_->stats().downstream_cx_no_route_.inc();
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
onInitFailure(UpstreamFailureReason::NoRoute);
return Network::FilterStatus::StopIteration;
return;
}

Upstream::ClusterInfoConstSharedPtr cluster = thread_local_cluster->info();
Expand All @@ -410,15 +430,15 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
cluster->stats().upstream_cx_overflow_.inc();
onInitFailure(UpstreamFailureReason::ResourceLimitExceeded);
return Network::FilterStatus::StopIteration;
return;
}

const uint32_t max_connect_attempts = config_->maxConnectAttempts();
if (connect_attempts_ >= max_connect_attempts) {
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
cluster->stats().upstream_cx_connect_attempts_exceeded_.inc();
onInitFailure(UpstreamFailureReason::ConnectFailed);
return Network::FilterStatus::StopIteration;
return;
}

if (downstreamConnection()) {
Expand All @@ -445,7 +465,6 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
onInitFailure(UpstreamFailureReason::NoHealthyUpstream);
}
return Network::FilterStatus::StopIteration;
}

bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) {
Expand Down Expand Up @@ -590,6 +609,13 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
generic_conn_pool_.reset();
}
}
// The future cluster callback can be waiting. Cancel the future callback.
if (future_cluster_handle_) {
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
future_cluster_handle_.reset();
}
}
}

void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) {
Expand Down
3 changes: 3 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ class Filter : public Network::ReadFilter,

void initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats);
Network::FilterStatus initializeUpstreamConnection();
void onClusterReady(Upstream::FutureCluster& future);

bool maybeTunnel(Upstream::ThreadLocalCluster& cluster);
void onConnectTimeout();
void onDownstreamEvent(Network::ConnectionEvent event);
Expand All @@ -365,6 +367,7 @@ class Filter : public Network::ReadFilter,
Event::TimerPtr idle_timer_;
Event::TimerPtr connection_duration_timer_;

std::unique_ptr<Upstream::FutureCluster::Handle> future_cluster_handle_;
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
// The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist
Expand Down
30 changes: 30 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,11 @@ ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view
}
}

std::shared_ptr<FutureCluster>
ClusterManagerImpl::futureThreadLocalCluster(absl::string_view cluster_name) {
return std::make_shared<ReadyFutureCluster>(cluster_name, *this);
}

void ClusterManagerImpl::maybePreconnect(
ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
const ClusterConnectivityState& state,
Expand Down Expand Up @@ -1541,5 +1546,30 @@ ProdClusterManagerFactory::createCds(const envoy::config::core::v3::ConfigSource
return CdsApiImpl::create(cds_config, cm, stats_, validation_context_.dynamicValidationVisitor());
}

ThreadLocalCluster* FutureCluster::getThreadLocalCluster() {
return cluster_manager_.getThreadLocalCluster(cluster_name_);
}

class DelayedFutureCluster::DumbHandle : public FutureCluster::Handle {
public:
DumbHandle(DelayedFutureCluster& future) : future_(future) { ASSERT(!future_.is_canceled_); }
void cancel() { future_.is_canceled_ = true; }

private:
DelayedFutureCluster& future_;
};

std::unique_ptr<FutureCluster::Handle> DelayedFutureCluster::await(Event::Dispatcher&,
ResumeCb cb) {
cb_ = [this, original_cb = std::move(cb)]() mutable {
if (!is_canceled_) {
original_cb(*this);
}
// Erase the function to release the captures.
original_cb = nullptr;
};
return std::make_unique<DumbHandle>(*this);
}

} // namespace Upstream
} // namespace Envoy
44 changes: 44 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "envoy/thread_local/thread_local.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/assert.h"
#include "common/common/cleanup.h"
#include "common/config/grpc_mux_impl.h"
#include "common/config/subscription_factory_impl.h"
Expand Down Expand Up @@ -254,6 +255,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
const ClusterSet& primaryClusters() override { return primary_clusters_; }
ThreadLocalCluster* getThreadLocalCluster(absl::string_view cluster) override;

std::shared_ptr<FutureCluster> futureThreadLocalCluster(absl::string_view cluster_name) override;

bool removeCluster(const std::string& cluster) override;
void shutdown() override {
if (resume_cds_ != nullptr) {
Expand Down Expand Up @@ -606,5 +609,46 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
ClusterSet primary_clusters_;
};

class ReadyFutureCluster : public FutureCluster {
public:
ReadyFutureCluster(absl::string_view cluster_name, ClusterManager& cluster_manager)
: FutureCluster(cluster_name, cluster_manager) {}
bool isReady() override { return true; }
std::unique_ptr<Handle> await(Event::Dispatcher&, ResumeCb) override {
// User should not call await since ReadyFutureCluster is always ready.
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
};

// This future cluster readiness is controlled by the not-owning flag. It can be used by emulate
// never ready future cluster or eventually ready cluster.
class DelayedFutureCluster : public FutureCluster {
public:
DelayedFutureCluster(absl::string_view cluster_name, ClusterManager& cluster_manager,
bool& ready_flag)
: FutureCluster(cluster_name, cluster_manager), ready_(ready_flag) {}
// FutureCluster
bool isReady() override { return ready_; }
std::unique_ptr<Handle> await(Event::Dispatcher&, ResumeCb cb) override;

// Invoke the callback.
void readyCallback() {
ASSERT(ready_);
cb_();
}

private:
class DumbHandle;
friend class DumbHandle;
// The callback which is optional invoked on the target dispatcher.
std::function<void()> cb_;
// The flag if the future cluster is ready. This flag should only be switched from false to true.
bool& ready_;
// The flag if the ResumeCb is valid. This flag should only be switch false to true by handler.
// There is no mutex protecting this flag so the mutation must be executed on the same thread
// where the callback is invoked.
bool is_canceled_{false};
};

} // namespace Upstream
} // namespace Envoy
86 changes: 85 additions & 1 deletion test/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,90 @@ licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_test(
name = "config_test",
srcs = [
"config_test.cc",
"tcp_proxy_test_base.h",
],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/event:dispatcher_lib",
"//source/common/network:address_lib",
"//source/common/network:application_protocol_lib",
"//source/common/network:transport_socket_options_lib",
"//source/common/network:upstream_server_name_lib",
"//source/common/stats:stats_lib",
"//source/common/tcp_proxy",
"//source/common/upstream:upstream_includes",
"//source/common/upstream:upstream_lib",
"//source/extensions/access_loggers:well_known_names",
"//source/extensions/access_loggers/file:config",
"//source/extensions/upstreams/http/generic:config",
"//test/common/upstream:utility_lib",
"//test/mocks/buffer:buffer_mocks",
"//test/mocks/network:network_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/server:factory_context_mocks",
"//test/mocks/server:instance_mocks",
"//test/mocks/ssl:ssl_mocks",
"//test/mocks/stream_info:stream_info_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
"//test/mocks/upstream:host_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/file/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/upstreams/http/generic/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/upstreams/tcp/generic/v3:pkg_cc_proto",
],
)

envoy_cc_test(
name = "tcp_proxy_test",
srcs = ["tcp_proxy_test.cc"],
srcs = [
"tcp_proxy_test.cc",
"tcp_proxy_test_base.h",
],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/event:dispatcher_lib",
"//source/common/network:address_lib",
"//source/common/network:application_protocol_lib",
"//source/common/network:transport_socket_options_lib",
"//source/common/network:upstream_server_name_lib",
"//source/common/stats:stats_lib",
"//source/common/tcp_proxy",
"//source/common/upstream:upstream_includes",
"//source/common/upstream:upstream_lib",
"//source/extensions/access_loggers:well_known_names",
"//source/extensions/access_loggers/file:config",
"//source/extensions/upstreams/http/generic:config",
"//test/common/upstream:utility_lib",
"//test/mocks/buffer:buffer_mocks",
"//test/mocks/network:network_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/server:factory_context_mocks",
"//test/mocks/server:instance_mocks",
"//test/mocks/ssl:ssl_mocks",
"//test/mocks/stream_info:stream_info_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
"//test/mocks/upstream:host_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/file/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/upstreams/http/generic/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/upstreams/tcp/generic/v3:pkg_cc_proto",
],
)

envoy_cc_test(
name = "tcp_proxy_cluster_test",
srcs = [
"tcp_proxy_cluster_test.cc",
"tcp_proxy_test_base.h",
],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/event:dispatcher_lib",
Expand All @@ -20,6 +101,7 @@ envoy_cc_test(
"//source/common/network:upstream_server_name_lib",
"//source/common/stats:stats_lib",
"//source/common/tcp_proxy",
"//source/common/upstream:cluster_manager_lib",
"//source/common/upstream:upstream_includes",
"//source/common/upstream:upstream_lib",
"//source/extensions/access_loggers:well_known_names",
Expand All @@ -33,6 +115,7 @@ envoy_cc_test(
"//test/mocks/server:instance_mocks",
"//test/mocks/ssl:ssl_mocks",
"//test/mocks/stream_info:stream_info_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
"//test/mocks/upstream:host_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
Expand All @@ -50,5 +133,6 @@ envoy_cc_test(
"//source/common/tcp_proxy",
"//test/mocks/http:http_mocks",
"//test/mocks/tcp:tcp_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
],
)
Loading