@@ -753,7 +753,17 @@ private class NewSessionRunnable implements Runnable {
753753
754754 @ Override
755755 public void run () {
756- boolean loop = true ;
756+ Set <RequestId > inQueue ;
757+ boolean loop ;
758+ if (rejectUnsupportedCaps ) {
759+ inQueue = sessionQueue .getQueueContents ().stream ()
760+ .map (SessionRequestCapability ::getRequestId )
761+ .collect (Collectors .toSet ());
762+ loop = !inQueue .isEmpty ();
763+ } else {
764+ inQueue = null ;
765+ loop = true ;
766+ }
757767 while (loop ) {
758768 // We deliberately run this outside of a lock: if we're unsuccessful
759769 // starting the session, we just put the request back on the queue.
@@ -762,13 +772,11 @@ public void run() {
762772 Map <Capabilities , Long > stereotypes =
763773 getAvailableNodes ().stream ()
764774 .filter (NodeStatus ::hasCapacity )
765- .map (
775+ .flatMap (
766776 node ->
767777 node .getSlots ().stream ()
768- .map (Slot ::getStereotype )
769- .collect (Collectors .toList ()))
770- .flatMap (Collection ::stream )
771- .collect (Collectors .groupingBy (ImmutableCapabilities ::new , Collectors .counting ()));
778+ .map (Slot ::getStereotype ))
779+ .collect (Collectors .groupingBy (ImmutableCapabilities ::copyOf , Collectors .counting ()));
772780
773781 if (!stereotypes .isEmpty ()) {
774782 List <SessionRequest > matchingRequests = sessionQueue .getNextAvailable (stereotypes );
@@ -780,7 +788,9 @@ public void run() {
780788 }
781789 }
782790 if (rejectUnsupportedCaps ) {
783- checkMatchingSlot (sessionQueue .getQueueContents ());
791+ checkMatchingSlot (sessionQueue .getQueueContents ().stream ()
792+ .filter ((src ) -> inQueue .contains (src .getRequestId ()))
793+ .collect (Collectors .toList ()));
784794 }
785795 }
786796
0 commit comments