Skip to content

Commit 89d3cdc

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 23087 b: refs/heads/autosynth-containeranalysis c: 67bf8fc h: refs/heads/master i: 23085: 40edb0c 23083: 0fc5b16 23079: 4ee94c4 23071: 898266f
1 parent 2ee45b8 commit 89d3cdc

2 files changed

Lines changed: 23 additions & 16 deletions

File tree

  • branches/autosynth-containeranalysis/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ refs/heads/autosynth-bigquerydatatransfer: d88aa5aae5fd9d3c6d75bbab1a05162c6d4d9
126126
refs/heads/autosynth-bigquerystorage: d2c53da3b012e38c662e4df0738042435f19365f
127127
refs/heads/autosynth-bigtable: 9e5429f45cf9face9fed585d0233534993e36b58
128128
refs/heads/autosynth-bigtable-admin: 6379a2bc712f2736c83de0e009b4d26da4fa82ca
129-
refs/heads/autosynth-containeranalysis: 257aa8ea9d3bc3fcb55440cbdd3f550a3400c2be
129+
refs/heads/autosynth-containeranalysis: 67bf8fc05b6cdd789f8c5c029de36501670663b2
130130
refs/heads/autosynth-datastore: 9acd400b484d6691a080c9152a331d88d24fefc1
131131
refs/heads/autosynth-dialogflow: 7dbc2c1ea714328ccfa4f33645045f017ff080e7
132132
refs/heads/autosynth-errorreporting: 1101a04e8be074802c35332d5fcf8297f61cae32

branches/autosynth-containeranalysis/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
9999
private final String subscriptionName;
100100
private final FlowControlSettings flowControlSettings;
101101
private final Duration maxAckExtensionPeriod;
102-
private final ScheduledExecutorService executor;
102+
// The ExecutorProvider used to generate executors for processing messages.
103+
private final ExecutorProvider executorProvider;
104+
// An instantiation of the SystemExecutorProvider used for processing acks
105+
// and other system actions.
103106
@Nullable private final ScheduledExecutorService alarmsExecutor;
104107
private final Distribution ackLatencyDistribution =
105108
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
@@ -132,16 +135,7 @@ private Subscriber(Builder builder) {
132135

133136
this.numPullers = builder.parallelPullCount;
134137

135-
executor = builder.executorProvider.getExecutor();
136-
if (builder.executorProvider.shouldAutoClose()) {
137-
closeables.add(
138-
new AutoCloseable() {
139-
@Override
140-
public void close() throws IOException {
141-
executor.shutdown();
142-
}
143-
});
144-
}
138+
executorProvider = builder.executorProvider;
145139

146140
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147141
if (systemExecutorProvider == null) {
@@ -322,6 +316,17 @@ public void run() {
322316
private void startStreamingConnections() {
323317
synchronized (streamingSubscriberConnections) {
324318
for (int i = 0; i < numPullers; i++) {
319+
final ScheduledExecutorService executor = executorProvider.getExecutor();
320+
if (executorProvider.shouldAutoClose()) {
321+
closeables.add(
322+
new AutoCloseable() {
323+
@Override
324+
public void close() {
325+
executor.shutdown();
326+
}
327+
});
328+
}
329+
325330
streamingSubscriberConnections.add(
326331
new StreamingSubscriberConnection(
327332
subscriptionName,
@@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
364369
private void startConnections(
365370
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
366371
for (ApiService subscriber : connections) {
367-
subscriber.addListener(connectionsListener, executor);
372+
subscriber.addListener(connectionsListener, alarmsExecutor);
368373
subscriber.startAsync();
369374
}
370375
for (ApiService subscriber : connections) {
@@ -398,8 +403,7 @@ public static final class Builder {
398403

399404
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
400405
InstantiatingExecutorProvider.newBuilder()
401-
.setExecutorThreadCount(
402-
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
406+
.setExecutorThreadCount(THREADS_PER_CHANNEL)
403407
.build();
404408

405409
String subscriptionName;
@@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
502506
return this;
503507
}
504508

505-
/** Gives the ability to set a custom executor. */
509+
/**
510+
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
511+
* called {@link Builder#parallelPullCount} times.
512+
*/
506513
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
507514
this.executorProvider = Preconditions.checkNotNull(executorProvider);
508515
return this;

0 commit comments

Comments
 (0)