File tree Expand file tree Collapse file tree
google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1 Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -132,13 +132,32 @@ private void pullMessages(final Duration backoff) {
132132 PullRequest .newBuilder ()
133133 .setSubscription (subscription )
134134 .setMaxMessages (maxDesiredPulledMessages )
135+ .setReturnImmediately (false )
135136 .build ());
136137
137138 Futures .addCallback (
138139 pullResult ,
139140 new FutureCallback <PullResponse >() {
140141 @ Override
141142 public void onSuccess (PullResponse pullResponse ) {
143+ if (pullResponse .getReceivedMessagesCount () == 0 ) {
144+ // No messages in response, possibly caught up in backlog, we backoff to avoid
145+ // slamming the server.
146+ pollingExecutor .schedule (
147+ new Runnable () {
148+ @ Override
149+ public void run () {
150+ Duration newBackoff = backoff .multipliedBy (2 );
151+ if (newBackoff .compareTo (MAX_BACKOFF ) > 0 ) {
152+ newBackoff = MAX_BACKOFF ;
153+ }
154+ pullMessages (newBackoff );
155+ }
156+ },
157+ backoff .toMillis (),
158+ TimeUnit .MILLISECONDS );
159+ return ;
160+ }
142161 messageDispatcher .processReceivedMessages (
143162 pullResponse .getReceivedMessagesList (),
144163 new Runnable () {
You can’t perform that action at this time.
0 commit comments