Skip to content

Commit 21aefc0

Browse files
authored
---
yaml --- r: 33283 b: refs/heads/autosynth-language c: 28b211f h: refs/heads/master i: 33281: f895746 33279: 03db471
1 parent 23c81fa commit 21aefc0

2 files changed

Lines changed: 28 additions & 8 deletions

File tree

  • branches/autosynth-language/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ refs/heads/autosynth-errorreporting: effe8001d110ad584187b30aafc473db0dd4a15f
133133
refs/heads/autosynth-firestore: e79eeb26930dfae4439424ad2fda5874eeca54c8
134134
refs/heads/autosynth-iot: 044be280805a59e06d09658688c9ee474a9815ad
135135
refs/heads/autosynth-kms: d31449d6621a50fb16a4bef4f30f0f3051d27d7c
136-
refs/heads/autosynth-language: 248ace9e690c7a5e00475a4b895b837fb63beb2f
136+
refs/heads/autosynth-language: 28b211fe52ced1e256561eac518047a10676dde0
137137
refs/heads/autosynth-os-login: 123ba209c5769d0ee067e0ce5848bec13b42a4f4
138138
refs/heads/autosynth-redis: 6bedce4d7c7c6ca6a22e83ad1780e08fdc565a9e
139139
refs/heads/autosynth-scheduler: 57f9fdb1e7de30c85f4ec7198931a07f50603e55

branches/autosynth-language/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.api.gax.core.CredentialsProvider;
3030
import com.google.api.gax.core.Distribution;
3131
import com.google.api.gax.core.ExecutorProvider;
32-
import com.google.api.gax.core.FixedExecutorProvider;
3332
import com.google.api.gax.core.InstantiatingExecutorProvider;
3433
import com.google.api.gax.rpc.HeaderProvider;
3534
import com.google.api.gax.rpc.NoHeaderProvider;
@@ -41,12 +40,15 @@
4140
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
4241
import com.google.common.base.Optional;
4342
import com.google.common.base.Preconditions;
43+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
4848
import java.util.concurrent.Executors;
4949
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.ThreadFactory;
51+
import java.util.concurrent.atomic.AtomicInteger;
5052
import java.util.logging.Level;
5153
import java.util.logging.Logger;
5254
import javax.annotation.Nullable;
@@ -138,13 +140,8 @@ private Subscriber(Builder builder) {
138140
executorProvider = builder.executorProvider;
139141

140142
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
141-
if (systemExecutorProvider == null) {
142-
systemExecutorProvider =
143-
FixedExecutorProvider.create(
144-
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
145-
}
146-
147143
alarmsExecutor = systemExecutorProvider.getExecutor();
144+
148145
if (systemExecutorProvider.shouldAutoClose()) {
149146
closeables.add(
150147
new AutoCloseable() {
@@ -405,6 +402,7 @@ public static final class Builder {
405402
InstantiatingExecutorProvider.newBuilder()
406403
.setExecutorThreadCount(THREADS_PER_CHANNEL)
407404
.build();
405+
private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger();
408406

409407
private String subscriptionName;
410408
private MessageReceiver receiver;
@@ -543,6 +541,28 @@ Builder setClock(ApiClock clock) {
543541
}
544542

545543
public Subscriber build() {
544+
if (systemExecutorProvider == null) {
545+
ThreadFactory threadFactory =
546+
new ThreadFactoryBuilder()
547+
.setDaemon(true)
548+
.setNameFormat("Subscriber-SE-" + SYSTEM_EXECUTOR_COUNTER.incrementAndGet() + "-%d")
549+
.build();
550+
int threadCount = Math.max(6, 2 * parallelPullCount);
551+
final ScheduledExecutorService executor =
552+
Executors.newScheduledThreadPool(threadCount, threadFactory);
553+
systemExecutorProvider =
554+
new ExecutorProvider() {
555+
@Override
556+
public boolean shouldAutoClose() {
557+
return true;
558+
}
559+
560+
@Override
561+
public ScheduledExecutorService getExecutor() {
562+
return executor;
563+
}
564+
};
565+
}
546566
return new Subscriber(this);
547567
}
548568
}

0 commit comments

Comments
 (0)