Skip to content

Commit 268e126

Browse files
authored
---
yaml --- r: 29629 b: refs/heads/autosynth-monitoring c: 28b211f h: refs/heads/master i: 29627: 25fff8b
1 parent 3f5547c commit 268e126

2 files changed

Lines changed: 28 additions & 8 deletions

File tree

  • branches/autosynth-monitoring/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ refs/tags/v0.66.0: ed6a3f57cbdaa20339a1995f7d7d53b172a5b8ef
113113
refs/tags/v0.67.0: 30b56f02092efc6f3c3667650ea8b8825003e0b7
114114
refs/heads/autosynth-compute: c0ad4e5b54b2b167d47e508ca215c039404837d3
115115
refs/heads/autosynth-container: e9a9a8564756e20770320ed54eafa867040a7690
116-
refs/heads/autosynth-monitoring: 248ace9e690c7a5e00475a4b895b837fb63beb2f
116+
refs/heads/autosynth-monitoring: 28b211fe52ced1e256561eac518047a10676dde0
117117
refs/heads/autosynth-pubsub: 8aac5a6202d83e8d67f5cb1d58f0adf9f2622aeb
118118
refs/heads/autosynth-video-intelligence: 82b145665822830c46d47eb8925edb2d842d6815
119119
refs/heads/autosynth-vision: 01ef61bcef344ceee0198f56f18bc1db20201261

branches/autosynth-monitoring/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.api.gax.core.CredentialsProvider;
3030
import com.google.api.gax.core.Distribution;
3131
import com.google.api.gax.core.ExecutorProvider;
32-
import com.google.api.gax.core.FixedExecutorProvider;
3332
import com.google.api.gax.core.InstantiatingExecutorProvider;
3433
import com.google.api.gax.rpc.HeaderProvider;
3534
import com.google.api.gax.rpc.NoHeaderProvider;
@@ -41,12 +40,15 @@
4140
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
4241
import com.google.common.base.Optional;
4342
import com.google.common.base.Preconditions;
43+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
4848
import java.util.concurrent.Executors;
4949
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.ThreadFactory;
51+
import java.util.concurrent.atomic.AtomicInteger;
5052
import java.util.logging.Level;
5153
import java.util.logging.Logger;
5254
import javax.annotation.Nullable;
@@ -138,13 +140,8 @@ private Subscriber(Builder builder) {
138140
executorProvider = builder.executorProvider;
139141

140142
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
141-
if (systemExecutorProvider == null) {
142-
systemExecutorProvider =
143-
FixedExecutorProvider.create(
144-
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
145-
}
146-
147143
alarmsExecutor = systemExecutorProvider.getExecutor();
144+
148145
if (systemExecutorProvider.shouldAutoClose()) {
149146
closeables.add(
150147
new AutoCloseable() {
@@ -405,6 +402,7 @@ public static final class Builder {
405402
InstantiatingExecutorProvider.newBuilder()
406403
.setExecutorThreadCount(THREADS_PER_CHANNEL)
407404
.build();
405+
private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger();
408406

409407
private String subscriptionName;
410408
private MessageReceiver receiver;
@@ -543,6 +541,28 @@ Builder setClock(ApiClock clock) {
543541
}
544542

545543
public Subscriber build() {
544+
if (systemExecutorProvider == null) {
545+
ThreadFactory threadFactory =
546+
new ThreadFactoryBuilder()
547+
.setDaemon(true)
548+
.setNameFormat("Subscriber-SE-" + SYSTEM_EXECUTOR_COUNTER.incrementAndGet() + "-%d")
549+
.build();
550+
int threadCount = Math.max(6, 2 * parallelPullCount);
551+
final ScheduledExecutorService executor =
552+
Executors.newScheduledThreadPool(threadCount, threadFactory);
553+
systemExecutorProvider =
554+
new ExecutorProvider() {
555+
@Override
556+
public boolean shouldAutoClose() {
557+
return true;
558+
}
559+
560+
@Override
561+
public ScheduledExecutorService getExecutor() {
562+
return executor;
563+
}
564+
};
565+
}
546566
return new Subscriber(this);
547567
}
548568
}

0 commit comments

Comments
 (0)