Skip to content

Commit 07f1c28

Browse files
committed
pubsub: start subscriber connections quickly again
This PR undoes part of #2055. Refreshing credentials blocks thread for a period of time. Previously, each channel has its own credentials, so starting many channels spams executor with many blocking tasks. but the downside is we take a long time to fully start: about 2 minutes on my machine. Now, all connections share the same credentials, so refreshing only happens once. We can start quickly again. Also rename setLeaseAlarmsExecutorProvider, since the executor is now also used for polling.
1 parent 3168ca3 commit 07f1c28

2 files changed

Lines changed: 8 additions & 7 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,10 @@ private void startConnections(
335335
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
336336
for (ApiService subscriber : connections) {
337337
subscriber.addListener(connectionsListener, executor);
338-
// Starting each connection submits a blocking task to the executor.
339-
// We start connections one at a time to avoid swamping executor with blocking tasks.
340-
subscriber.startAsync().awaitRunning();
338+
subscriber.startAsync();
339+
}
340+
for (ApiService subscriber : connections) {
341+
subscriber.awaitRunning();
341342
}
342343
}
343344

@@ -468,10 +469,10 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
468469
}
469470

470471
/**
471-
* Gives the ability to set a custom executor for managing lease extensions. If none is provided
472-
* a shared one will be used by all {@link Subscriber} instances.
472+
* Gives the ability to set a custom executor for polling and managing lease extensions. If none
473+
* is provided a shared one will be used by all {@link Subscriber} instances.
473474
*/
474-
public Builder setLeaseAlarmsExecutorProvider(ExecutorProvider executorProvider) {
475+
public Builder setSystemExecutorProvider(ExecutorProvider executorProvider) {
475476
this.systemExecutorProvider = Preconditions.checkNotNull(executorProvider);
476477
return this;
477478
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ private void sendMessages(Iterable<String> ackIds) throws InterruptedException {
536536
private Builder getTestSubscriberBuilder(MessageReceiver receiver) {
537537
return Subscriber.defaultBuilder(TEST_SUBSCRIPTION, receiver)
538538
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
539-
.setLeaseAlarmsExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
539+
.setSystemExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
540540
.setChannelProvider(FixedChannelProvider.create(testChannel))
541541
.setCredentialsProvider(PublisherImplTest.NO_CREDENTIALS_PROVIDER)
542542
.setClock(fakeExecutor.getClock());

0 commit comments

Comments
 (0)