Skip to content

Commit 127fe0f

Browse files
committed
[grid] Improving Node health check execution
This commit does a couple of things... First, it removes the Regularly used to run all health checks and now uses a ScheduledExecutorService. This was done because this replacement has shown better CPU performance (checked with VisualVM(. Second, it runs the health check when the Node is registered to set the availability to UP. This was already being done by Regularly. But if the check was failing for any reason, no retry was attempted, and the next check execution was when the Regularly schedule interval kicks in. Instead of that, we use now Failsafe to run a retry when the initial execution of the health check fails. This is useful in environments like Kubernetes, since the pod comes up, but it might take a few seconds until the Distributor can reach the IP assigned to the pod. #9847
1 parent f860014 commit 127fe0f

3 files changed

Lines changed: 85 additions & 10 deletions

File tree

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Licensed to the Software Freedom Conservancy (SFC) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SFC licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.openqa.selenium;
19+
20+
/**
21+
* Indicates that a Node health check failed.
22+
*/
23+
public class HealthCheckFailedException extends WebDriverException {
24+
25+
public HealthCheckFailedException(String msg, Throwable cause) {
26+
super(msg, cause);
27+
}
28+
}

java/src/org/openqa/selenium/grid/distributor/local/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@ java_library(
3030
"//java/src/org/openqa/selenium/json",
3131
"//java/src/org/openqa/selenium/remote",
3232
artifact("com.google.guava:guava"),
33+
artifact("net.jodah:failsafe"),
3334
],
3435
)

java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,19 @@
2828
import static org.openqa.selenium.remote.tracing.AttributeKey.SESSION_URI;
2929
import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION;
3030

31+
import com.google.common.collect.ImmutableMap;
3132
import com.google.common.collect.ImmutableSet;
3233

34+
import net.jodah.failsafe.Failsafe;
35+
import net.jodah.failsafe.RetryPolicy;
36+
3337
import org.openqa.selenium.Beta;
3438
import org.openqa.selenium.Capabilities;
39+
import org.openqa.selenium.HealthCheckFailedException;
3540
import org.openqa.selenium.ImmutableCapabilities;
3641
import org.openqa.selenium.RetrySessionRequestException;
3742
import org.openqa.selenium.SessionNotCreatedException;
3843
import org.openqa.selenium.WebDriverException;
39-
import org.openqa.selenium.concurrent.Regularly;
4044
import org.openqa.selenium.events.EventBus;
4145
import org.openqa.selenium.grid.config.Config;
4246
import org.openqa.selenium.grid.data.CreateSessionRequest;
@@ -116,7 +120,6 @@ public class LocalDistributor extends Distributor implements Closeable {
116120
private final SessionMap sessions;
117121
private final SlotSelector slotSelector;
118122
private final Secret registrationSecret;
119-
private final Regularly hostChecker = new Regularly("distributor host checker");
120123
private final Map<NodeId, Runnable> allChecks = new HashMap<>();
121124
private final Duration healthcheckInterval;
122125

@@ -142,6 +145,15 @@ public class LocalDistributor extends Distributor implements Closeable {
142145
return thread;
143146
});
144147

148+
private final ScheduledExecutorService nodeHealthCheckService =
149+
Executors.newSingleThreadScheduledExecutor(
150+
r -> {
151+
Thread thread = new Thread(r);
152+
thread.setDaemon(true);
153+
thread.setName("Local Distributor - Node Health Check");
154+
return thread;
155+
});
156+
145157
private final Executor sessionCreatorExecutor = Executors.newFixedThreadPool(
146158
Runtime.getRuntime().availableProcessors(),
147159
r -> {
@@ -197,6 +209,12 @@ public LocalDistributor(
197209
Runnable purgeDeadNodes = model::purgeDeadNodes;
198210
purgeDeadNodesService.scheduleAtFixedRate(purgeDeadNodes, 30, 30, TimeUnit.SECONDS);
199211

212+
nodeHealthCheckService.scheduleAtFixedRate(
213+
runNodeHealthChecks(),
214+
this.healthcheckInterval.toMillis(),
215+
this.healthcheckInterval.toMillis(),
216+
TimeUnit.MILLISECONDS);
217+
200218
// if sessionRequestRetryInterval is 0, we will schedule session creation every 10 millis
201219
long period = sessionRequestRetryInterval.isZero() ?
202220
10 : sessionRequestRetryInterval.toMillis();
@@ -281,14 +299,26 @@ public LocalDistributor add(Node node) {
281299
try {
282300
model.add(node.getStatus());
283301
nodes.put(node.getId(), node);
284-
} catch (Exception e){
302+
} catch (Exception e) {
285303
return this;
286304
}
287305

288306
// Extract the health check
289307
Runnable runnableHealthCheck = asRunnableHealthCheck(node);
290308
allChecks.put(node.getId(), runnableHealthCheck);
291-
hostChecker.submit(runnableHealthCheck, healthcheckInterval, Duration.ofSeconds(30));
309+
310+
// Running the health check right after the Node registers itself. We retry the
311+
// execution because the Node might on a complex network topology. For example,
312+
// Kubernetes pods with IPs that take a while before they are reachable.
313+
RetryPolicy<Object> initialHealthCheckPolicy = new RetryPolicy<>()
314+
.withMaxAttempts(-1)
315+
.withMaxDuration(Duration.ofSeconds(90))
316+
.withDelay(Duration.ofSeconds(15))
317+
.abortIf(result -> true);
318+
319+
LOG.log(getDebugLogLevel(), "Running initial health check for Node " + node.getId());
320+
Executors.newSingleThreadExecutor().submit(
321+
() -> Failsafe.with(initialHealthCheckPolicy).run(runnableHealthCheck::run));
292322

293323
LOG.info(String.format(
294324
"Added node %s at %s. Health check every %ss",
@@ -301,10 +331,21 @@ public LocalDistributor add(Node node) {
301331
return this;
302332
}
303333

334+
private Runnable runNodeHealthChecks() {
335+
return () -> {
336+
ImmutableMap<NodeId, Runnable> nodeHealthChecks = ImmutableMap.copyOf(allChecks);
337+
for (Runnable nodeHealthCheck : nodeHealthChecks.values()) {
338+
nodeHealthCheck.run();
339+
}
340+
};
341+
}
342+
304343
private Runnable asRunnableHealthCheck(Node node) {
305344
HealthCheck healthCheck = node.getHealthCheck();
306345
NodeId id = node.getId();
307346
return () -> {
347+
boolean checkFailed = false;
348+
Exception failedCheckException = null;
308349
LOG.log(getDebugLogLevel(), "Running health check for " + node.getId());
309350

310351
HealthCheck.Result result;
@@ -313,19 +354,27 @@ private Runnable asRunnableHealthCheck(Node node) {
313354
} catch (Exception e) {
314355
LOG.log(Level.WARNING, "Unable to process node " + id, e);
315356
result = new HealthCheck.Result(DOWN, "Unable to run healthcheck. Assuming down");
357+
checkFailed = true;
358+
failedCheckException = e;
316359
}
317360

318361
Lock writeLock = lock.writeLock();
319362
writeLock.lock();
320363
try {
321364
LOG.log(
322365
getDebugLogLevel(),
323-
String.format("Health check result for %s was %s", node.getId(), result.getAvailability()));
366+
String.format(
367+
"Health check result for %s was %s",
368+
node.getId(),
369+
result.getAvailability()));
324370
model.setAvailability(id, result.getAvailability());
325371
model.updateHealthCheckCount(id, result.getAvailability());
326372
} finally {
327373
writeLock.unlock();
328374
}
375+
if (checkFailed) {
376+
throw new HealthCheckFailedException("Node " + id, failedCheckException);
377+
}
329378
};
330379
}
331380

@@ -355,10 +404,7 @@ public void remove(NodeId nodeId) {
355404
try {
356405
nodes.remove(nodeId);
357406
model.remove(nodeId);
358-
Runnable runnable = allChecks.remove(nodeId);
359-
if (runnable != null) {
360-
hostChecker.remove(runnable);
361-
}
407+
allChecks.remove(nodeId);
362408
} finally {
363409
writeLock.unlock();
364410
}
@@ -599,7 +645,7 @@ private boolean reserve(SlotId id) {
599645
public void close() {
600646
LOG.info("Shutting down Distributor executor service");
601647
purgeDeadNodesService.shutdown();
602-
hostChecker.shutdown();
648+
nodeHealthCheckService.shutdown();
603649
newSessionService.shutdown();
604650
}
605651

0 commit comments

Comments
 (0)