Skip to content

Commit ce47990

Browse files
committed
pr comment
1 parent 34aca11 commit ce47990

1 file changed

Lines changed: 19 additions & 0 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,32 @@ private void pullMessages(final Duration backoff) {
132132
PullRequest.newBuilder()
133133
.setSubscription(subscription)
134134
.setMaxMessages(maxDesiredPulledMessages)
135+
.setReturnImmediately(false)
135136
.build());
136137

137138
Futures.addCallback(
138139
pullResult,
139140
new FutureCallback<PullResponse>() {
140141
@Override
141142
public void onSuccess(PullResponse pullResponse) {
143+
if (pullResponse.getReceivedMessagesCount() == 0) {
144+
// No messages in response, possibly caught up in backlog, we backoff to avoid
145+
// slamming the server.
146+
pollingExecutor.schedule(
147+
new Runnable() {
148+
@Override
149+
public void run() {
150+
Duration newBackoff = backoff.multipliedBy(2);
151+
if (newBackoff.compareTo(MAX_BACKOFF) > 0) {
152+
newBackoff = MAX_BACKOFF;
153+
}
154+
pullMessages(newBackoff);
155+
}
156+
},
157+
backoff.toMillis(),
158+
TimeUnit.MILLISECONDS);
159+
return;
160+
}
142161
messageDispatcher.processReceivedMessages(
143162
pullResponse.getReceivedMessagesList(),
144163
new Runnable() {

0 commit comments

Comments
 (0)