Skip to content

Commit 340b48c

Browse files
authored
---
yaml --- r: 35689 b: refs/heads/autosynth-iamcredentials c: 28b211f h: refs/heads/master i: 35687: ea0863f
1 parent 229d606 commit 340b48c

2 files changed

Lines changed: 28 additions & 8 deletions

File tree

  • branches/autosynth-iamcredentials/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ refs/heads/pubsub-ordering-keys: 3ea3dc93288c90e1776339a2c059a076e2e2fd1d
159159
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
160160
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
161161
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0
162-
refs/heads/autosynth-iamcredentials: 248ace9e690c7a5e00475a4b895b837fb63beb2f
162+
refs/heads/autosynth-iamcredentials: 28b211fe52ced1e256561eac518047a10676dde0
163163
refs/heads/release-google-cloud-java-v0.78.0: fae5e980779cf0173a152636b278015b9f60ee55
164164
refs/tags/v0.78.0: 62d4bd30605ab3578f9a08d84487fb0b33ac2ff5
165165
refs/tags/v0.79.0: 82287b570708748c411d05c40f3932cff9606feb

branches/autosynth-iamcredentials/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.api.gax.core.CredentialsProvider;
3030
import com.google.api.gax.core.Distribution;
3131
import com.google.api.gax.core.ExecutorProvider;
32-
import com.google.api.gax.core.FixedExecutorProvider;
3332
import com.google.api.gax.core.InstantiatingExecutorProvider;
3433
import com.google.api.gax.rpc.HeaderProvider;
3534
import com.google.api.gax.rpc.NoHeaderProvider;
@@ -41,12 +40,15 @@
4140
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
4241
import com.google.common.base.Optional;
4342
import com.google.common.base.Preconditions;
43+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
4848
import java.util.concurrent.Executors;
4949
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.ThreadFactory;
51+
import java.util.concurrent.atomic.AtomicInteger;
5052
import java.util.logging.Level;
5153
import java.util.logging.Logger;
5254
import javax.annotation.Nullable;
@@ -138,13 +140,8 @@ private Subscriber(Builder builder) {
138140
executorProvider = builder.executorProvider;
139141

140142
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
141-
if (systemExecutorProvider == null) {
142-
systemExecutorProvider =
143-
FixedExecutorProvider.create(
144-
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
145-
}
146-
147143
alarmsExecutor = systemExecutorProvider.getExecutor();
144+
148145
if (systemExecutorProvider.shouldAutoClose()) {
149146
closeables.add(
150147
new AutoCloseable() {
@@ -405,6 +402,7 @@ public static final class Builder {
405402
InstantiatingExecutorProvider.newBuilder()
406403
.setExecutorThreadCount(THREADS_PER_CHANNEL)
407404
.build();
405+
private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger();
408406

409407
private String subscriptionName;
410408
private MessageReceiver receiver;
@@ -543,6 +541,28 @@ Builder setClock(ApiClock clock) {
543541
}
544542

545543
public Subscriber build() {
544+
if (systemExecutorProvider == null) {
545+
ThreadFactory threadFactory =
546+
new ThreadFactoryBuilder()
547+
.setDaemon(true)
548+
.setNameFormat("Subscriber-SE-" + SYSTEM_EXECUTOR_COUNTER.incrementAndGet() + "-%d")
549+
.build();
550+
int threadCount = Math.max(6, 2 * parallelPullCount);
551+
final ScheduledExecutorService executor =
552+
Executors.newScheduledThreadPool(threadCount, threadFactory);
553+
systemExecutorProvider =
554+
new ExecutorProvider() {
555+
@Override
556+
public boolean shouldAutoClose() {
557+
return true;
558+
}
559+
560+
@Override
561+
public ScheduledExecutorService getExecutor() {
562+
return executor;
563+
}
564+
};
565+
}
546566
return new Subscriber(this);
547567
}
548568
}

0 commit comments

Comments
 (0)