Skip to content

Commit f049ea4

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 25979 b: refs/heads/pubsub-ordering-keys c: f5fae7e h: refs/heads/master i: 25977: d1db052 25975: a2c0d7f
1 parent 66c4dfb commit f049ea4

2 files changed

Lines changed: 16 additions & 9 deletions

File tree

  • branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: b24085700f045f878818380e9cbf6d056a710a33
158+
refs/heads/pubsub-ordering-keys: f5fae7efce06272dffd4f76b9cfa2ae13dee3c68
159159
"refs/heads/update_mvn_badge": ae2d773814db0f71197ccf5a8612ee1d8056f8de
160160
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
161161
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
48+
import java.util.concurrent.Executors;
4849
import java.util.concurrent.ScheduledExecutorService;
4950
import java.util.logging.Level;
5051
import java.util.logging.Logger;
@@ -93,9 +94,6 @@ public class Subscriber extends AbstractApiService {
9394
private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60);
9495
private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
9596

96-
private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
97-
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
98-
9997
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
10098

10199
private final String subscriptionName;
@@ -132,6 +130,8 @@ private Subscriber(Builder builder) {
132130
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
133131
.build());
134132

133+
this.numPullers = builder.parallelPullCount;
134+
135135
executor = builder.executorProvider.getExecutor();
136136
if (builder.executorProvider.shouldAutoClose()) {
137137
closeables.add(
@@ -142,8 +142,16 @@ public void close() throws IOException {
142142
}
143143
});
144144
}
145-
alarmsExecutor = builder.systemExecutorProvider.getExecutor();
146-
if (builder.systemExecutorProvider.shouldAutoClose()) {
145+
146+
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147+
if (systemExecutorProvider == null) {
148+
systemExecutorProvider =
149+
FixedExecutorProvider.create(
150+
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
151+
}
152+
153+
alarmsExecutor = systemExecutorProvider.getExecutor();
154+
if (systemExecutorProvider.shouldAutoClose()) {
147155
closeables.add(
148156
new AutoCloseable() {
149157
@Override
@@ -153,7 +161,6 @@ public void close() throws IOException {
153161
});
154162
}
155163

156-
this.numPullers = builder.parallelPullCount;
157164
TransportChannelProvider channelProvider = builder.channelProvider;
158165
if (channelProvider.acceptsPoolSize()) {
159166
channelProvider = channelProvider.withPoolSize(numPullers);
@@ -162,7 +169,7 @@ public void close() throws IOException {
162169
try {
163170
this.subStubSettings =
164171
SubscriberStubSettings.newBuilder()
165-
.setExecutorProvider(FixedExecutorProvider.create(alarmsExecutor))
172+
.setExecutorProvider(systemExecutorProvider)
166173
.setCredentialsProvider(builder.credentialsProvider)
167174
.setTransportChannelProvider(channelProvider)
168175
.setHeaderProvider(builder.headerProvider)
@@ -404,7 +411,7 @@ public static final class Builder {
404411
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build();
405412

406413
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
407-
ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR);
414+
ExecutorProvider systemExecutorProvider = null;
408415
TransportChannelProvider channelProvider =
409416
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
410417
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)

0 commit comments

Comments
 (0)