3939import java .io .IOException ;
4040import java .util .ArrayList ;
4141import java .util .List ;
42- import java .util .concurrent .CountDownLatch ;
4342import java .util .concurrent .ScheduledExecutorService ;
4443import java .util .concurrent .ScheduledFuture ;
4544import java .util .concurrent .TimeUnit ;
@@ -244,10 +243,23 @@ public void close() {
244243 throw new IllegalStateException (e );
245244 }
246245
247- // Streaming pull is not enabled on the service yet.
248- // startStreamingConnections();
249- startPollingConnections ();
250- notifyStarted ();
246+ // When started, connections submit tasks to the executor.
247+ // These tasks must finish before the connections can declare themselves running.
248+ // If we have a single-thread executor and call startPollingConnections from the
249+ // same executor, it will deadlock: the thread will be stuck waiting for connections
250+ // to start but cannot start the connections.
251+ // For this reason, we spawn a dedicated thread. Starting subscriber should be rare.
252+ new Thread (new Runnable () {
253+ @ Override
254+ public void run () {
255+ try {
256+ startPollingConnections ();
257+ notifyStarted ();
258+ } catch (Throwable t ) {
259+ notifyFailed (t );
260+ }
261+ }
262+ }).start ();
251263 }
252264
253265 @ Override
@@ -387,25 +399,11 @@ private void stopAllPollingConnections() {
387399
388400 private void startConnections (
389401 List <? extends ApiService > connections , final ApiService .Listener connectionsListener ) {
390- final CountDownLatch subscribersStarting = new CountDownLatch (numChannels );
391- for (final ApiService subscriber : connections ) {
392- executor .submit (
393- new Runnable () {
394- @ Override
395- public void run () {
396- subscriber .addListener (connectionsListener , executor );
397- try {
398- subscriber .startAsync ().awaitRunning ();
399- } finally {
400- subscribersStarting .countDown ();
401- }
402- }
403- });
404- }
405- try {
406- subscribersStarting .await ();
407- } catch (InterruptedException e ) {
408- throw new RuntimeException (e );
402+ for (ApiService subscriber : connections ) {
403+ subscriber .addListener (connectionsListener , executor );
404+ // Starting each connection submits a blocking task to the executor.
405+ // We start connections one at a time to avoid swamping executor with blocking tasks.
406+ subscriber .startAsync ().awaitRunning ();
409407 }
410408 }
411409
@@ -415,26 +413,17 @@ private void stopConnections(List<? extends ApiService> connections) {
415413 liveConnections = new ArrayList <ApiService >(connections );
416414 connections .clear ();
417415 }
418- final CountDownLatch connectionsStopping = new CountDownLatch (liveConnections .size ());
419- for (final ApiService subscriberConnection : liveConnections ) {
420- executor .submit (
421- new Runnable () {
422- @ Override
423- public void run () {
424- try {
425- subscriberConnection .stopAsync ().awaitTerminated ();
426- } catch (IllegalStateException ignored ) {
427- // It is expected for some connections to be already in state failed so stop will
428- // throw this expection.
429- }
430- connectionsStopping .countDown ();
431- }
432- });
416+ for (ApiService subscriber : liveConnections ) {
417+ subscriber .stopAsync ();
433418 }
434- try {
435- connectionsStopping .await ();
436- } catch (InterruptedException e ) {
437- throw new IllegalStateException (e );
419+ for (ApiService subscriber : liveConnections ) {
420+ try {
421+ subscriber .awaitTerminated ();
422+ } catch (IllegalStateException e ) {
423+ // If the service fails, awaitTerminated will throw an exception.
424+ // However, we could be stopping services because at least one
425+ // has already failed, so we just ignore this exception.
426+ }
438427 }
439428 }
440429
0 commit comments