Skip to content

Commit 0387022

Browse files
authored
---
yaml --- r: 5697 b: refs/heads/master c: 5af4a46 h: refs/heads/master i: 5695: 67eb9ab
1 parent 0d1ed60 commit 0387022

3 files changed

Lines changed: 34 additions & 49 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: fdea40dce6514ac3dbb9687ed183afbefaf9d4d3
2+
refs/heads/master: 5af4a462bfc42a07e02966d51ddc55e22df94207
33
refs/heads/travis: dae77e558b884bc1b165155482d76c8e40b0fca4
44
refs/heads/gh-pages: f8ea70cdc599a5d39c2df480280877afb3bef9bd
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,6 @@ public PollingSubscriberConnection(
9999
@Override
100100
protected void doStart() {
101101
logger.config("Starting subscriber.");
102-
initialize();
103-
notifyStarted();
104-
}
105-
106-
private void initialize() {
107102
ListenableFuture<Subscription> subscriptionInfo =
108103
stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
109104
.getSubscription(
@@ -116,6 +111,7 @@ private void initialize() {
116111
public void onSuccess(Subscription result) {
117112
messageDispatcher.setMessageDeadlineSeconds(result.getAckDeadlineSeconds());
118113
pullMessages(INITIAL_BACKOFF);
114+
notifyStarted();
119115
}
120116

121117
@Override

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.io.IOException;
4040
import java.util.ArrayList;
4141
import java.util.List;
42-
import java.util.concurrent.CountDownLatch;
4342
import java.util.concurrent.ScheduledExecutorService;
4443
import java.util.concurrent.ScheduledFuture;
4544
import java.util.concurrent.TimeUnit;
@@ -244,10 +243,23 @@ public void close() {
244243
throw new IllegalStateException(e);
245244
}
246245

247-
// Streaming pull is not enabled on the service yet.
248-
// startStreamingConnections();
249-
startPollingConnections();
250-
notifyStarted();
246+
// When started, connections submit tasks to the executor.
247+
// These tasks must finish before the connections can declare themselves running.
248+
// If we have a single-thread executor and call startPollingConnections from the
249+
// same executor, it will deadlock: the thread will be stuck waiting for connections
250+
// to start but cannot start the connections.
251+
// For this reason, we spawn a dedicated thread. Starting subscriber should be rare.
252+
new Thread(new Runnable() {
253+
@Override
254+
public void run() {
255+
try {
256+
startPollingConnections();
257+
notifyStarted();
258+
} catch (Throwable t) {
259+
notifyFailed(t);
260+
}
261+
}
262+
}).start();
251263
}
252264

253265
@Override
@@ -387,25 +399,11 @@ private void stopAllPollingConnections() {
387399

388400
private void startConnections(
389401
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
390-
final CountDownLatch subscribersStarting = new CountDownLatch(numChannels);
391-
for (final ApiService subscriber : connections) {
392-
executor.submit(
393-
new Runnable() {
394-
@Override
395-
public void run() {
396-
subscriber.addListener(connectionsListener, executor);
397-
try {
398-
subscriber.startAsync().awaitRunning();
399-
} finally {
400-
subscribersStarting.countDown();
401-
}
402-
}
403-
});
404-
}
405-
try {
406-
subscribersStarting.await();
407-
} catch (InterruptedException e) {
408-
throw new RuntimeException(e);
402+
for (ApiService subscriber : connections) {
403+
subscriber.addListener(connectionsListener, executor);
404+
// Starting each connection submits a blocking task to the executor.
405+
// We start connections one at a time to avoid swamping executor with blocking tasks.
406+
subscriber.startAsync().awaitRunning();
409407
}
410408
}
411409

@@ -415,26 +413,17 @@ private void stopConnections(List<? extends ApiService> connections) {
415413
liveConnections = new ArrayList<ApiService>(connections);
416414
connections.clear();
417415
}
418-
final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size());
419-
for (final ApiService subscriberConnection : liveConnections) {
420-
executor.submit(
421-
new Runnable() {
422-
@Override
423-
public void run() {
424-
try {
425-
subscriberConnection.stopAsync().awaitTerminated();
426-
} catch (IllegalStateException ignored) {
427-
// It is expected for some connections to be already in state failed so stop will
428-
// throw this expection.
429-
}
430-
connectionsStopping.countDown();
431-
}
432-
});
416+
for (ApiService subscriber : liveConnections) {
417+
subscriber.stopAsync();
433418
}
434-
try {
435-
connectionsStopping.await();
436-
} catch (InterruptedException e) {
437-
throw new IllegalStateException(e);
419+
for (ApiService subscriber : liveConnections) {
420+
try {
421+
subscriber.awaitTerminated();
422+
} catch (IllegalStateException e) {
423+
// If the service fails, awaitTerminated will throw an exception.
424+
// However, we could be stopping services because at least one
425+
// has already failed, so we just ignore this exception.
426+
}
438427
}
439428
}
440429

0 commit comments

Comments
 (0)