Skip to content

Commit 098f375

Browse files
authored
---
yaml --- r: 34535 b: refs/heads/autosynth-tasks c: 28b211f h: refs/heads/master i: 34533: 047d519 34531: 4c60008 34527: 40f4205
1 parent e72f187 commit 098f375

2 files changed

Lines changed: 28 additions & 8 deletions

File tree

  • branches/autosynth-tasks/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ refs/heads/autosynth-redis: 2b698ebe7faaa0b93246576a88941682e0888e57
139139
refs/heads/autosynth-scheduler: a3de6480746d1cd586ca8b9d75c55a636f371539
140140
refs/heads/autosynth-spanner: d963fe4368e79cf6abae5d511785e8ced8ac57f4
141141
refs/heads/autosynth-speech: c563dcd420cce0a37c39b1b9c24be1b9ba604dc7
142-
refs/heads/autosynth-tasks: 248ace9e690c7a5e00475a4b895b837fb63beb2f
142+
refs/heads/autosynth-tasks: 28b211fe52ced1e256561eac518047a10676dde0
143143
refs/heads/autosynth-texttospeech: 2c442fe0b7f089fbab266edfe4dd83c532e82dd0
144144
refs/heads/autosynth-trace: c94eef6e4d9c6fd24888216e28ca7271959c1cf0
145145
refs/heads/autosynth-websecurityscanner: fa561b356aabcd92d415ae8dc88fd8d87dbc5b23

branches/autosynth-tasks/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.api.gax.core.CredentialsProvider;
3030
import com.google.api.gax.core.Distribution;
3131
import com.google.api.gax.core.ExecutorProvider;
32-
import com.google.api.gax.core.FixedExecutorProvider;
3332
import com.google.api.gax.core.InstantiatingExecutorProvider;
3433
import com.google.api.gax.rpc.HeaderProvider;
3534
import com.google.api.gax.rpc.NoHeaderProvider;
@@ -41,12 +40,15 @@
4140
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
4241
import com.google.common.base.Optional;
4342
import com.google.common.base.Preconditions;
43+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
4848
import java.util.concurrent.Executors;
4949
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.ThreadFactory;
51+
import java.util.concurrent.atomic.AtomicInteger;
5052
import java.util.logging.Level;
5153
import java.util.logging.Logger;
5254
import javax.annotation.Nullable;
@@ -138,13 +140,8 @@ private Subscriber(Builder builder) {
138140
executorProvider = builder.executorProvider;
139141

140142
ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
141-
if (systemExecutorProvider == null) {
142-
systemExecutorProvider =
143-
FixedExecutorProvider.create(
144-
Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers)));
145-
}
146-
147143
alarmsExecutor = systemExecutorProvider.getExecutor();
144+
148145
if (systemExecutorProvider.shouldAutoClose()) {
149146
closeables.add(
150147
new AutoCloseable() {
@@ -405,6 +402,7 @@ public static final class Builder {
405402
InstantiatingExecutorProvider.newBuilder()
406403
.setExecutorThreadCount(THREADS_PER_CHANNEL)
407404
.build();
405+
private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger();
408406

409407
private String subscriptionName;
410408
private MessageReceiver receiver;
@@ -543,6 +541,28 @@ Builder setClock(ApiClock clock) {
543541
}
544542

545543
public Subscriber build() {
544+
if (systemExecutorProvider == null) {
545+
ThreadFactory threadFactory =
546+
new ThreadFactoryBuilder()
547+
.setDaemon(true)
548+
.setNameFormat("Subscriber-SE-" + SYSTEM_EXECUTOR_COUNTER.incrementAndGet() + "-%d")
549+
.build();
550+
int threadCount = Math.max(6, 2 * parallelPullCount);
551+
final ScheduledExecutorService executor =
552+
Executors.newScheduledThreadPool(threadCount, threadFactory);
553+
systemExecutorProvider =
554+
new ExecutorProvider() {
555+
@Override
556+
public boolean shouldAutoClose() {
557+
return true;
558+
}
559+
560+
@Override
561+
public ScheduledExecutorService getExecutor() {
562+
return executor;
563+
}
564+
};
565+
}
546566
return new Subscriber(this);
547567
}
548568
}

0 commit comments

Comments
 (0)