Skip to content

Commit 40bba80

Browse files
committed
pubsub: move polling task to alarm executor
"alarm executor". This commit moves polling tasks to the same executor, and give the said executor more threads to handle the extra workload. This commit puts all client's "house keeping" tasks in one executor, and all user code in another. In this way, long running user code cannot starve the client of CPU.
1 parent 0fbd82e commit 40bba80

3 files changed

Lines changed: 10 additions & 15 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
package com.google.cloud.pubsub.spi.v1;
1818

1919
import com.google.api.core.ApiClock;
20-
import com.google.api.gax.core.Distribution;
2120
import com.google.api.gax.batching.FlowController;
2221
import com.google.api.gax.batching.FlowController.FlowControlException;
23-
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
22+
import com.google.api.gax.core.Distribution;
2423
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
2524
import com.google.common.annotations.VisibleForTesting;
2625
import com.google.common.collect.Lists;
@@ -46,7 +45,6 @@
4645
import java.util.concurrent.locks.ReentrantLock;
4746
import java.util.logging.Level;
4847
import java.util.logging.Logger;
49-
import javax.annotation.Nullable;
5048
import org.threeten.bp.Duration;
5149
import org.threeten.bp.Instant;
5250

@@ -61,9 +59,6 @@ class MessageDispatcher {
6159
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
6260
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m
6361

64-
private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR =
65-
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor();
66-
6762
private final ScheduledExecutorService executor;
6863
private final ScheduledExecutorService alarmsExecutor;
6964
private final ApiClock clock;
@@ -245,10 +240,10 @@ void sendAckOperations(
245240
Distribution ackLatencyDistribution,
246241
FlowController flowController,
247242
ScheduledExecutorService executor,
248-
@Nullable ScheduledExecutorService alarmsExecutor,
243+
ScheduledExecutorService alarmsExecutor,
249244
ApiClock clock) {
250245
this.executor = executor;
251-
this.alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor;
246+
this.alarmsExecutor = alarmsExecutor;
252247
this.ackExpirationPadding = ackExpirationPadding;
253248
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
254249
this.receiver = receiver;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ public PollingSubscriberConnection(
7373
FlowController flowController,
7474
@Nullable Long maxDesiredPulledMessages,
7575
ScheduledExecutorService executor,
76-
@Nullable ScheduledExecutorService alarmsExecutor,
76+
ScheduledExecutorService alarmsExecutor,
7777
ApiClock clock) {
7878
this.subscription = subscription;
79-
this.executor = executor;
79+
this.executor = alarmsExecutor;
8080
stub = SubscriberGrpc.newFutureStub(channel);
8181
messageDispatcher =
8282
new MessageDispatcher(

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.api.gax.core.Distribution;
2727
import com.google.api.gax.grpc.ChannelProvider;
2828
import com.google.api.gax.grpc.ExecutorProvider;
29+
import com.google.api.gax.grpc.FixedExecutorProvider;
2930
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
3031
import com.google.auth.oauth2.GoogleCredentials;
3132
import com.google.common.annotations.VisibleForTesting;
@@ -85,6 +86,9 @@ public class Subscriber extends AbstractApiService {
8586
private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.ofMinutes(1);
8687
private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
8788

89+
private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR =
90+
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
91+
8892
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
8993

9094
private final SubscriptionName subscriptionName;
@@ -140,7 +144,6 @@ public void close() throws IOException {
140144
}
141145
});
142146
}
143-
if (builder.alarmsExecutorProvider != null) {
144147
alarmsExecutor = builder.alarmsExecutorProvider.getExecutor();
145148
if (builder.alarmsExecutorProvider.shouldAutoClose()) {
146149
closeables.add(
@@ -151,9 +154,6 @@ public void close() throws IOException {
151154
}
152155
});
153156
}
154-
} else {
155-
alarmsExecutor = null;
156-
}
157157

158158
channelProvider = builder.channelProvider;
159159

@@ -455,7 +455,7 @@ public static final class Builder {
455455
.build();
456456

457457
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
458-
@Nullable ExecutorProvider alarmsExecutorProvider;
458+
ExecutorProvider alarmsExecutorProvider = FixedExecutorProvider.create(SHARED_ALARMS_EXECUTOR);
459459
ChannelProvider channelProvider =
460460
SubscriptionAdminSettings.defaultChannelProviderBuilder()
461461
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)

0 commit comments

Comments
 (0)