|
41 | 41 | import java.util.concurrent.ScheduledFuture; |
42 | 42 | import java.util.concurrent.TimeUnit; |
43 | 43 | import java.util.concurrent.atomic.AtomicBoolean; |
| 44 | +import java.util.concurrent.atomic.AtomicInteger; |
44 | 45 | import java.util.concurrent.locks.Lock; |
45 | 46 | import java.util.concurrent.locks.ReentrantLock; |
46 | 47 | import java.util.logging.Level; |
@@ -76,7 +77,9 @@ class MessageDispatcher { |
76 | 77 | private final Set<String> pendingNacks; |
77 | 78 |
|
78 | 79 | private final Lock alarmsLock; |
79 | | - private int messageDeadlineSeconds; |
| 80 | + // The deadline should be set by the subscriber connection before use, |
| 81 | + // but set it to some reasonable value just in case. |
| 82 | + private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(10); |
80 | 83 | private ScheduledFuture<?> ackDeadlineExtensionAlarm; |
81 | 84 | private Instant nextAckDeadlineExtensionAlarmTime; |
82 | 85 | private ScheduledFuture<?> pendingAcksAlarm; |
@@ -276,11 +279,11 @@ public void stop() { |
276 | 279 | } |
277 | 280 |
|
278 | 281 | public void setMessageDeadlineSeconds(int messageDeadlineSeconds) { |
279 | | - this.messageDeadlineSeconds = messageDeadlineSeconds; |
| 282 | + this.messageDeadlineSeconds.set(messageDeadlineSeconds); |
280 | 283 | } |
281 | 284 |
|
282 | 285 | public int getMessageDeadlineSeconds() { |
283 | | - return messageDeadlineSeconds; |
| 286 | + return messageDeadlineSeconds.get(); |
284 | 287 | } |
285 | 288 |
|
286 | 289 | static class OutstandingMessageBatch { |
@@ -336,7 +339,7 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don |
336 | 339 | } |
337 | 340 |
|
338 | 341 | Instant expiration = Instant.ofEpochMilli(clock.millisTime()) |
339 | | - .plusSeconds(messageDeadlineSeconds); |
| 342 | + .plusSeconds(messageDeadlineSeconds.get()); |
340 | 343 | synchronized (outstandingAckHandlers) { |
341 | 344 | outstandingAckHandlers.add( |
342 | 345 | new ExtensionJob( |
|
0 commit comments