Skip to content

Commit 319b819

Browse files
committed
Merge branch 'master' into sub-cred
2 parents 59fab43 + 8c4c4bd commit 319b819

3 files changed

Lines changed: 22 additions & 27 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
package com.google.cloud.pubsub.spi.v1;
1818

1919
import com.google.api.core.ApiClock;
20-
import com.google.api.gax.core.Distribution;
2120
import com.google.api.gax.batching.FlowController;
2221
import com.google.api.gax.batching.FlowController.FlowControlException;
23-
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
22+
import com.google.api.gax.core.Distribution;
2423
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
2524
import com.google.common.annotations.VisibleForTesting;
2625
import com.google.common.collect.Lists;
@@ -46,7 +45,6 @@
4645
import java.util.concurrent.locks.ReentrantLock;
4746
import java.util.logging.Level;
4847
import java.util.logging.Logger;
49-
import javax.annotation.Nullable;
5048
import org.threeten.bp.Duration;
5149
import org.threeten.bp.Instant;
5250

@@ -61,11 +59,8 @@ class MessageDispatcher {
6159
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6260
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m
6361

64-
private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR =
65-
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor();
66-
6762
private final ScheduledExecutorService executor;
68-
private final ScheduledExecutorService alarmsExecutor;
63+
private final ScheduledExecutorService systemExecutor;
6964
private final ApiClock clock;
7065

7166
private final Duration ackExpirationPadding;
@@ -245,10 +240,10 @@ void sendAckOperations(
245240
Distribution ackLatencyDistribution,
246241
FlowController flowController,
247242
ScheduledExecutorService executor,
248-
@Nullable ScheduledExecutorService alarmsExecutor,
243+
ScheduledExecutorService systemExecutor,
249244
ApiClock clock) {
250245
this.executor = executor;
251-
this.alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor;
246+
this.systemExecutor = systemExecutor;
252247
this.ackExpirationPadding = ackExpirationPadding;
253248
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
254249
this.receiver = receiver;
@@ -428,7 +423,7 @@ private void setupPendingAcksAlarm() {
428423
try {
429424
if (pendingAcksAlarm == null) {
430425
pendingAcksAlarm =
431-
alarmsExecutor.schedule(
426+
systemExecutor.schedule(
432427
new Runnable() {
433428
@Override
434429
public void run() {
@@ -557,7 +552,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
557552
nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;
558553

559554
ackDeadlineExtensionAlarm =
560-
alarmsExecutor.schedule(
555+
systemExecutor.schedule(
561556
new AckDeadlineAlarm(),
562557
nextAckDeadlineExtensionAlarmTime.toEpochMilli() - clock.millisTime(),
563558
TimeUnit.MILLISECONDS);

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac
5656
Logger.getLogger(PollingSubscriberConnection.class.getName());
5757

5858
private final String subscription;
59-
private final ScheduledExecutorService executor;
59+
private final ScheduledExecutorService pollingExecutor;
6060
private final SubscriberFutureStub stub;
6161
private final MessageDispatcher messageDispatcher;
6262
private final int maxDesiredPulledMessages;
@@ -71,10 +71,10 @@ public PollingSubscriberConnection(
7171
FlowController flowController,
7272
@Nullable Long maxDesiredPulledMessages,
7373
ScheduledExecutorService executor,
74-
@Nullable ScheduledExecutorService alarmsExecutor,
74+
ScheduledExecutorService systemExecutor,
7575
ApiClock clock) {
7676
this.subscription = subscription;
77-
this.executor = executor;
77+
this.pollingExecutor = systemExecutor;
7878
this.stub = stub;
7979
messageDispatcher =
8080
new MessageDispatcher(
@@ -85,7 +85,7 @@ public PollingSubscriberConnection(
8585
ackLatencyDistribution,
8686
flowController,
8787
executor,
88-
alarmsExecutor,
88+
systemExecutor,
8989
clock);
9090
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
9191
this.maxDesiredPulledMessages =
@@ -117,7 +117,7 @@ public void onFailure(Throwable cause) {
117117
notifyFailed(cause);
118118
}
119119
},
120-
executor);
120+
pollingExecutor);
121121
}
122122

123123
@Override
@@ -144,7 +144,7 @@ public void onSuccess(PullResponse pullResponse) {
144144
if (pullResponse.getReceivedMessagesCount() == 0) {
145145
// No messages in response, possibly caught up in backlog, we backoff to avoid
146146
// slamming the server.
147-
executor.schedule(
147+
pollingExecutor.schedule(
148148
new Runnable() {
149149
@Override
150150
public void run() {
@@ -178,7 +178,7 @@ public void onFailure(Throwable cause) {
178178
}
179179
if (StatusUtil.isRetryable(cause)) {
180180
logger.log(Level.WARNING, "Failed to pull messages (recoverable): ", cause);
181-
executor.schedule(
181+
pollingExecutor.schedule(
182182
new Runnable() {
183183
@Override
184184
public void run() {
@@ -197,7 +197,7 @@ public void run() {
197197
}
198198
}
199199
},
200-
executor);
200+
pollingExecutor);
201201
}
202202

203203
private boolean isAlive() {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.api.gax.core.Distribution;
2828
import com.google.api.gax.grpc.ChannelProvider;
2929
import com.google.api.gax.grpc.ExecutorProvider;
30+
import com.google.api.gax.grpc.FixedExecutorProvider;
3031
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
3132
import com.google.auth.Credentials;
3233
import com.google.auth.oauth2.GoogleCredentials;
@@ -89,6 +90,9 @@ public class Subscriber extends AbstractApiService {
8990
private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.ofMinutes(1);
9091
private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
9192

93+
private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
94+
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
95+
9296
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
9397

9498
private final SubscriptionName subscriptionName;
@@ -145,9 +149,8 @@ public void close() throws IOException {
145149
}
146150
});
147151
}
148-
if (builder.alarmsExecutorProvider != null) {
149-
alarmsExecutor = builder.alarmsExecutorProvider.getExecutor();
150-
if (builder.alarmsExecutorProvider.shouldAutoClose()) {
152+
alarmsExecutor = builder.systemExecutorProvider.getExecutor();
153+
if (builder.systemExecutorProvider.shouldAutoClose()) {
151154
closeables.add(
152155
new AutoCloseable() {
153156
@Override
@@ -156,9 +159,6 @@ public void close() throws IOException {
156159
}
157160
});
158161
}
159-
} else {
160-
alarmsExecutor = null;
161-
}
162162

163163
channelProvider = builder.channelProvider;
164164
credentialsProvider = builder.credentialsProvider;
@@ -389,7 +389,7 @@ public static final class Builder {
389389
.build();
390390

391391
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
392-
@Nullable ExecutorProvider alarmsExecutorProvider;
392+
ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR);
393393
ChannelProvider channelProvider =
394394
SubscriptionAdminSettings.defaultChannelProviderBuilder()
395395
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
@@ -472,7 +472,7 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
472472
* a shared one will be used by all {@link Subscriber} instances.
473473
*/
474474
public Builder setLeaseAlarmsExecutorProvider(ExecutorProvider executorProvider) {
475-
this.alarmsExecutorProvider = Preconditions.checkNotNull(executorProvider);
475+
this.systemExecutorProvider = Preconditions.checkNotNull(executorProvider);
476476
return this;
477477
}
478478

0 commit comments

Comments
 (0)