Skip to content

Commit bbfb6db

Browse files
authored
pubsub: GetSubscription only once (#2127)
Previous each subscriber connection makes one GetSubscription call to get subscription detail. This spams the service with many Get requests, that will likely all return the same information. This PR makes Subscriber call Get only once, before starting all connections. Fixes #2111.
1 parent 6b47486 commit bbfb6db

4 files changed

Lines changed: 54 additions & 33 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.common.util.concurrent.Futures;
2929
import com.google.common.util.concurrent.ListenableFuture;
3030
import com.google.pubsub.v1.AcknowledgeRequest;
31-
import com.google.pubsub.v1.GetSubscriptionRequest;
3231
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
3332
import com.google.pubsub.v1.PullRequest;
3433
import com.google.pubsub.v1.PullResponse;
@@ -46,23 +45,24 @@
4645
* Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and acknowledge operations.
4746
*/
4847
final class PollingSubscriberConnection extends AbstractApiService implements AckProcessor {
48+
static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60);
49+
4950
private static final int MAX_PER_REQUEST_CHANGES = 1000;
50-
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);
5151
private static final int DEFAULT_MAX_MESSAGES = 1000;
5252
private static final Duration INITIAL_BACKOFF = Duration.ofMillis(100); // 100ms
5353
private static final Duration MAX_BACKOFF = Duration.ofSeconds(10); // 10s
5454

5555
private static final Logger logger =
5656
Logger.getLogger(PollingSubscriberConnection.class.getName());
5757

58-
private final String subscription;
58+
private final Subscription subscription;
5959
private final ScheduledExecutorService pollingExecutor;
6060
private final SubscriberFutureStub stub;
6161
private final MessageDispatcher messageDispatcher;
6262
private final int maxDesiredPulledMessages;
6363

6464
public PollingSubscriberConnection(
65-
String subscription,
65+
Subscription subscription,
6666
MessageReceiver receiver,
6767
Duration ackExpirationPadding,
6868
Duration maxAckExtensionPeriod,
@@ -87,7 +87,7 @@ public PollingSubscriberConnection(
8787
executor,
8888
systemExecutor,
8989
clock);
90-
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
90+
messageDispatcher.setMessageDeadlineSeconds(subscription.getAckDeadlineSeconds());
9191
this.maxDesiredPulledMessages =
9292
maxDesiredPulledMessages != null
9393
? Ints.saturatedCast(maxDesiredPulledMessages)
@@ -97,27 +97,8 @@ public PollingSubscriberConnection(
9797
@Override
9898
protected void doStart() {
9999
logger.config("Starting subscriber.");
100-
ListenableFuture<Subscription> subscriptionInfo =
101-
stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
102-
.getSubscription(
103-
GetSubscriptionRequest.newBuilder().setSubscription(subscription).build());
104-
105-
Futures.addCallback(
106-
subscriptionInfo,
107-
new FutureCallback<Subscription>() {
108-
@Override
109-
public void onSuccess(Subscription result) {
110-
messageDispatcher.setMessageDeadlineSeconds(result.getAckDeadlineSeconds());
111-
pullMessages(INITIAL_BACKOFF);
112-
notifyStarted();
113-
}
114-
115-
@Override
116-
public void onFailure(Throwable cause) {
117-
notifyFailed(cause);
118-
}
119-
},
120-
pollingExecutor);
100+
pullMessages(INITIAL_BACKOFF);
101+
notifyStarted();
121102
}
122103

123104
@Override
@@ -126,14 +107,14 @@ protected void doStop() {
126107
notifyStopped();
127108
}
128109

129-
private void pullMessages(final Duration backoff) {
110+
private ListenableFuture<PullResponse> pullMessages(final Duration backoff) {
130111
if (!isAlive()) {
131-
return;
112+
return Futures.immediateCancelledFuture();
132113
}
133114
ListenableFuture<PullResponse> pullResult =
134115
stub.pull(
135116
PullRequest.newBuilder()
136-
.setSubscription(subscription)
117+
.setSubscription(subscription.getName())
137118
.setMaxMessages(maxDesiredPulledMessages)
138119
.setReturnImmediately(false)
139120
.build());
@@ -200,6 +181,8 @@ public void run() {
200181
}
201182
},
202183
pollingExecutor);
184+
185+
return pullResult;
203186
}
204187

205188
private boolean isAlive() {
@@ -219,7 +202,7 @@ public void sendAckOperations(
219202
stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
220203
.modifyAckDeadline(
221204
ModifyAckDeadlineRequest.newBuilder()
222-
.setSubscription(subscription)
205+
.setSubscription(subscription.getName())
223206
.addAllAckIds(ackIdChunk)
224207
.setAckDeadlineSeconds(modifyAckDeadline.deadlineExtensionSeconds)
225208
.build());
@@ -230,7 +213,7 @@ public void sendAckOperations(
230213
stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
231214
.acknowledge(
232215
AcknowledgeRequest.newBuilder()
233-
.setSubscription(subscription)
216+
.setSubscription(subscription.getName())
234217
.addAllAckIds(ackChunk)
235218
.build());
236219
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import com.google.common.base.Optional;
3636
import com.google.common.base.Preconditions;
3737
import com.google.common.primitives.Ints;
38+
import com.google.pubsub.v1.GetSubscriptionRequest;
3839
import com.google.pubsub.v1.SubscriberGrpc;
3940
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
41+
import com.google.pubsub.v1.Subscription;
4042
import com.google.pubsub.v1.SubscriptionName;
4143
import io.grpc.CallCredentials;
4244
import io.grpc.ManagedChannel;
@@ -282,19 +284,34 @@ protected void doStop() {
282284
}
283285
}
284286

287+
// Starts polling connections. Blocks until all connections declare themselves running.
285288
private void startPollingConnections() throws IOException {
286289
synchronized (pollingSubscriberConnections) {
287290
Credentials credentials = credentialsProvider.getCredentials();
288291
CallCredentials callCredentials =
289292
credentials == null ? null : MoreCallCredentials.from(credentials);
293+
294+
SubscriberGrpc.SubscriberBlockingStub getSubStub =
295+
SubscriberGrpc.newBlockingStub(channels.get(0))
296+
.withDeadlineAfter(
297+
PollingSubscriberConnection.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
298+
if (callCredentials != null) {
299+
getSubStub = getSubStub.withCallCredentials(callCredentials);
300+
}
301+
Subscription subscriptionInfo =
302+
getSubStub.getSubscription(
303+
GetSubscriptionRequest.newBuilder()
304+
.setSubscription(cachedSubscriptionNameString)
305+
.build());
306+
290307
for (int i = 0; i < numChannels; i++) {
291308
SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channels.get(i));
292309
if (callCredentials != null) {
293310
stub = stub.withCallCredentials(callCredentials);
294311
}
295312
pollingSubscriberConnections.add(
296313
new PollingSubscriberConnection(
297-
cachedSubscriptionNameString,
314+
subscriptionInfo,
298315
receiver,
299316
ackExpirationPadding,
300317
maxAckExtensionPeriod,

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import com.google.api.client.util.Preconditions;
20+
import com.google.common.annotations.VisibleForTesting;
2021
import com.google.common.collect.ImmutableList;
2122
import com.google.protobuf.Empty;
2223
import com.google.pubsub.v1.AcknowledgeRequest;
@@ -51,6 +52,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase {
5152
private String subscription = "";
5253
private final AtomicInteger messageAckDeadline =
5354
new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
55+
private final AtomicInteger getSubscriptionCalled = new AtomicInteger();
5456
private final List<Stream> openedStreams = new ArrayList<>();
5557
private final List<Stream> closedStreams = new ArrayList<>();
5658
private final List<String> acks = new ArrayList<>();
@@ -225,6 +227,7 @@ public void enqueuePullResponse(PullResponse response) {
225227
@Override
226228
public void getSubscription(
227229
GetSubscriptionRequest request, StreamObserver<Subscription> responseObserver) {
230+
getSubscriptionCalled.incrementAndGet();
228231
responseObserver.onNext(
229232
Subscription.newBuilder()
230233
.setName(request.getSubscription())
@@ -234,6 +237,12 @@ public void getSubscription(
234237
responseObserver.onCompleted();
235238
}
236239

240+
/** Returns the number of times getSubscription is called. */
241+
@VisibleForTesting
242+
int getSubscriptionCalledCount() {
243+
return getSubscriptionCalled.get();
244+
}
245+
237246
@Override
238247
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
239248
synchronized (receivedPullRequest) {

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import java.util.List;
4848
import java.util.concurrent.CountDownLatch;
4949
import java.util.concurrent.LinkedBlockingQueue;
50-
import org.threeten.bp.Duration;
5150
import org.junit.After;
5251
import org.junit.Before;
5352
import org.junit.Rule;
@@ -56,6 +55,7 @@
5655
import org.junit.runner.RunWith;
5756
import org.junit.runners.Parameterized;
5857
import org.junit.runners.Parameterized.Parameters;
58+
import org.threeten.bp.Duration;
5959

6060
/** Tests for {@link Subscriber}. */
6161
@RunWith(Parameterized.class)
@@ -203,6 +203,18 @@ public void testAckSingleMessage() throws Exception {
203203
assertEquivalent(testAckIds, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(1));
204204
}
205205

206+
@Test
207+
public void testGetSubscriptionOnce() throws Exception {
208+
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));
209+
210+
sendMessages(ImmutableList.of("A"));
211+
212+
// Trigger ack sending
213+
subscriber.stopAsync().awaitTerminated();
214+
215+
assertEquals(1, fakeSubscriberServiceImpl.getSubscriptionCalledCount());
216+
}
217+
206218
@Test
207219
public void testNackSingleMessage() throws Exception {
208220
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));

0 commit comments

Comments
 (0)