Skip to content

Commit 4c988ec

Browse files
committed
pubsub: start connections slowly
Starting a new connection seems to add a task which blocks a thread in our thread pool. It does not block for long, but Subscriber starts many connections, slamming the pool. If the number of threads the pool is low, the pool is blocked for a long time, causing strange delays and RPC failures. This PR alleviates this problem by starting connections one at a time. - Each connection now make one successful call before declaring itself running. - The Subscriber waits for a connection to be in running state before starting another. Together, these changes limit the number of blocking tasks registered to the pool. Since starting the Subscriber can now take a significant amount of time, and `doStart` should return promptly, a new thread is used to start up the connections.
1 parent 48da998 commit 4c988ec

2 files changed

Lines changed: 25 additions & 48 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,6 @@ public PollingSubscriberConnection(
9999
@Override
100100
protected void doStart() {
101101
logger.config("Starting subscriber.");
102-
initialize();
103-
notifyStarted();
104-
}
105-
106-
private void initialize() {
107102
ListenableFuture<Subscription> subscriptionInfo =
108103
stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
109104
.getSubscription(
@@ -116,6 +111,7 @@ private void initialize() {
116111
public void onSuccess(Subscription result) {
117112
messageDispatcher.setMessageDeadlineSeconds(result.getAckDeadlineSeconds());
118113
pullMessages(INITIAL_BACKOFF);
114+
notifyStarted();
119115
}
120116

121117
@Override

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 24 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.io.IOException;
4040
import java.util.ArrayList;
4141
import java.util.List;
42-
import java.util.concurrent.CountDownLatch;
4342
import java.util.concurrent.ScheduledExecutorService;
4443
import java.util.concurrent.ScheduledFuture;
4544
import java.util.concurrent.TimeUnit;
@@ -244,10 +243,17 @@ public void close() {
244243
throw new IllegalStateException(e);
245244
}
246245

247-
// Streaming pull is not enabled on the service yet.
248-
// startStreamingConnections();
249-
startPollingConnections();
250-
notifyStarted();
246+
new Thread(new Runnable() {
247+
@Override
248+
public void run() {
249+
try {
250+
startPollingConnections();
251+
notifyStarted();
252+
} catch (Throwable t) {
253+
notifyFailed(t);
254+
}
255+
}
256+
}).start();
251257
}
252258

253259
@Override
@@ -387,25 +393,9 @@ private void stopAllPollingConnections() {
387393

388394
private void startConnections(
389395
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
390-
final CountDownLatch subscribersStarting = new CountDownLatch(numChannels);
391-
for (final ApiService subscriber : connections) {
392-
executor.submit(
393-
new Runnable() {
394-
@Override
395-
public void run() {
396-
subscriber.addListener(connectionsListener, executor);
397-
try {
398-
subscriber.startAsync().awaitRunning();
399-
} finally {
400-
subscribersStarting.countDown();
401-
}
402-
}
403-
});
404-
}
405-
try {
406-
subscribersStarting.await();
407-
} catch (InterruptedException e) {
408-
throw new RuntimeException(e);
396+
for (ApiService subscriber : connections) {
397+
subscriber.addListener(connectionsListener, executor);
398+
subscriber.startAsync().awaitRunning();
409399
}
410400
}
411401

@@ -415,26 +405,17 @@ private void stopConnections(List<? extends ApiService> connections) {
415405
liveConnections = new ArrayList<ApiService>(connections);
416406
connections.clear();
417407
}
418-
final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size());
419-
for (final ApiService subscriberConnection : liveConnections) {
420-
executor.submit(
421-
new Runnable() {
422-
@Override
423-
public void run() {
424-
try {
425-
subscriberConnection.stopAsync().awaitTerminated();
426-
} catch (IllegalStateException ignored) {
427-
// It is expected for some connections to be already in state failed so stop will
428-
// throw this expection.
429-
}
430-
connectionsStopping.countDown();
431-
}
432-
});
408+
for (ApiService subscriber : liveConnections) {
409+
subscriber.stopAsync();
433410
}
434-
try {
435-
connectionsStopping.await();
436-
} catch (InterruptedException e) {
437-
throw new IllegalStateException(e);
411+
for (ApiService subscriber : liveConnections) {
412+
try {
413+
subscriber.awaitTerminated();
414+
} catch (IllegalStateException e) {
415+
// If the service fails, awaitTerminated will throw an exception.
416+
// However, we could be stopping services because at least one
417+
// has already failed, so we just ignore this exception.
418+
}
438419
}
439420
}
440421

0 commit comments

Comments
 (0)