Skip to content

Commit f716280

Browse files
committed
Fixing merging issues as well as making the alarms executor configurable
by the user, this allows for proper unit testing plus adds the ability to the user to override the executor.
1 parent eec304e commit f716280

5 files changed

Lines changed: 39 additions & 2 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.api.gax.core.ApiClock;
19+
import com.google.api.core.ApiClock;
2020
import com.google.api.gax.core.FlowController;
2121
import com.google.api.gax.core.FlowController.FlowControlException;
2222
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.locks.ReentrantLock;
4747
import java.util.logging.Level;
4848
import java.util.logging.Logger;
49+
import javax.annotation.Nullable;
4950
import org.joda.time.Duration;
5051
import org.joda.time.Instant;
5152
import org.joda.time.Interval;
@@ -61,10 +62,11 @@ class MessageDispatcher {
6162
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
6263
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m
6364

64-
private static final ScheduledExecutorService alarmsExecutor =
65+
private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR =
6566
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor();
6667

6768
private final ScheduledExecutorService executor;
69+
private final ScheduledExecutorService alarmsExecutor;
6870
private final ApiClock clock;
6971

7072
private final Duration ackExpirationPadding;
@@ -243,8 +245,10 @@ void sendAckOperations(
243245
Distribution ackLatencyDistribution,
244246
FlowController flowController,
245247
ScheduledExecutorService executor,
248+
@Nullable ScheduledExecutorService alarmsExecutor,
246249
ApiClock clock) {
247250
this.executor = executor;
251+
this.alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor;
248252
this.ackExpirationPadding = ackExpirationPadding;
249253
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
250254
this.receiver = receiver;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public PollingSubscriberConnection(
7272
FlowController flowController,
7373
@Nullable Integer maxDesiredPulledMessages,
7474
ScheduledExecutorService executor,
75+
@Nullable ScheduledExecutorService alarmsExecutor,
7576
ApiClock clock) {
7677
this.subscription = subscription;
7778
this.executor = executor;
@@ -85,6 +86,7 @@ public PollingSubscriberConnection(
8586
ackLatencyDistribution,
8687
flowController,
8788
executor,
89+
alarmsExecutor,
8890
clock);
8991
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
9092
this.maxDesiredPulledMessages =

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public StreamingSubscriberConnection(
7272
Channel channel,
7373
FlowController flowController,
7474
ScheduledExecutorService executor,
75+
@Nullable ScheduledExecutorService alarmsExecutor,
7576
ApiClock clock) {
7677
this.subscription = subscription;
7778
this.executor = executor;
@@ -85,6 +86,7 @@ public StreamingSubscriberConnection(
8586
ackLatencyDistribution,
8687
flowController,
8788
executor,
89+
alarmsExecutor,
8890
clock);
8991
messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds);
9092
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.logging.Level;
4848
import java.util.logging.Logger;
49+
import javax.annotation.Nullable;
4950
import org.joda.time.Duration;
5051

5152
/**
@@ -93,6 +94,7 @@ public class Subscriber extends AbstractApiService {
9394
private final Duration ackExpirationPadding;
9495
private final Duration maxAckExtensionPeriod;
9596
private final ScheduledExecutorService executor;
97+
@Nullable private final ScheduledExecutorService alarmsExecutor;
9698
private final Distribution ackLatencyDistribution =
9799
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
98100
private final int numChannels;
@@ -138,6 +140,20 @@ public void close() throws IOException {
138140
}
139141
});
140142
}
143+
if (builder.alarmsExecutorProvider != null) {
144+
alarmsExecutor = builder.alarmsExecutorProvider.getExecutor();
145+
if (builder.alarmsExecutorProvider.shouldAutoClose()) {
146+
closeables.add(
147+
new AutoCloseable() {
148+
@Override
149+
public void close() throws IOException {
150+
alarmsExecutor.shutdown();
151+
}
152+
});
153+
}
154+
} else {
155+
alarmsExecutor = null;
156+
}
141157

142158
channelProvider = builder.channelProvider;
143159

@@ -261,6 +277,7 @@ private void startStreamingConnections() {
261277
channels.get(i),
262278
flowController,
263279
executor,
280+
alarmsExecutor,
264281
clock));
265282
}
266283
startConnections(
@@ -337,6 +354,7 @@ private void startPollingConnections() {
337354
flowController,
338355
flowControlSettings.getMaxOutstandingElementCount(),
339356
executor,
357+
alarmsExecutor,
340358
clock));
341359
}
342360
startConnections(
@@ -441,6 +459,7 @@ public static final class Builder {
441459
FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance();
442460

443461
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
462+
@Nullable ExecutorProvider alarmsExecutorProvider;
444463
ChannelProvider channelProvider =
445464
SubscriptionAdminSettings.defaultChannelProviderBuilder()
446465
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
@@ -520,6 +539,15 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
520539
return this;
521540
}
522541

542+
/**
543+
* Gives the ability to set a custom executor for managing lease extensions. If none is
544+
* provided a shared one will be used by all {@link Subscriber} instances.
545+
*/
546+
public Builder setLeaseAlarmsExecutorProvider(ExecutorProvider executorProvider) {
547+
this.alarmsExecutorProvider = Preconditions.checkNotNull(executorProvider);
548+
return this;
549+
}
550+
523551
/** Gives the ability to set a custom clock. */
524552
Builder setClock(ApiClock clock) {
525553
this.clock = Optional.of(clock);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,7 @@ private void sendMessages(Iterable<String> ackIds) throws InterruptedException {
536536
private Builder getTestSubscriberBuilder(MessageReceiver receiver) {
537537
return Subscriber.defaultBuilder(TEST_SUBSCRIPTION, receiver)
538538
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
539+
.setLeaseAlarmsExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
539540
.setChannelProvider(FixedChannelProvider.create(testChannel))
540541
.setClock(fakeExecutor.getClock());
541542
}

0 commit comments

Comments
 (0)