Skip to content

Commit c5e604e

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 21103 b: refs/heads/autosynth-dlp c: 67bf8fc h: refs/heads/master i: 21101: ea9281a 21099: 222337b 21095: 49d315f 21087: e0d6424
1 parent 54d1e8f commit c5e604e

2 files changed

Lines changed: 23 additions & 16 deletions

File tree

  • branches/autosynth-dlp/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ refs/tags/v0.60.0: 4cd518d0612329f8a8e53484eef4cd1651e32855
103103
refs/tags/v0.61.0: e4b526656bb1bf5eefd0ee578b7405147821225e
104104
refs/tags/v0.62.0: bbede7385d48ba08f487bdd29ec10668ace96396
105105
refs/heads/0.60.0-alpha: 10939381ffe0b8da32db4fe3087c86e3aa7f3e55
106-
refs/heads/autosynth-dlp: 257aa8ea9d3bc3fcb55440cbdd3f550a3400c2be
106+
refs/heads/autosynth-dlp: 67bf8fc05b6cdd789f8c5c029de36501670663b2
107107
refs/heads/autosynth-logging: ffd2c58e5ab2ff0eb1410bbab4b81682a85a84d2
108108
refs/heads/dupes: 3478c5d81fd242d0e985656645a679420a2060c2
109109
refs/tags/v0.63.0: 94f19b71d40f46b36120e7b9d78a1a3d41bfcbd6

branches/autosynth-dlp/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
9999
private final String subscriptionName;
100100
private final FlowControlSettings flowControlSettings;
101101
private final Duration maxAckExtensionPeriod;
102-
private final ScheduledExecutorService executor;
102+
// The ExecutorProvider used to generate executors for processing messages.
103+
private final ExecutorProvider executorProvider;
104+
// An instantiation of the SystemExecutorProvider used for processing acks
105+
// and other system actions.
103106
@Nullable private final ScheduledExecutorService alarmsExecutor;
104107
private final Distribution ackLatencyDistribution =
105108
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
@@ -132,16 +135,7 @@ private Subscriber(Builder builder) {
132135

133136
this.numPullers = builder.parallelPullCount;
134137

135-
executor = builder.executorProvider.getExecutor();
136-
if (builder.executorProvider.shouldAutoClose()) {
137-
closeables.add(
138-
new AutoCloseable() {
139-
@Override
140-
public void close() throws IOException {
141-
executor.shutdown();
142-
}
143-
});
144-
}
138+
executorProvider = builder.executorProvider;
145139

146140
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147141
if (systemExecutorProvider == null) {
@@ -322,6 +316,17 @@ public void run() {
322316
private void startStreamingConnections() {
323317
synchronized (streamingSubscriberConnections) {
324318
for (int i = 0; i < numPullers; i++) {
319+
final ScheduledExecutorService executor = executorProvider.getExecutor();
320+
if (executorProvider.shouldAutoClose()) {
321+
closeables.add(
322+
new AutoCloseable() {
323+
@Override
324+
public void close() {
325+
executor.shutdown();
326+
}
327+
});
328+
}
329+
325330
streamingSubscriberConnections.add(
326331
new StreamingSubscriberConnection(
327332
subscriptionName,
@@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
364369
private void startConnections(
365370
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
366371
for (ApiService subscriber : connections) {
367-
subscriber.addListener(connectionsListener, executor);
372+
subscriber.addListener(connectionsListener, alarmsExecutor);
368373
subscriber.startAsync();
369374
}
370375
for (ApiService subscriber : connections) {
@@ -398,8 +403,7 @@ public static final class Builder {
398403

399404
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
400405
InstantiatingExecutorProvider.newBuilder()
401-
.setExecutorThreadCount(
402-
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
406+
.setExecutorThreadCount(THREADS_PER_CHANNEL)
403407
.build();
404408

405409
String subscriptionName;
@@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
502506
return this;
503507
}
504508

505-
/** Gives the ability to set a custom executor. */
509+
/**
510+
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
511+
* called {@link Builder#parallelPullCount} times.
512+
*/
506513
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
507514
this.executorProvider = Preconditions.checkNotNull(executorProvider);
508515
return this;

0 commit comments

Comments
 (0)