|
29 | 29 | import com.google.api.gax.core.CredentialsProvider; |
30 | 30 | import com.google.api.gax.core.Distribution; |
31 | 31 | import com.google.api.gax.core.ExecutorProvider; |
32 | | -import com.google.api.gax.core.FixedExecutorProvider; |
33 | 32 | import com.google.api.gax.core.InstantiatingExecutorProvider; |
34 | 33 | import com.google.api.gax.rpc.HeaderProvider; |
35 | 34 | import com.google.api.gax.rpc.NoHeaderProvider; |
|
41 | 40 | import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; |
42 | 41 | import com.google.common.base.Optional; |
43 | 42 | import com.google.common.base.Preconditions; |
| 43 | +import com.google.common.util.concurrent.ThreadFactoryBuilder; |
44 | 44 | import com.google.pubsub.v1.ProjectSubscriptionName; |
45 | 45 | import java.io.IOException; |
46 | 46 | import java.util.ArrayList; |
47 | 47 | import java.util.List; |
48 | 48 | import java.util.concurrent.Executors; |
49 | 49 | import java.util.concurrent.ScheduledExecutorService; |
| 50 | +import java.util.concurrent.ThreadFactory; |
| 51 | +import java.util.concurrent.atomic.AtomicInteger; |
50 | 52 | import java.util.logging.Level; |
51 | 53 | import java.util.logging.Logger; |
52 | 54 | import javax.annotation.Nullable; |
@@ -138,13 +140,8 @@ private Subscriber(Builder builder) { |
138 | 140 | executorProvider = builder.executorProvider; |
139 | 141 |
|
140 | 142 | ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider; |
141 | | - if (systemExecutorProvider == null) { |
142 | | - systemExecutorProvider = |
143 | | - FixedExecutorProvider.create( |
144 | | - Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers))); |
145 | | - } |
146 | | - |
147 | 143 | alarmsExecutor = systemExecutorProvider.getExecutor(); |
| 144 | + |
148 | 145 | if (systemExecutorProvider.shouldAutoClose()) { |
149 | 146 | closeables.add( |
150 | 147 | new AutoCloseable() { |
@@ -405,6 +402,7 @@ public static final class Builder { |
405 | 402 | InstantiatingExecutorProvider.newBuilder() |
406 | 403 | .setExecutorThreadCount(THREADS_PER_CHANNEL) |
407 | 404 | .build(); |
| 405 | + private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger(); |
408 | 406 |
|
409 | 407 | private String subscriptionName; |
410 | 408 | private MessageReceiver receiver; |
@@ -543,6 +541,28 @@ Builder setClock(ApiClock clock) { |
543 | 541 | } |
544 | 542 |
|
545 | 543 | public Subscriber build() { |
| 544 | + if (systemExecutorProvider == null) { |
| 545 | + ThreadFactory threadFactory = |
| 546 | + new ThreadFactoryBuilder() |
| 547 | + .setDaemon(true) |
| 548 | + .setNameFormat("Subscriber-SE-" + SYSTEM_EXECUTOR_COUNTER.incrementAndGet() + "-%d") |
| 549 | + .build(); |
| 550 | + int threadCount = Math.max(6, 2 * parallelPullCount); |
| 551 | + final ScheduledExecutorService executor = |
| 552 | + Executors.newScheduledThreadPool(threadCount, threadFactory); |
| 553 | + systemExecutorProvider = |
| 554 | + new ExecutorProvider() { |
| 555 | + @Override |
| 556 | + public boolean shouldAutoClose() { |
| 557 | + return true; |
| 558 | + } |
| 559 | + |
| 560 | + @Override |
| 561 | + public ScheduledExecutorService getExecutor() { |
| 562 | + return executor; |
| 563 | + } |
| 564 | + }; |
| 565 | + } |
546 | 566 | return new Subscriber(this); |
547 | 567 | } |
548 | 568 | } |
|
0 commit comments