2828import static org .openqa .selenium .remote .tracing .AttributeKey .SESSION_URI ;
2929import static org .openqa .selenium .remote .tracing .Tags .EXCEPTION ;
3030
31+ import com .google .common .collect .ImmutableMap ;
3132import com .google .common .collect .ImmutableSet ;
3233
34+ import net .jodah .failsafe .Failsafe ;
35+ import net .jodah .failsafe .RetryPolicy ;
36+
3337import org .openqa .selenium .Beta ;
3438import org .openqa .selenium .Capabilities ;
39+ import org .openqa .selenium .HealthCheckFailedException ;
3540import org .openqa .selenium .ImmutableCapabilities ;
3641import org .openqa .selenium .RetrySessionRequestException ;
3742import org .openqa .selenium .SessionNotCreatedException ;
3843import org .openqa .selenium .WebDriverException ;
39- import org .openqa .selenium .concurrent .Regularly ;
4044import org .openqa .selenium .events .EventBus ;
4145import org .openqa .selenium .grid .config .Config ;
4246import org .openqa .selenium .grid .data .CreateSessionRequest ;
@@ -116,7 +120,6 @@ public class LocalDistributor extends Distributor implements Closeable {
116120 private final SessionMap sessions ;
117121 private final SlotSelector slotSelector ;
118122 private final Secret registrationSecret ;
119- private final Regularly hostChecker = new Regularly ("distributor host checker" );
120123 private final Map <NodeId , Runnable > allChecks = new HashMap <>();
121124 private final Duration healthcheckInterval ;
122125
@@ -142,6 +145,15 @@ public class LocalDistributor extends Distributor implements Closeable {
142145 return thread ;
143146 });
144147
148+ private final ScheduledExecutorService nodeHealthCheckService =
149+ Executors .newSingleThreadScheduledExecutor (
150+ r -> {
151+ Thread thread = new Thread (r );
152+ thread .setDaemon (true );
153+ thread .setName ("Local Distributor - Node Health Check" );
154+ return thread ;
155+ });
156+
145157 private final Executor sessionCreatorExecutor = Executors .newFixedThreadPool (
146158 Runtime .getRuntime ().availableProcessors (),
147159 r -> {
@@ -197,6 +209,12 @@ public LocalDistributor(
197209 Runnable purgeDeadNodes = model ::purgeDeadNodes ;
198210 purgeDeadNodesService .scheduleAtFixedRate (purgeDeadNodes , 30 , 30 , TimeUnit .SECONDS );
199211
212+ nodeHealthCheckService .scheduleAtFixedRate (
213+ runNodeHealthChecks (),
214+ this .healthcheckInterval .toMillis (),
215+ this .healthcheckInterval .toMillis (),
216+ TimeUnit .MILLISECONDS );
217+
200218 // if sessionRequestRetryInterval is 0, we will schedule session creation every 10 millis
201219 long period = sessionRequestRetryInterval .isZero () ?
202220 10 : sessionRequestRetryInterval .toMillis ();
@@ -281,14 +299,26 @@ public LocalDistributor add(Node node) {
281299 try {
282300 model .add (node .getStatus ());
283301 nodes .put (node .getId (), node );
284- } catch (Exception e ){
302+ } catch (Exception e ) {
285303 return this ;
286304 }
287305
288306 // Extract the health check
289307 Runnable runnableHealthCheck = asRunnableHealthCheck (node );
290308 allChecks .put (node .getId (), runnableHealthCheck );
291- hostChecker .submit (runnableHealthCheck , healthcheckInterval , Duration .ofSeconds (30 ));
309+
310+ // Running the health check right after the Node registers itself. We retry the
311+ // execution because the Node might be on a complex network topology. For example,
312+ // Kubernetes pods with IPs that take a while before they are reachable.
313+ RetryPolicy <Object > initialHealthCheckPolicy = new RetryPolicy <>()
314+ .withMaxAttempts (-1 )
315+ .withMaxDuration (Duration .ofSeconds (90 ))
316+ .withDelay (Duration .ofSeconds (15 ))
317+ .abortIf (result -> true );
318+
319+ LOG .log (getDebugLogLevel (), "Running initial health check for Node " + node .getId ());
320+ Executors .newSingleThreadExecutor ().submit (
321+ () -> Failsafe .with (initialHealthCheckPolicy ).run (runnableHealthCheck ::run ));
292322
293323 LOG .info (String .format (
294324 "Added node %s at %s. Health check every %ss" ,
@@ -301,10 +331,21 @@ public LocalDistributor add(Node node) {
301331 return this ;
302332 }
303333
334+ private Runnable runNodeHealthChecks () {
335+ return () -> {
336+ ImmutableMap <NodeId , Runnable > nodeHealthChecks = ImmutableMap .copyOf (allChecks );
337+ for (Runnable nodeHealthCheck : nodeHealthChecks .values ()) {
338+ nodeHealthCheck .run ();
339+ }
340+ };
341+ }
342+
304343 private Runnable asRunnableHealthCheck (Node node ) {
305344 HealthCheck healthCheck = node .getHealthCheck ();
306345 NodeId id = node .getId ();
307346 return () -> {
347+ boolean checkFailed = false ;
348+ Exception failedCheckException = null ;
308349 LOG .log (getDebugLogLevel (), "Running health check for " + node .getId ());
309350
310351 HealthCheck .Result result ;
@@ -313,19 +354,27 @@ private Runnable asRunnableHealthCheck(Node node) {
313354 } catch (Exception e ) {
314355 LOG .log (Level .WARNING , "Unable to process node " + id , e );
315356 result = new HealthCheck .Result (DOWN , "Unable to run healthcheck. Assuming down" );
357+ checkFailed = true ;
358+ failedCheckException = e ;
316359 }
317360
318361 Lock writeLock = lock .writeLock ();
319362 writeLock .lock ();
320363 try {
321364 LOG .log (
322365 getDebugLogLevel (),
323- String .format ("Health check result for %s was %s" , node .getId (), result .getAvailability ()));
366+ String .format (
367+ "Health check result for %s was %s" ,
368+ node .getId (),
369+ result .getAvailability ()));
324370 model .setAvailability (id , result .getAvailability ());
325371 model .updateHealthCheckCount (id , result .getAvailability ());
326372 } finally {
327373 writeLock .unlock ();
328374 }
375+ if (checkFailed ) {
376+ throw new HealthCheckFailedException ("Node " + id , failedCheckException );
377+ }
329378 };
330379 }
331380
@@ -355,10 +404,7 @@ public void remove(NodeId nodeId) {
355404 try {
356405 nodes .remove (nodeId );
357406 model .remove (nodeId );
358- Runnable runnable = allChecks .remove (nodeId );
359- if (runnable != null ) {
360- hostChecker .remove (runnable );
361- }
407+ allChecks .remove (nodeId );
362408 } finally {
363409 writeLock .unlock ();
364410 }
@@ -599,7 +645,7 @@ private boolean reserve(SlotId id) {
// Logs the shutdown and then calls shutdown() on the distributor's background services:
// the dead-node purge service, the node health check service and the new-session service.
// NOTE(review): sessionCreatorExecutor and the executors created in add() for the initial
// health check are not shut down here - confirm whether that is intentional.
600646 public void close () {
601647 LOG .info ("Shutting down Distributor executor service" );
602648 purgeDeadNodesService .shutdown ();
602- hostChecker .shutdown ();
648+ nodeHealthCheckService .shutdown ();
603649 newSessionService .shutdown ();
604650 }
605651
0 commit comments