@@ -117,7 +117,6 @@ public class LocalDistributor extends Distributor implements Closeable {
117117 private final SlotSelector slotSelector ;
118118 private final Secret registrationSecret ;
119119 private final Regularly hostChecker = new Regularly ("distributor host checker" );
120- private final Regularly purgeDeadNodes = new Regularly ("Purge deadNodes" );
121120 private final Map <NodeId , Runnable > allChecks = new HashMap <>();
122121 private final Duration healthcheckInterval ;
123122
@@ -130,16 +129,24 @@ public class LocalDistributor extends Distributor implements Closeable {
130129 r -> {
131130 Thread thread = new Thread (r );
132131 thread .setDaemon (true );
133- thread .setName ("Local Distributor New Session Queue" );
132+ thread .setName ("Local Distributor - New Session Queue" );
134133 return thread ;
135134 });
136135
136+ private final ScheduledExecutorService purgeDeadNodesService =
137+ Executors .newSingleThreadScheduledExecutor (
138+ r -> {
139+ Thread thread = new Thread (r );
140+ thread .setDaemon (true );
141+ thread .setName ("Local Distributor - Purge Dead Nodes" );
142+ return thread ;
143+ });
137144
138145 private final Executor sessionCreatorExecutor = Executors .newFixedThreadPool (
139146 Runtime .getRuntime ().availableProcessors (),
140147 r -> {
141148 Thread thread = new Thread (r );
142- thread .setName ("Local Distributor session creation " );
149+ thread .setName ("Local Distributor - Session Creation " );
143150 thread .setDaemon (true );
144151 return thread ;
145152 }
@@ -187,7 +194,8 @@ public LocalDistributor(
187194 NewSessionRunnable newSessionRunnable = new NewSessionRunnable ();
188195 bus .addListener (NodeDrainComplete .listener (this ::remove ));
189196
190- purgeDeadNodes .submit (model ::purgeDeadNodes , Duration .ofSeconds (30 ), Duration .ofSeconds (30 ));
197+ Runnable purgeDeadNodes = model ::purgeDeadNodes ;
198+ purgeDeadNodesService .scheduleAtFixedRate (purgeDeadNodes , 30 , 30 , TimeUnit .SECONDS );
191199
192200 // if sessionRequestRetryInterval is 0, we will schedule session creation every 10 millis
193201 long period = sessionRequestRetryInterval .isZero () ?
@@ -590,7 +598,7 @@ private boolean reserve(SlotId id) {
590598 @ Override
591599 public void close () {
592600 LOG .info ("Shutting down Distributor executor service" );
593- purgeDeadNodes .shutdown ();
601+ purgeDeadNodesService .shutdown ();
594602 hostChecker .shutdown ();
595603 newSessionService .shutdown ();
596604 }
0 commit comments