Skip to content

Commit a30b55c

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 19425 b: refs/heads/autosynth-containeranalysis c: f5fae7e h: refs/heads/master i: 19423: 0ec3734
1 parent c60e22c commit a30b55c

2 files changed

Lines changed: 16 additions & 9 deletions

File tree

  • branches/autosynth-containeranalysis/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ refs/heads/autosynth-bigquerydatatransfer: 564833a85642d4194adc025f021e10e723154
127127
refs/heads/autosynth-bigquerystorage: a75c34ed7a11741669121be69a7021a00f1133ce
128128
refs/heads/autosynth-bigtable: 45891a7178142c84a16d6a0b792f8ecb950a3159
129129
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
130-
refs/heads/autosynth-containeranalysis: b24085700f045f878818380e9cbf6d056a710a33
130+
refs/heads/autosynth-containeranalysis: f5fae7efce06272dffd4f76b9cfa2ae13dee3c68
131131
refs/heads/autosynth-datastore: 9acd400b484d6691a080c9152a331d88d24fefc1
132132
refs/heads/autosynth-dialogflow: d7477419376eac9b6dcc7dbcede581152527351d
133133
refs/heads/autosynth-errorreporting: 9891e73a56af7c097829ca7a521b0e862ba6af30

branches/autosynth-containeranalysis/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
48+
import java.util.concurrent.Executors;
4849
import java.util.concurrent.ScheduledExecutorService;
4950
import java.util.logging.Level;
5051
import java.util.logging.Logger;
@@ -93,9 +94,6 @@ public class Subscriber extends AbstractApiService {
9394
private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60);
9495
private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
9596

96-
private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
97-
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
98-
9997
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
10098

10199
private final String subscriptionName;
@@ -132,6 +130,8 @@ private Subscriber(Builder builder) {
132130
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
133131
.build());
134132

133+
this.numPullers = builder.parallelPullCount;
134+
135135
executor = builder.executorProvider.getExecutor();
136136
if (builder.executorProvider.shouldAutoClose()) {
137137
closeables.add(
@@ -142,8 +142,16 @@ public void close() throws IOException {
142142
}
143143
});
144144
}
145-
alarmsExecutor = builder.systemExecutorProvider.getExecutor();
146-
if (builder.systemExecutorProvider.shouldAutoClose()) {
145+
146+
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147+
if (systemExecutorProvider == null) {
148+
systemExecutorProvider =
149+
FixedExecutorProvider.create(
150+
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
151+
}
152+
153+
alarmsExecutor = systemExecutorProvider.getExecutor();
154+
if (systemExecutorProvider.shouldAutoClose()) {
147155
closeables.add(
148156
new AutoCloseable() {
149157
@Override
@@ -153,7 +161,6 @@ public void close() throws IOException {
153161
});
154162
}
155163

156-
this.numPullers = builder.parallelPullCount;
157164
TransportChannelProvider channelProvider = builder.channelProvider;
158165
if (channelProvider.acceptsPoolSize()) {
159166
channelProvider = channelProvider.withPoolSize(numPullers);
@@ -162,7 +169,7 @@ public void close() throws IOException {
162169
try {
163170
this.subStubSettings =
164171
SubscriberStubSettings.newBuilder()
165-
.setExecutorProvider(FixedExecutorProvider.create(alarmsExecutor))
172+
.setExecutorProvider(systemExecutorProvider)
166173
.setCredentialsProvider(builder.credentialsProvider)
167174
.setTransportChannelProvider(channelProvider)
168175
.setHeaderProvider(builder.headerProvider)
@@ -404,7 +411,7 @@ public static final class Builder {
404411
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build();
405412

406413
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
407-
ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR);
414+
ExecutorProvider systemExecutorProvider = null;
408415
TransportChannelProvider channelProvider =
409416
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
410417
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)

0 commit comments

Comments
 (0)