Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,9 +905,22 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(
continue;
}

// To match a new host with an existing host means comparing their addresses.
auto existing_host = all_hosts_.find(host->address()->asString());

if (existing_host != all_hosts_.end()) {
const bool existing_host_found = existing_host != all_hosts_.end();

// Check if in-place host update should be skipped, i.e. when the following criteria are met
// (currently there is only one criterion, but we might add more in the future):
// - The cluster health checker is activated and a new host is matched with the existing one,
// but the health check address is different.
const bool skip_inplace_host_update =
health_checker_ != nullptr && existing_host_found &&
*existing_host->second->healthCheckAddress() != *host->healthCheckAddress();

// When there is a match and we decided to do in-place update, we potentially update the host's
// health check flag and metadata. Afterwards, the host is pushed back into the final_hosts,
// i.e. hosts that should be preserved in the current priority.
if (existing_host_found && !skip_inplace_host_update) {
existing_hosts_for_current_priority.emplace(existing_host->first);
// If we find a host matched based on address, we keep it. However we do change weight inline
// so do that here.
Expand Down
161 changes: 161 additions & 0 deletions test/common/upstream/eds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,89 @@ class EdsTest : public testing::Test {
NiceMock<LocalInfo::MockLocalInfo> local_info_;
};

class EdsWithHealthCheckUpdateTest : public EdsTest {
protected:
EdsWithHealthCheckUpdateTest() {}

// Build the initial cluster with some endpoints.
void initializeCluster(const std::vector<uint32_t> endpoint_ports,
const bool drain_connections_on_host_removal) {
resetCluster(drain_connections_on_host_removal);

auto health_checker = std::make_shared<MockHealthChecker>();
EXPECT_CALL(*health_checker, start());
EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
cluster_->setHealthChecker(health_checker);

cluster_load_assignment_ = resources_.Add();
cluster_load_assignment_->set_cluster_name("fare");

for (const auto& port : endpoint_ports) {
addEndpoint(port);
}

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));

// Make sure the cluster is rebuilt.
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
{
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
EXPECT_EQ(hosts.size(), 2);

EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

// Mark the hosts as healthy
hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
}
}

void resetCluster(const bool drain_connections_on_host_removal) {
const std::string config = R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
lb_policy: ROUND_ROBIN
drain_connections_on_host_removal: {}
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF";
EdsTest::resetCluster(fmt::format(config, drain_connections_on_host_removal));
}

void addEndpoint(const uint32_t port) {
auto* endpoints = cluster_load_assignment_->add_endpoints();
auto* socket_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
socket_address->set_address("1.2.3.4");
socket_address->set_port_value(port);
}

void updateEndpointHealthCheckPortAtIndex(const uint32_t index, const uint32_t port) {
cluster_load_assignment_->mutable_endpoints(index)
->mutable_lb_endpoints(0)
->mutable_endpoint()
->mutable_health_check_config()
->set_port_value(port);

VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));

// Always rebuild if health check config is changed.
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
}

Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources_;
envoy::api::v2::ClusterLoadAssignment* cluster_load_assignment_;
};

// Negative test for protoc-gen-validate constraints.
TEST_F(EdsTest, ValidateFail) {
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
Expand Down Expand Up @@ -1152,6 +1235,84 @@ TEST_F(EdsTest, PriorityAndLocalityWeighted) {
EXPECT_EQ(1UL, stats_.counter("cluster.name.update_no_rebuild").value());
}

// Changing the health check port of an endpoint must not update the existing host in place: a new
// host with the new health check address shows up, while (without draining connections on host
// removal) the previous host is still present alongside it.
TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfig) {
  const std::vector<uint32_t> endpoint_ports = {80, 81};
  const uint32_t new_health_check_port = 8000;

  // Initialize the cluster with two endpoints without draining connections on host removal.
  initializeCluster(endpoint_ports, false);

  updateEndpointHealthCheckPortAtIndex(0, new_health_check_port);
  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    // Fatal assertion: the checks below index into hosts, so stop here if the size is wrong.
    ASSERT_EQ(3UL, hosts.size());
    // Make sure the first endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
    EXPECT_NE(new_health_check_port, hosts[2]->healthCheckAddress()->ip()->port());
    EXPECT_EQ(endpoint_ports[1], hosts[1]->healthCheckAddress()->ip()->port());
    EXPECT_EQ(endpoint_ports[0], hosts[2]->healthCheckAddress()->ip()->port());

    // The host created for the new health check address has not passed a health check yet.
    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

    // The old hosts are still active. The health checker continues to do health checking to these
    // hosts, until they are removed.
    EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
  }

  updateEndpointHealthCheckPortAtIndex(1, new_health_check_port);
  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    // Fatal assertion: the checks below index into hosts, so stop here if the size is wrong.
    ASSERT_EQ(4UL, hosts.size());
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    // Make sure the second endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());

    EXPECT_EQ(endpoint_ports[1], hosts[2]->healthCheckAddress()->ip()->port());
    EXPECT_EQ(endpoint_ports[0], hosts[3]->healthCheckAddress()->ip()->port());

    // Both hosts carrying the new health check port still await their first health check.
    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

    // The old hosts are still active.
    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_FALSE(hosts[3]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
  }
}

// Same scenario as EndpointUpdateHealthCheckConfig, but with drain_connections_on_host_removal
// enabled: the host count is expected to stay at two after each health check port change.
TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfigWithDrainConnectionsOnRemoval) {
  const std::vector<uint32_t> endpoint_ports = {80, 81};
  const uint32_t new_health_check_port = 8000;

  // Initialize the cluster with two endpoints with draining connections on host removal.
  initializeCluster(endpoint_ports, true);

  updateEndpointHealthCheckPortAtIndex(0, new_health_check_port);
  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    // Since drain_connections_on_host_removal is set to true, the old hosts are removed
    // immediately. Fatal assertion: the checks below index into hosts.
    ASSERT_EQ(2UL, hosts.size());
    // Make sure the first endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
  }

  updateEndpointHealthCheckPortAtIndex(1, new_health_check_port);
  {
    auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    // Fatal assertion: the checks below index into hosts, so stop here if the size is wrong.
    ASSERT_EQ(2UL, hosts.size());
    EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());

    // Make sure the second endpoint health check port is updated.
    EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
  }
}

// Throw on adding a new resource with an invalid endpoint (since the given address is invalid).
TEST_F(EdsTest, MalformedIP) {
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
Expand Down