Skip to content

Commit faadac0

Browse files
authored
---
yaml --- r: 8777 b: refs/heads/lesv-patch-1 c: d2419d4 h: refs/heads/master i: 8775: ab296a3
1 parent da44d1e commit faadac0

2 files changed

Lines changed: 8 additions & 5 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ refs/tags/v0.22.0: 18b298fe4bfe8ec2f20b0e0bf7ffdcce5cc3c5fe
6666
refs/heads/vam-google-patch-1: d0c8fee3a4074d0bf7360ce8c4f7f7223d0ee7b9
6767
refs/heads/vam-google-patch-CODEOWNERS: 2ac1616e25229e51d08a984708ef1918f91a35ee
6868
refs/heads/danoscarmike-patch-1: 7342a9916bce4ed00002c7202e2a16c5d46afaea
69-
refs/heads/lesv-patch-1: 8f7792580e9e3de9d3b8e1fb5f61f9dba413a660
69+
refs/heads/lesv-patch-1: d2419d40e9a28f1a79d17b793b740c3c6c140fc9
7070
refs/heads/ml-update-branch: 079dd6610017f5c51b9d1938c12d6d55b61513cf
7171
refs/heads/vkedia-patch-2: 7d8241388a9769a5c069334761b06c7012c878e7
7272
refs/heads/vkedia-patch-3: 4d128043acaa7db9160faf439d2ca6104e8a88cb

branches/lesv-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.ScheduledFuture;
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.concurrent.atomic.AtomicInteger;
4445
import java.util.concurrent.locks.Lock;
4546
import java.util.concurrent.locks.ReentrantLock;
4647
import java.util.logging.Level;
@@ -76,7 +77,9 @@ class MessageDispatcher {
7677
private final Set<String> pendingNacks;
7778

7879
private final Lock alarmsLock;
79-
private int messageDeadlineSeconds;
80+
// The deadline should be set by the subscriber connection before use,
81+
// but set it to some reasonable value just in case.
82+
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(10);
8083
private ScheduledFuture<?> ackDeadlineExtensionAlarm;
8184
private Instant nextAckDeadlineExtensionAlarmTime;
8285
private ScheduledFuture<?> pendingAcksAlarm;
@@ -276,11 +279,11 @@ public void stop() {
276279
}
277280

278281
public void setMessageDeadlineSeconds(int messageDeadlineSeconds) {
279-
this.messageDeadlineSeconds = messageDeadlineSeconds;
282+
this.messageDeadlineSeconds.set(messageDeadlineSeconds);
280283
}
281284

282285
public int getMessageDeadlineSeconds() {
283-
return messageDeadlineSeconds;
286+
return messageDeadlineSeconds.get();
284287
}
285288

286289
static class OutstandingMessageBatch {
@@ -336,7 +339,7 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
336339
}
337340

338341
Instant expiration = Instant.ofEpochMilli(clock.millisTime())
339-
.plusSeconds(messageDeadlineSeconds);
342+
.plusSeconds(messageDeadlineSeconds.get());
340343
synchronized (outstandingAckHandlers) {
341344
outstandingAckHandlers.add(
342345
new ExtensionJob(

0 commit comments

Comments
 (0)