Skip to content

Commit 5ce3af7

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 22283 b: refs/heads/autosynth-asset c: f5fae7e h: refs/heads/master i: 22281: 5f26d4f 22279: 5a9a515
1 parent c85d4c3 commit 5ce3af7

2 files changed

Lines changed: 16 additions & 9 deletions

File tree

  • branches/autosynth-asset/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ refs/heads/autosynth-vision: 3866c747a5e87b5dfd530d7134907a7ed1fb16de
120120
refs/heads/spanner: b01127f885b4611bf1852abb0ce481eeb7fcc131
121121
refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
122122
refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
123-
refs/heads/autosynth-asset: b24085700f045f878818380e9cbf6d056a710a33
123+
refs/heads/autosynth-asset: f5fae7efce06272dffd4f76b9cfa2ae13dee3c68
124124
refs/heads/autosynth-automl: 2a8b018cf05811fd472e5d1053e67a12ceec0b64
125125
refs/heads/autosynth-bigquerydatatransfer: 564833a85642d4194adc025f021e10e723154246
126126
refs/heads/autosynth-bigquerystorage: a75c34ed7a11741669121be69a7021a00f1133ce

branches/autosynth-asset/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
48+
import java.util.concurrent.Executors;
4849
import java.util.concurrent.ScheduledExecutorService;
4950
import java.util.logging.Level;
5051
import java.util.logging.Logger;
@@ -93,9 +94,6 @@ public class Subscriber extends AbstractApiService {
9394
private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60);
9495
private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
9596

96-
private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
97-
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
98-
9997
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
10098

10199
private final String subscriptionName;
@@ -132,6 +130,8 @@ private Subscriber(Builder builder) {
132130
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
133131
.build());
134132

133+
this.numPullers = builder.parallelPullCount;
134+
135135
executor = builder.executorProvider.getExecutor();
136136
if (builder.executorProvider.shouldAutoClose()) {
137137
closeables.add(
@@ -142,8 +142,16 @@ public void close() throws IOException {
142142
}
143143
});
144144
}
145-
alarmsExecutor = builder.systemExecutorProvider.getExecutor();
146-
if (builder.systemExecutorProvider.shouldAutoClose()) {
145+
146+
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147+
if (systemExecutorProvider == null) {
148+
systemExecutorProvider =
149+
FixedExecutorProvider.create(
150+
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
151+
}
152+
153+
alarmsExecutor = systemExecutorProvider.getExecutor();
154+
if (systemExecutorProvider.shouldAutoClose()) {
147155
closeables.add(
148156
new AutoCloseable() {
149157
@Override
@@ -153,7 +161,6 @@ public void close() throws IOException {
153161
});
154162
}
155163

156-
this.numPullers = builder.parallelPullCount;
157164
TransportChannelProvider channelProvider = builder.channelProvider;
158165
if (channelProvider.acceptsPoolSize()) {
159166
channelProvider = channelProvider.withPoolSize(numPullers);
@@ -162,7 +169,7 @@ public void close() throws IOException {
162169
try {
163170
this.subStubSettings =
164171
SubscriberStubSettings.newBuilder()
165-
.setExecutorProvider(FixedExecutorProvider.create(alarmsExecutor))
172+
.setExecutorProvider(systemExecutorProvider)
166173
.setCredentialsProvider(builder.credentialsProvider)
167174
.setTransportChannelProvider(channelProvider)
168175
.setHeaderProvider(builder.headerProvider)
@@ -404,7 +411,7 @@ public static final class Builder {
404411
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build();
405412

406413
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
407-
ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR);
414+
ExecutorProvider systemExecutorProvider = null;
408415
TransportChannelProvider channelProvider =
409416
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
410417
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)

0 commit comments

Comments
 (0)