Skip to content

Commit f4fbb06

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 25451 b: refs/heads/autosynth-texttospeech c: 67bf8fc h: refs/heads/master i: 25449: 401fda2 25447: 33075e5
1 parent af5067b commit f4fbb06

2 files changed

Lines changed: 23 additions & 16 deletions

File tree

  • branches/autosynth-texttospeech/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ refs/heads/autosynth-scheduler: 57f9fdb1e7de30c85f4ec7198931a07f50603e55
140140
refs/heads/autosynth-spanner: cbd30ccc550e9d0419ce4e5e6cfef4951ea170b1
141141
refs/heads/autosynth-speech: 64692f6db11364f663921be02c08072b966b6e7b
142142
refs/heads/autosynth-tasks: eb03eeab747e925175890db923945384d89b273a
143-
refs/heads/autosynth-texttospeech: 257aa8ea9d3bc3fcb55440cbdd3f550a3400c2be
143+
refs/heads/autosynth-texttospeech: 67bf8fc05b6cdd789f8c5c029de36501670663b2
144144
refs/heads/autosynth-trace: 8804c46bfe147702ee9c95669f17f42d3790cf23
145145
refs/heads/autosynth-websecurityscanner: a3c778316a0f78f7ad4bac3dc3721da5ca832d3c
146146
refs/heads/bigquerystorage: 06db74d123d7f8a3ef48755c2fcabed09faf8e64

branches/autosynth-texttospeech/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
9999
private final String subscriptionName;
100100
private final FlowControlSettings flowControlSettings;
101101
private final Duration maxAckExtensionPeriod;
102-
private final ScheduledExecutorService executor;
102+
// The ExecutorProvider used to generate executors for processing messages.
103+
private final ExecutorProvider executorProvider;
104+
// An instantiation of the SystemExecutorProvider used for processing acks
105+
// and other system actions.
103106
@Nullable private final ScheduledExecutorService alarmsExecutor;
104107
private final Distribution ackLatencyDistribution =
105108
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
@@ -132,16 +135,7 @@ private Subscriber(Builder builder) {
132135

133136
this.numPullers = builder.parallelPullCount;
134137

135-
executor = builder.executorProvider.getExecutor();
136-
if (builder.executorProvider.shouldAutoClose()) {
137-
closeables.add(
138-
new AutoCloseable() {
139-
@Override
140-
public void close() throws IOException {
141-
executor.shutdown();
142-
}
143-
});
144-
}
138+
executorProvider = builder.executorProvider;
145139

146140
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147141
if (systemExecutorProvider == null) {
@@ -322,6 +316,17 @@ public void run() {
322316
private void startStreamingConnections() {
323317
synchronized (streamingSubscriberConnections) {
324318
for (int i = 0; i < numPullers; i++) {
319+
final ScheduledExecutorService executor = executorProvider.getExecutor();
320+
if (executorProvider.shouldAutoClose()) {
321+
closeables.add(
322+
new AutoCloseable() {
323+
@Override
324+
public void close() {
325+
executor.shutdown();
326+
}
327+
});
328+
}
329+
325330
streamingSubscriberConnections.add(
326331
new StreamingSubscriberConnection(
327332
subscriptionName,
@@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
364369
private void startConnections(
365370
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
366371
for (ApiService subscriber : connections) {
367-
subscriber.addListener(connectionsListener, executor);
372+
subscriber.addListener(connectionsListener, alarmsExecutor);
368373
subscriber.startAsync();
369374
}
370375
for (ApiService subscriber : connections) {
@@ -398,8 +403,7 @@ public static final class Builder {
398403

399404
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
400405
InstantiatingExecutorProvider.newBuilder()
401-
.setExecutorThreadCount(
402-
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
406+
.setExecutorThreadCount(THREADS_PER_CHANNEL)
403407
.build();
404408

405409
String subscriptionName;
@@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
502506
return this;
503507
}
504508

505-
/** Gives the ability to set a custom executor. */
509+
/**
510+
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
511+
* called {@link Builder#parallelPullCount} times.
512+
*/
506513
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
507514
this.executorProvider = Preconditions.checkNotNull(executorProvider);
508515
return this;

0 commit comments

Comments
 (0)