Skip to content

Commit 9472df3

Browse files
authored
---
yaml --- r: 30511 b: refs/heads/autosynth-automl c: 28b211f h: refs/heads/master i: 30509: 9ba3674 30507: ed00e03 30503: b4f3ac2 30495: e80cfb7
1 parent 68479a4 commit 9472df3

2 files changed

Lines changed: 28 additions & 8 deletions

File tree

  • branches/autosynth-automl/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ refs/heads/spanner: b01127f885b4611bf1852abb0ce481eeb7fcc131
121121
refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
122122
refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
123123
refs/heads/autosynth-asset: bdb45634a0fe8f7a510692b56b31f5312e25f453
124-
refs/heads/autosynth-automl: 248ace9e690c7a5e00475a4b895b837fb63beb2f
124+
refs/heads/autosynth-automl: 28b211fe52ced1e256561eac518047a10676dde0
125125
refs/heads/autosynth-bigquerydatatransfer: d88aa5aae5fd9d3c6d75bbab1a05162c6d4d948f
126126
refs/heads/autosynth-bigquerystorage: d2c53da3b012e38c662e4df0738042435f19365f
127127
refs/heads/autosynth-bigtable: 9e5429f45cf9face9fed585d0233534993e36b58

branches/autosynth-automl/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.api.gax.core.CredentialsProvider;
3030
import com.google.api.gax.core.Distribution;
3131
import com.google.api.gax.core.ExecutorProvider;
32-
import com.google.api.gax.core.FixedExecutorProvider;
3332
import com.google.api.gax.core.InstantiatingExecutorProvider;
3433
import com.google.api.gax.rpc.HeaderProvider;
3534
import com.google.api.gax.rpc.NoHeaderProvider;
@@ -41,12 +40,15 @@
4140
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
4241
import com.google.common.base.Optional;
4342
import com.google.common.base.Preconditions;
43+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
4848
import java.util.concurrent.Executors;
4949
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.ThreadFactory;
51+
import java.util.concurrent.atomic.AtomicInteger;
5052
import java.util.logging.Level;
5153
import java.util.logging.Logger;
5254
import javax.annotation.Nullable;
@@ -138,13 +140,8 @@ private Subscriber(Builder builder) {
138140
executorProvider = builder.executorProvider;
139141

140142
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
141-
if (systemExecutorProvider == null) {
142-
systemExecutorProvider =
143-
FixedExecutorProvider.create(
144-
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
145-
}
146-
147143
alarmsExecutor = systemExecutorProvider.getExecutor();
144+
148145
if (systemExecutorProvider.shouldAutoClose()) {
149146
closeables.add(
150147
new AutoCloseable() {
@@ -405,6 +402,7 @@ public static final class Builder {
405402
InstantiatingExecutorProvider.newBuilder()
406403
.setExecutorThreadCount(THREADS_PER_CHANNEL)
407404
.build();
405+
private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger();
408406

409407
private String subscriptionName;
410408
private MessageReceiver receiver;
@@ -543,6 +541,28 @@ Builder setClock(ApiClock clock) {
543541
}
544542

545543
public Subscriber build() {
544+
if (systemExecutorProvider == null) {
545+
ThreadFactory threadFactory =
546+
new ThreadFactoryBuilder()
547+
.setDaemon(true)
548+
.setNameFormat("Subscriber-SE-" + SYSTEM_EXECUTOR_COUNTER.incrementAndGet() + "-%d")
549+
.build();
550+
int threadCount = Math.max(6, 2 * parallelPullCount);
551+
final ScheduledExecutorService executor =
552+
Executors.newScheduledThreadPool(threadCount, threadFactory);
553+
systemExecutorProvider =
554+
new ExecutorProvider() {
555+
@Override
556+
public boolean shouldAutoClose() {
557+
return true;
558+
}
559+
560+
@Override
561+
public ScheduledExecutorService getExecutor() {
562+
return executor;
563+
}
564+
};
565+
}
546566
return new Subscriber(this);
547567
}
548568
}

0 commit comments

Comments
 (0)