Commit 4928775

pubsub: use one thread to start/stop connections
Previously we submitted a bunch of tasks to the executor, one per connection, which is unnecessary and deadlock-prone.

This does not solve the one-thread deadlock problem, but it changes the symptom. In my testing, Subscriber now successfully pulls one batch of messages before failing with a DEADLINE_EXCEEDED error. This is obviously undesirable, but at least it no longer deadlocks.

This is my best attempt at reconstructing the events:

1. A bunch of tasks get registered to call [GetSubscription](https://github.com/GoogleCloudPlatform/google-cloud-java/blob/9623994f58199c79ee5b9f99ad0ff6d7fb69bd84/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java#L105).
2. At least one of these tasks succeeds and continues to [pull messages](https://github.com/GoogleCloudPlatform/google-cloud-java/blob/9623994f58199c79ee5b9f99ad0ff6d7fb69bd84/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java#L132).
3. Something somewhere registers a very long-running task (~30 sec on my machine) to the pool. A stack trace (obtained with `jstack`) revealed that the pool is busy doing something, but I don't know what it is yet.
4. A message is pulled, and the task of printing it to the screen is registered to the pool. It cannot run yet because the long-running task is using the only thread available.
5. At least one of the tasks that called GetSubscription times out.
6. The long-running task times out or completes.
7. The message is finally printed to the screen.
8. Because a timeout on GetSubscription is not considered retryable, the Subscriber fails. It only fails after 30 sec because the task to fail it is also blocked behind the long-running task.

Updates #1827 and #2041.
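The starvation described above can be reproduced in isolation. The following is a minimal sketch using only `java.util.concurrent`, not the Subscriber code itself: the class and method names (`SingleThreadStarvation`, `innerCompletes`) are hypothetical. An outer task occupies a pool thread and waits on an inner task submitted to the same pool. With a single thread the inner task can never start while the outer one is waiting.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadStarvation {
  /**
   * Submits an outer task that submits an inner task to the same pool and
   * waits for it. Returns true if the inner task finished within the timeout.
   */
  static boolean innerCompletes(int poolSize, long timeoutMillis) throws Exception {
    final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      Future<Boolean> outer =
          pool.submit(
              () -> {
                // The inner task is queued behind (or next to) the outer one.
                Future<?> inner = pool.submit(() -> {});
                try {
                  // The outer task holds its thread while waiting here.
                  inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                  return true;
                } catch (TimeoutException e) {
                  // No free thread picked up the inner task in time.
                  return false;
                }
              });
      return outer.get();
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Calling `innerCompletes(1, 300)` should report that the inner task did not finish, while `innerCompletes(2, 300)` should report that it did. The timeout only exists so the demonstration terminates; without it, the one-thread case would block forever.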
1 parent 8293249 commit 4928775

1 file changed

Lines changed: 14 additions & 37 deletions

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

@@ -387,25 +387,12 @@ private void stopAllPollingConnections() {
 
   private void startConnections(
       List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
-    final CountDownLatch subscribersStarting = new CountDownLatch(numChannels);
-    for (final ApiService subscriber : connections) {
-      executor.submit(
-          new Runnable() {
-            @Override
-            public void run() {
-              subscriber.addListener(connectionsListener, executor);
-              try {
-                subscriber.startAsync().awaitRunning();
-              } finally {
-                subscribersStarting.countDown();
-              }
-            }
-          });
+    for (ApiService subscriber : connections) {
+      subscriber.addListener(connectionsListener, executor);
+      subscriber.startAsync();
     }
-    try {
-      subscribersStarting.await();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+    for (ApiService subscriber : connections) {
+      subscriber.awaitRunning();
     }
   }

@@ -415,26 +402,16 @@ private void stopConnections(List<? extends ApiService> connections) {
       liveConnections = new ArrayList<ApiService>(connections);
       connections.clear();
     }
-    final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size());
-    for (final ApiService subscriberConnection : liveConnections) {
-      executor.submit(
-          new Runnable() {
-            @Override
-            public void run() {
-              try {
-                subscriberConnection.stopAsync().awaitTerminated();
-              } catch (IllegalStateException ignored) {
-                // It is expected for some connections to be already in state failed so stop will
-                // throw this expection.
-              }
-              connectionsStopping.countDown();
-            }
-          });
+    for (ApiService subscriber : liveConnections) {
+      subscriber.stopAsync();
     }
-    try {
-      connectionsStopping.await();
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(e);
+    for (ApiService subscriber : liveConnections) {
+      try {
+        subscriber.awaitTerminated();
+      } catch (IllegalStateException e) {
+        // It is expected for some connections to be already in state failed so stop will
+        // throw this expection.
+      }
     }
   }
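The new code starts every connection first and only then waits on each one, all from the calling thread. Below is a minimal sketch of that two-loop pattern. `FakeService` is a hypothetical stand-in for `ApiService` (it is not the real interface, and its startup is simulated with a sleeping daemon thread), so this only illustrates the control flow, not the library's behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class StartThenAwait {
  /** Hypothetical stand-in for ApiService; startAsync() returns immediately. */
  static final class FakeService {
    private final CountDownLatch running = new CountDownLatch(1);
    private final long startupMillis;

    FakeService(long startupMillis) {
      this.startupMillis = startupMillis;
    }

    void startAsync() {
      // Simulated startup work on a background daemon thread.
      Thread t =
          new Thread(
              () -> {
                try {
                  Thread.sleep(startupMillis);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
                running.countDown();
              });
      t.setDaemon(true);
      t.start();
    }

    void awaitRunning() throws InterruptedException {
      running.await();
    }

    boolean isRunning() {
      return running.getCount() == 0;
    }
  }

  /** Starts all services, then blocks the caller until each one is running. */
  static void startAll(List<FakeService> services) throws InterruptedException {
    for (FakeService s : services) {
      s.startAsync(); // Kick off every startup before waiting on any of them.
    }
    for (FakeService s : services) {
      s.awaitRunning(); // Only the calling thread blocks; no pool thread is held.
    }
  }

  /** Convenience helper for the sketch: builds n services with the same delay. */
  static List<FakeService> makeServices(int n, long startupMillis) {
    List<FakeService> services = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      services.add(new FakeService(startupMillis));
    }
    return services;
  }
}
```

Because all startups are already in flight before the first wait, the startups overlap rather than run one after another, and no executor thread is occupied by a task whose only job is to wait.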
