Skip to content

Commit 501ba7d

Browse files
committed
Skip removed/re-added messages when scheduling new renewal
1 parent d4b8f6c commit 501ba7d

1 file changed

Lines changed: 12 additions & 1 deletion

File tree

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,19 @@ private void unsetAndScheduleNextRenewal() {
172172
private void scheduleNextRenewal() {
173173
// Schedules next renewal if there are still messages to process and no renewals scheduled that
174174
// could handle them, otherwise does nothing
175+
Message nextMessage;
176+
synchronized (lock) {
177+
Message peek = messageQueue.peek();
178+
// We remove from the queue messages that were removed from the ack deadline renewer (and
179+
// possibly re-added)
180+
while (peek != null && (!messageDeadlines.containsKey(peek.messageId())
181+
|| messageDeadlines.get(peek.messageId()) > peek.expectedDeadline())) {
182+
messageQueue.poll();
183+
peek = messageQueue.peek();
184+
}
185+
nextMessage = peek;
186+
}
175187
synchronized (futureLock) {
176-
Message nextMessage = messageQueue.peek();
177188
if (renewerFuture == null && nextMessage != null) {
178189
long delay =
179190
(nextMessage.expectedDeadline() - clock.millis()) - NEXT_RENEWAL_THRESHOLD_MILLIS;

0 commit comments

Comments
 (0)