Skip to content

Commit 27dd7fd

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 18787 b: refs/heads/autosynth-pubsub c: 67bf8fc h: refs/heads/master i: 18785: f68eab6 18783: c8d60f3
1 parent 7160a34 commit 27dd7fd

2 files changed

Lines changed: 23 additions & 16 deletions

File tree

  • branches/autosynth-pubsub/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ refs/tags/v0.67.0: 30b56f02092efc6f3c3667650ea8b8825003e0b7
115115
refs/heads/autosynth-compute: afe19e1678334117f3daf13b968b667857f7c8ca
116116
refs/heads/autosynth-container: f6384095f50b31bfc8208542a49181e158d3681c
117117
refs/heads/autosynth-monitoring: 832528c131ac32b115fd7a369701d9e6e4ec62e5
118-
refs/heads/autosynth-pubsub: 257aa8ea9d3bc3fcb55440cbdd3f550a3400c2be
118+
refs/heads/autosynth-pubsub: 67bf8fc05b6cdd789f8c5c029de36501670663b2
119119
refs/heads/autosynth-video-intelligence: e44c9746407fe00fac42e1bb10ac50f493dd37b0
120120
refs/heads/autosynth-vision: b8e47d76578b5f150ef530072ea7e485e2b02ca0
121121
refs/heads/spanner: b01127f885b4611bf1852abb0ce481eeb7fcc131

branches/autosynth-pubsub/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
9999
private final String subscriptionName;
100100
private final FlowControlSettings flowControlSettings;
101101
private final Duration maxAckExtensionPeriod;
102-
private final ScheduledExecutorService executor;
102+
// The ExecutorProvider used to generate executors for processing messages.
103+
private final ExecutorProvider executorProvider;
104+
// An instantiation of the SystemExecutorProvider used for processing acks
105+
// and other system actions.
103106
@Nullable private final ScheduledExecutorService alarmsExecutor;
104107
private final Distribution ackLatencyDistribution =
105108
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
@@ -132,16 +135,7 @@ private Subscriber(Builder builder) {
132135

133136
this.numPullers = builder.parallelPullCount;
134137

135-
executor = builder.executorProvider.getExecutor();
136-
if (builder.executorProvider.shouldAutoClose()) {
137-
closeables.add(
138-
new AutoCloseable() {
139-
@Override
140-
public void close() throws IOException {
141-
executor.shutdown();
142-
}
143-
});
144-
}
138+
executorProvider = builder.executorProvider;
145139

146140
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
147141
if (systemExecutorProvider == null) {
@@ -322,6 +316,17 @@ public void run() {
322316
private void startStreamingConnections() {
323317
synchronized (streamingSubscriberConnections) {
324318
for (int i = 0; i < numPullers; i++) {
319+
final ScheduledExecutorService executor = executorProvider.getExecutor();
320+
if (executorProvider.shouldAutoClose()) {
321+
closeables.add(
322+
new AutoCloseable() {
323+
@Override
324+
public void close() {
325+
executor.shutdown();
326+
}
327+
});
328+
}
329+
325330
streamingSubscriberConnections.add(
326331
new StreamingSubscriberConnection(
327332
subscriptionName,
@@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
364369
private void startConnections(
365370
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
366371
for (ApiService subscriber : connections) {
367-
subscriber.addListener(connectionsListener, executor);
372+
subscriber.addListener(connectionsListener, alarmsExecutor);
368373
subscriber.startAsync();
369374
}
370375
for (ApiService subscriber : connections) {
@@ -398,8 +403,7 @@ public static final class Builder {
398403

399404
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
400405
InstantiatingExecutorProvider.newBuilder()
401-
.setExecutorThreadCount(
402-
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
406+
.setExecutorThreadCount(THREADS_PER_CHANNEL)
403407
.build();
404408

405409
String subscriptionName;
@@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
502506
return this;
503507
}
504508

505-
/** Gives the ability to set a custom executor. */
509+
/**
510+
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
511+
* called {@link Builder#parallelPullCount} times.
512+
*/
506513
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
507514
this.executorProvider = Preconditions.checkNotNull(executorProvider);
508515
return this;

0 commit comments

Comments
 (0)