File tree Expand file tree Collapse file tree
gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -172,8 +172,19 @@ private void unsetAndScheduleNextRenewal() {
172172 private void scheduleNextRenewal () {
173173 // Schedules next renewal if there are still messages to process and no renewals scheduled that
174174 // could handle them, otherwise does nothing
175+ Message nextMessage ;
176+ synchronized (lock ) {
177+ Message peek = messageQueue .peek ();
178+ // We remove from the queue messages that were removed from the ack deadline renewer (and
179+ // possibly re-added)
180+ while (peek != null && (!messageDeadlines .containsKey (peek .messageId ())
181+ || messageDeadlines .get (peek .messageId ()) > peek .expectedDeadline ())) {
182+ messageQueue .poll ();
183+ peek = messageQueue .peek ();
184+ }
185+ nextMessage = peek ;
186+ }
175187 synchronized (futureLock ) {
176- Message nextMessage = messageQueue .peek ();
177188 if (renewerFuture == null && nextMessage != null ) {
178189 long delay =
179190 (nextMessage .expectedDeadline () - clock .millis ()) - NEXT_RENEWAL_THRESHOLD_MILLIS ;
You can’t perform that action at this time.
0 commit comments