@@ -161,32 +161,41 @@ private LocalNode(
161161 String .format ("%s is %s" , uri , status .getAvailability ()));
162162 } : healthCheck ;
163163
164+ this .tempFileSystems = CacheBuilder .newBuilder ()
165+ .expireAfterAccess (sessionTimeout )
166+ .ticker (ticker )
167+ .removalListener ((RemovalListener <SessionId , TemporaryFilesystem >) notification -> {
168+ TemporaryFilesystem tempFS = notification .getValue ();
169+ tempFS .deleteTemporaryFiles ();
170+ tempFS .deleteBaseDir ();
171+ })
172+ .build ();
173+
164174 this .currentSessions = CacheBuilder .newBuilder ()
165175 .expireAfterAccess (sessionTimeout )
166176 .ticker (ticker )
167177 .removalListener ((RemovalListener <SessionId , SessionSlot >) notification -> {
168178 if (notification .getKey () != null && notification .getValue () != null ) {
169179 // Attempt to stop the session
170180 SessionSlot slot = notification .getValue ();
171- if (!slot .isAvailable ()) {
172- slot .stop ();
181+ SessionId sessionId = notification .getKey ();
182+ slot .stop ();
183+ // Invalidate temp file system
184+ this .tempFileSystems .invalidate (sessionId );
185+ // Decrement pending sessions if Node is draining
186+ if (this .isDraining ()) {
187+ int done = pendingSessions .decrementAndGet ();
188+ if (done <= 0 ) {
189+ LOG .info ("Node draining complete!" );
190+ bus .fire (new NodeDrainComplete (this .getId ()));
191+ }
173192 }
174193 } else {
175194 LOG .log (Debug .getDebugLogLevel (), "Received stop session notification with null values" );
176195 }
177196 })
178197 .build ();
179198
180- this .tempFileSystems = CacheBuilder .newBuilder ()
181- .expireAfterAccess (sessionTimeout )
182- .ticker (ticker )
183- .removalListener ((RemovalListener <SessionId , TemporaryFilesystem >) notification -> {
184- TemporaryFilesystem tempFS = notification .getValue ();
185- tempFS .deleteTemporaryFiles ();
186- tempFS .deleteBaseDir ();
187- })
188- .build ();
189-
190199 ScheduledExecutorService sessionCleanupNodeService =
191200 Executors .newSingleThreadScheduledExecutor (
192201 r -> {
@@ -486,16 +495,6 @@ public void stop(SessionId id) throws NoSuchSessionException {
486495 }
487496
488497 currentSessions .invalidate (id );
489- tempFileSystems .invalidate (id );
490-
491- // Decrement pending sessions if Node is draining
492- if (this .isDraining ()) {
493- int done = pendingSessions .decrementAndGet ();
494- if (done <= 0 ) {
495- LOG .info ("Node draining complete!" );
496- bus .fire (new NodeDrainComplete (this .getId ()));
497- }
498- }
499498 }
500499
501500 private void stopAllSessions () {
0 commit comments