Skip to content

Commit 05405c8

Browse files
authored
---
yaml --- r: 5713 b: refs/heads/master c: 8c4c4bd h: refs/heads/master i: 5711: 0fa716d
1 parent 271353c commit 05405c8

4 files changed

Lines changed: 23 additions & 28 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: a08d3fdf2b28eb49d4003bbfe77b4b730be2812a
2+
refs/heads/master: 8c4c4bd754eb167f6b10a1b5af3b7d0ede87c831
33
refs/heads/travis: dae77e558b884bc1b165155482d76c8e40b0fca4
44
refs/heads/gh-pages: f8ea70cdc599a5d39c2df480280877afb3bef9bd
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
package com.google.cloud.pubsub.spi.v1;
1818

1919
import com.google.api.core.ApiClock;
20-
import com.google.api.gax.core.Distribution;
2120
import com.google.api.gax.batching.FlowController;
2221
import com.google.api.gax.batching.FlowController.FlowControlException;
23-
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
22+
import com.google.api.gax.core.Distribution;
2423
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
2524
import com.google.common.annotations.VisibleForTesting;
2625
import com.google.common.collect.Lists;
@@ -46,7 +45,6 @@
4645
import java.util.concurrent.locks.ReentrantLock;
4746
import java.util.logging.Level;
4847
import java.util.logging.Logger;
49-
import javax.annotation.Nullable;
5048
import org.threeten.bp.Duration;
5149
import org.threeten.bp.Instant;
5250

@@ -61,11 +59,8 @@ class MessageDispatcher {
6159
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6260
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m
6361

64-
private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR =
65-
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor();
66-
6762
private final ScheduledExecutorService executor;
68-
private final ScheduledExecutorService alarmsExecutor;
63+
private final ScheduledExecutorService systemExecutor;
6964
private final ApiClock clock;
7065

7166
private final Duration ackExpirationPadding;
@@ -245,10 +240,10 @@ void sendAckOperations(
245240
Distribution ackLatencyDistribution,
246241
FlowController flowController,
247242
ScheduledExecutorService executor,
248-
@Nullable ScheduledExecutorService alarmsExecutor,
243+
ScheduledExecutorService systemExecutor,
249244
ApiClock clock) {
250245
this.executor = executor;
251-
this.alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor;
246+
this.systemExecutor = systemExecutor;
252247
this.ackExpirationPadding = ackExpirationPadding;
253248
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
254249
this.receiver = receiver;
@@ -428,7 +423,7 @@ private void setupPendingAcksAlarm() {
428423
try {
429424
if (pendingAcksAlarm == null) {
430425
pendingAcksAlarm =
431-
alarmsExecutor.schedule(
426+
systemExecutor.schedule(
432427
new Runnable() {
433428
@Override
434429
public void run() {
@@ -557,7 +552,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
557552
nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;
558553

559554
ackDeadlineExtensionAlarm =
560-
alarmsExecutor.schedule(
555+
systemExecutor.schedule(
561556
new AckDeadlineAlarm(),
562557
nextAckDeadlineExtensionAlarmTime.toEpochMilli() - clock.millisTime(),
563558
TimeUnit.MILLISECONDS);

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac
5858
Logger.getLogger(PollingSubscriberConnection.class.getName());
5959

6060
private final String subscription;
61-
private final ScheduledExecutorService executor;
61+
private final ScheduledExecutorService pollingExecutor;
6262
private final SubscriberFutureStub stub;
6363
private final MessageDispatcher messageDispatcher;
6464
private final int maxDesiredPulledMessages;
@@ -73,10 +73,10 @@ public PollingSubscriberConnection(
7373
FlowController flowController,
7474
@Nullable Long maxDesiredPulledMessages,
7575
ScheduledExecutorService executor,
76-
@Nullable ScheduledExecutorService alarmsExecutor,
76+
ScheduledExecutorService systemExecutor,
7777
ApiClock clock) {
7878
this.subscription = subscription;
79-
this.executor = executor;
79+
this.pollingExecutor = systemExecutor;
8080
stub = SubscriberGrpc.newFutureStub(channel);
8181
messageDispatcher =
8282
new MessageDispatcher(
@@ -87,7 +87,7 @@ public PollingSubscriberConnection(
8787
ackLatencyDistribution,
8888
flowController,
8989
executor,
90-
alarmsExecutor,
90+
systemExecutor,
9191
clock);
9292
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
9393
this.maxDesiredPulledMessages =
@@ -119,7 +119,7 @@ public void onFailure(Throwable cause) {
119119
notifyFailed(cause);
120120
}
121121
},
122-
executor);
122+
pollingExecutor);
123123
}
124124

125125
@Override
@@ -146,7 +146,7 @@ public void onSuccess(PullResponse pullResponse) {
146146
if (pullResponse.getReceivedMessagesCount() == 0) {
147147
// No messages in response, possibly caught up in backlog, we backoff to avoid
148148
// slamming the server.
149-
executor.schedule(
149+
pollingExecutor.schedule(
150150
new Runnable() {
151151
@Override
152152
public void run() {
@@ -180,7 +180,7 @@ public void onFailure(Throwable cause) {
180180
}
181181
if (StatusUtil.isRetryable(cause)) {
182182
logger.log(Level.WARNING, "Failed to pull messages (recoverable): ", cause);
183-
executor.schedule(
183+
pollingExecutor.schedule(
184184
new Runnable() {
185185
@Override
186186
public void run() {
@@ -199,7 +199,7 @@ public void run() {
199199
}
200200
}
201201
},
202-
executor);
202+
pollingExecutor);
203203
}
204204

205205
private boolean isAlive() {

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.api.gax.core.Distribution;
2727
import com.google.api.gax.grpc.ChannelProvider;
2828
import com.google.api.gax.grpc.ExecutorProvider;
29+
import com.google.api.gax.grpc.FixedExecutorProvider;
2930
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
3031
import com.google.auth.oauth2.GoogleCredentials;
3132
import com.google.common.annotations.VisibleForTesting;
@@ -85,6 +86,9 @@ public class Subscriber extends AbstractApiService {
8586
private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.ofMinutes(1);
8687
private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
8788

89+
private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
90+
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
91+
8892
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
8993

9094
private final SubscriptionName subscriptionName;
@@ -140,9 +144,8 @@ public void close() throws IOException {
140144
}
141145
});
142146
}
143-
if (builder.alarmsExecutorProvider != null) {
144-
alarmsExecutor = builder.alarmsExecutorProvider.getExecutor();
145-
if (builder.alarmsExecutorProvider.shouldAutoClose()) {
147+
alarmsExecutor = builder.systemExecutorProvider.getExecutor();
148+
if (builder.systemExecutorProvider.shouldAutoClose()) {
146149
closeables.add(
147150
new AutoCloseable() {
148151
@Override
@@ -151,9 +154,6 @@ public void close() throws IOException {
151154
}
152155
});
153156
}
154-
} else {
155-
alarmsExecutor = null;
156-
}
157157

158158
channelProvider = builder.channelProvider;
159159

@@ -455,7 +455,7 @@ public static final class Builder {
455455
.build();
456456

457457
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
458-
@Nullable ExecutorProvider alarmsExecutorProvider;
458+
ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR);
459459
ChannelProvider channelProvider =
460460
SubscriptionAdminSettings.defaultChannelProviderBuilder()
461461
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
@@ -530,7 +530,7 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
530530
* a shared one will be used by all {@link Subscriber} instances.
531531
*/
532532
public Builder setLeaseAlarmsExecutorProvider(ExecutorProvider executorProvider) {
533-
this.alarmsExecutorProvider = Preconditions.checkNotNull(executorProvider);
533+
this.systemExecutorProvider = Preconditions.checkNotNull(executorProvider);
534534
return this;
535535
}
536536

0 commit comments

Comments
 (0)