Skip to content

Commit 343cc0e

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 27387 b: refs/heads/mrschmidt-collectiongroup c: 67bf8fc h: refs/heads/master i: 27385: 2e6be49 27383: bc61f74
1 parent 953e38f commit 343cc0e

2 files changed

Lines changed: 23 additions & 16 deletions

File tree

  • branches/mrschmidt-collectiongroup/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ refs/heads/autosynth-securitycenter: f2f2feb8f3949c986d37e55cd23cdb0ce774f287
173173
refs/heads/autosynth-talent: 4ca901879f86aab61091cea52e8a9b653639df24
174174
refs/heads/cscc-samples: 620d105e6b574cfeeee04e413a157b7bd34ebc8b
175175
refs/heads/igorbernstein2-patch-1: f62464ee14df1e44a3b173cdc3976563d1b3078b
176-
refs/heads/mrschmidt-collectiongroup: 257aa8ea9d3bc3fcb55440cbdd3f550a3400c2be
176+
refs/heads/mrschmidt-collectiongroup: 67bf8fc05b6cdd789f8c5c029de36501670663b2
177177
refs/heads/release-google-cloud-java-v0.83.0: 4b55ec1b81b3886ede61ae868391a3cdf7eed90e
178178
refs/heads/release-google-cloud-java-v0.83.1-SNAPSHOT: 8d6db7ee534d12b1df38d8cf314871df76f87577
179179
refs/heads/v4support: aa837fd70877f5a0628d8036e88952db035b792c

branches/mrschmidt-collectiongroup/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
9999
private final String subscriptionName;
100100
private final FlowControlSettings flowControlSettings;
101101
private final Duration maxAckExtensionPeriod;
102-
private final ScheduledExecutorService executor;
102+
// The ExecutorProvider used to generate executors for processing messages.
103+
private final ExecutorProvider executorProvider;
104+
// An instantiation of the SystemExecutorProvider used for processing acks
105+
// and other system actions.
103106
@Nullable private final ScheduledExecutorService alarmsExecutor;
104107
private final Distribution ackLatencyDistribution =
105108
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
@@ -132,16 +135,7 @@ private Subscriber(Builder builder) {
132135

133136
this.numPullers = builder.parallelPullCount;
134137

135-
executor = builder.executorProvider.getExecutor();
136-
if (builder.executorProvider.shouldAutoClose()) {
137-
closeables.add(
138-
new AutoCloseable() {
139-
@Override
140-
public void close() throws IOException {
141-
executor.shutdown();
142-
}
143-
});
144-
}
138+
executorProvider = builder.executorProvider;
145139

146140
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147141
if (systemExecutorProvider == null) {
@@ -322,6 +316,17 @@ public void run() {
322316
private void startStreamingConnections() {
323317
synchronized (streamingSubscriberConnections) {
324318
for (int i = 0; i < numPullers; i++) {
319+
final ScheduledExecutorService executor = executorProvider.getExecutor();
320+
if (executorProvider.shouldAutoClose()) {
321+
closeables.add(
322+
new AutoCloseable() {
323+
@Override
324+
public void close() {
325+
executor.shutdown();
326+
}
327+
});
328+
}
329+
325330
streamingSubscriberConnections.add(
326331
new StreamingSubscriberConnection(
327332
subscriptionName,
@@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
364369
private void startConnections(
365370
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
366371
for (ApiService subscriber : connections) {
367-
subscriber.addListener(connectionsListener, executor);
372+
subscriber.addListener(connectionsListener, alarmsExecutor);
368373
subscriber.startAsync();
369374
}
370375
for (ApiService subscriber : connections) {
@@ -398,8 +403,7 @@ public static final class Builder {
398403

399404
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
400405
InstantiatingExecutorProvider.newBuilder()
401-
.setExecutorThreadCount(
402-
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
406+
.setExecutorThreadCount(THREADS_PER_CHANNEL)
403407
.build();
404408

405409
String subscriptionName;
@@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
502506
return this;
503507
}
504508

505-
/** Gives the ability to set a custom executor. */
509+
/**
510+
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
511+
* called {@link Builder#parallelPullCount} times.
512+
*/
506513
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
507514
this.executorProvider = Preconditions.checkNotNull(executorProvider);
508515
return this;

0 commit comments

Comments
 (0)