Skip to content

Commit ba40cc9

Browse files
diohtuch
authored andcommitted
upstream: rebuild cluster when health check config is changed (#4075)
This PR makes sure changing health check address triggers rebuilding the cluster. Risk Level: medium Testing: unit, manual as depicted by @bplotnick in #4070. Docs Changes: N/A Release Notes: N/A Fixes #4070 Signed-off-by: Dhi Aurrahman <[email protected]>
1 parent 05c0d52 commit ba40cc9

File tree

2 files changed

+176
-2
lines changed

2 files changed

+176
-2
lines changed

source/common/upstream/upstream_impl.cc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -905,9 +905,22 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(
905905
continue;
906906
}
907907

908+
// To match a new host with an existing host means comparing their addresses.
908909
auto existing_host = all_hosts_.find(host->address()->asString());
909-
910-
if (existing_host != all_hosts_.end()) {
910+
const bool existing_host_found = existing_host != all_hosts_.end();
911+
912+
// Check if in-place host update should be skipped, i.e. when the following criteria are met
913+
// (currently there is only one criterion, but we might add more in the future):
914+
// - The cluster health checker is activated and a new host is matched with the existing one,
915+
// but the health check address is different.
916+
const bool skip_inplace_host_update =
917+
health_checker_ != nullptr && existing_host_found &&
918+
*existing_host->second->healthCheckAddress() != *host->healthCheckAddress();
919+
920+
// When there is a match and we decided to do in-place update, we potentially update the host's
921+
// health check flag and metadata. Afterwards, the host is pushed back into the final_hosts,
922+
// i.e. hosts that should be preserved in the current priority.
923+
if (existing_host_found && !skip_inplace_host_update) {
911924
existing_hosts_for_current_priority.emplace(existing_host->first);
912925
// If we find a host matched based on address, we keep it. However we do change weight inline
913926
// so do that here.

test/common/upstream/eds_test.cc

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,89 @@ class EdsTest : public testing::Test {
7575
NiceMock<LocalInfo::MockLocalInfo> local_info_;
7676
};
7777

78+
class EdsWithHealthCheckUpdateTest : public EdsTest {
79+
protected:
80+
EdsWithHealthCheckUpdateTest() {}
81+
82+
// Build the initial cluster with some endpoints.
83+
void initializeCluster(const std::vector<uint32_t> endpoint_ports,
84+
const bool drain_connections_on_host_removal) {
85+
resetCluster(drain_connections_on_host_removal);
86+
87+
auto health_checker = std::make_shared<MockHealthChecker>();
88+
EXPECT_CALL(*health_checker, start());
89+
EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
90+
cluster_->setHealthChecker(health_checker);
91+
92+
cluster_load_assignment_ = resources_.Add();
93+
cluster_load_assignment_->set_cluster_name("fare");
94+
95+
for (const auto& port : endpoint_ports) {
96+
addEndpoint(port);
97+
}
98+
99+
VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));
100+
101+
// Make sure the cluster is rebuilt.
102+
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
103+
{
104+
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
105+
EXPECT_EQ(hosts.size(), 2);
106+
107+
EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
108+
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
109+
110+
// Mark the hosts as healthy
111+
hosts[0]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
112+
hosts[1]->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
113+
}
114+
}
115+
116+
void resetCluster(const bool drain_connections_on_host_removal) {
117+
const std::string config = R"EOF(
118+
name: name
119+
connect_timeout: 0.25s
120+
type: EDS
121+
lb_policy: ROUND_ROBIN
122+
drain_connections_on_host_removal: {}
123+
eds_cluster_config:
124+
service_name: fare
125+
eds_config:
126+
api_config_source:
127+
cluster_names:
128+
- eds
129+
refresh_delay: 1s
130+
)EOF";
131+
EdsTest::resetCluster(fmt::format(config, drain_connections_on_host_removal));
132+
}
133+
134+
void addEndpoint(const uint32_t port) {
135+
auto* endpoints = cluster_load_assignment_->add_endpoints();
136+
auto* socket_address = endpoints->add_lb_endpoints()
137+
->mutable_endpoint()
138+
->mutable_address()
139+
->mutable_socket_address();
140+
socket_address->set_address("1.2.3.4");
141+
socket_address->set_port_value(port);
142+
}
143+
144+
void updateEndpointHealthCheckPortAtIndex(const uint32_t index, const uint32_t port) {
145+
cluster_load_assignment_->mutable_endpoints(index)
146+
->mutable_lb_endpoints(0)
147+
->mutable_endpoint()
148+
->mutable_health_check_config()
149+
->set_port_value(port);
150+
151+
VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));
152+
153+
// Always rebuild if health check config is changed.
154+
EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
155+
}
156+
157+
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources_;
158+
envoy::api::v2::ClusterLoadAssignment* cluster_load_assignment_;
159+
};
160+
78161
// Negative test for protoc-gen-validate constraints.
79162
TEST_F(EdsTest, ValidateFail) {
80163
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
@@ -1152,6 +1235,84 @@ TEST_F(EdsTest, PriorityAndLocalityWeighted) {
11521235
EXPECT_EQ(1UL, stats_.counter("cluster.name.update_no_rebuild").value());
11531236
}
11541237

1238+
TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfig) {
1239+
const std::vector<uint32_t> endpoint_ports = {80, 81};
1240+
const uint32_t new_health_check_port = 8000;
1241+
1242+
// Initialize the cluster with two endpoints without draining connections on host removal.
1243+
initializeCluster(endpoint_ports, false);
1244+
1245+
updateEndpointHealthCheckPortAtIndex(0, new_health_check_port);
1246+
{
1247+
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
1248+
EXPECT_EQ(hosts.size(), 3);
1249+
// Make sure the first endpoint health check port is updated.
1250+
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
1251+
1252+
EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
1253+
EXPECT_NE(new_health_check_port, hosts[2]->healthCheckAddress()->ip()->port());
1254+
EXPECT_EQ(endpoint_ports[1], hosts[1]->healthCheckAddress()->ip()->port());
1255+
EXPECT_EQ(endpoint_ports[0], hosts[2]->healthCheckAddress()->ip()->port());
1256+
1257+
EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
1258+
1259+
// The old hosts are still active. The health checker continues to do health checking to these
1260+
// hosts, until they are removed.
1261+
EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
1262+
EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
1263+
}
1264+
1265+
updateEndpointHealthCheckPortAtIndex(1, new_health_check_port);
1266+
{
1267+
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
1268+
EXPECT_EQ(hosts.size(), 4);
1269+
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
1270+
1271+
// Make sure the second endpoint health check port is updated.
1272+
EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
1273+
1274+
EXPECT_EQ(endpoint_ports[1], hosts[2]->healthCheckAddress()->ip()->port());
1275+
EXPECT_EQ(endpoint_ports[0], hosts[3]->healthCheckAddress()->ip()->port());
1276+
1277+
EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
1278+
EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
1279+
1280+
// The old hosts are still active.
1281+
EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
1282+
EXPECT_FALSE(hosts[3]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
1283+
}
1284+
}
1285+
1286+
TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfigWithDrainConnectionsOnRemoval) {
1287+
const std::vector<uint32_t> endpoint_ports = {80, 81};
1288+
const uint32_t new_health_check_port = 8000;
1289+
1290+
// Initialize the cluster with two endpoints with draining connections on host removal.
1291+
initializeCluster(endpoint_ports, true);
1292+
1293+
updateEndpointHealthCheckPortAtIndex(0, new_health_check_port);
1294+
{
1295+
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
1296+
// Since drain_connections_on_host_removal is set to true, the old hosts are removed
1297+
// immediately.
1298+
EXPECT_EQ(hosts.size(), 2);
1299+
// Make sure the first endpoint health check port is updated.
1300+
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
1301+
1302+
EXPECT_NE(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
1303+
}
1304+
1305+
updateEndpointHealthCheckPortAtIndex(1, new_health_check_port);
1306+
{
1307+
auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
1308+
EXPECT_EQ(hosts.size(), 2);
1309+
EXPECT_EQ(new_health_check_port, hosts[0]->healthCheckAddress()->ip()->port());
1310+
1311+
// Make sure the second endpoint health check port is updated.
1312+
EXPECT_EQ(new_health_check_port, hosts[1]->healthCheckAddress()->ip()->port());
1313+
}
1314+
}
1315+
11551316
// Throw on adding a new resource with an invalid endpoint (since the given address is invalid).
11561317
TEST_F(EdsTest, MalformedIP) {
11571318
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;

0 commit comments

Comments
 (0)