Skip to content

Commit d4b8f6c

Browse files
committed
Refactor renewer tests and wake up renewer only when needed
1 parent 01dbb40 commit d4b8f6c

2 files changed

Lines changed: 248 additions & 164 deletions

File tree

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import java.util.Map;
3030
import java.util.Objects;
3131
import java.util.Queue;
32+
import java.util.concurrent.Future;
3233
import java.util.concurrent.ScheduledExecutorService;
33-
import java.util.concurrent.ScheduledFuture;
3434
import java.util.concurrent.TimeUnit;
3535

3636
/**
@@ -41,17 +41,20 @@
4141
*/
4242
class AckDeadlineRenewer implements AutoCloseable {
4343

44-
private static final int MIN_DEADLINE_MILLISECONDS = 10_000;
45-
private static final int RENEW_THRESHOLD_MILLISECONDS = 2_000;
44+
private static final int MIN_DEADLINE_MILLIS = 9_000;
45+
private static final int RENEW_THRESHOLD_MILLIS = 3_000;
46+
private static final int NEXT_RENEWAL_THRESHOLD_MILLIS = 1_000;
4647

4748
private final PubSub pubsub;
4849
private final ScheduledExecutorService executor;
4950
private final ExecutorFactory executorFactory;
5051
private final Clock clock;
5152
private final Queue<Message> messageQueue;
5253
private final Map<MessageId, Long> messageDeadlines;
53-
private final ScheduledFuture<?> renewerFuture;
5454
private final Object lock = new Object();
55+
private final Object futureLock = new Object();
56+
private Future<?> renewerFuture;
57+
private boolean closed;
5558

5659
/**
5760
* This class holds the identity of a message to renew: subscription and acknowledge id.
@@ -157,19 +160,38 @@ public String toString() {
157160
this.clock = options.clock();
158161
this.messageQueue = new LinkedList<>();
159162
this.messageDeadlines = new HashMap<>();
160-
this.renewerFuture = this.executor.scheduleWithFixedDelay(new Runnable() {
161-
@Override
162-
public void run() {
163-
renewAckDeadlines();
163+
}
164+
165+
private void unsetAndScheduleNextRenewal() {
166+
synchronized (futureLock) {
167+
renewerFuture = null;
168+
scheduleNextRenewal();
169+
}
170+
}
171+
172+
private void scheduleNextRenewal() {
173+
// Schedules next renewal if there are still messages to process and no renewals scheduled that
174+
// could handle them, otherwise does nothing
175+
synchronized (futureLock) {
176+
Message nextMessage = messageQueue.peek();
177+
if (renewerFuture == null && nextMessage != null) {
178+
long delay =
179+
(nextMessage.expectedDeadline() - clock.millis()) - NEXT_RENEWAL_THRESHOLD_MILLIS;
180+
renewerFuture = executor.schedule(new Runnable() {
181+
@Override
182+
public void run() {
183+
renewAckDeadlines();
184+
}
185+
}, delay, TimeUnit.MILLISECONDS);
164186
}
165-
}, 0, 1, TimeUnit.SECONDS);
187+
}
166188
}
167189

168190
private void renewAckDeadlines() {
169191
ListMultimap<String, String> messagesToRenewNext = LinkedListMultimap.create();
170-
// At every activation we renew all ack deadlines that will expier in the following
171-
// RENEW_THRESHOLD_MILLISECONDS
172-
long threshold = clock.millis() + RENEW_THRESHOLD_MILLISECONDS;
192+
// At every activation we renew all ack deadlines that will expire in the following
193+
// RENEW_THRESHOLD_MILLIS
194+
long threshold = clock.millis() + RENEW_THRESHOLD_MILLIS;
173195
Message message;
174196
while ((message = nextMessageToRenew(threshold)) != null) {
175197
// If the expected deadline is null the message was removed and we must ignore it, otherwise
@@ -180,9 +202,10 @@ private void renewAckDeadlines() {
180202
}
181203
for (Map.Entry<String, List<String>> entry : Multimaps.asMap(messagesToRenewNext).entrySet()) {
182204
// We send all ack deadline renewals for a subscription
183-
pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLISECONDS,
205+
pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLIS,
184206
TimeUnit.MILLISECONDS, entry.getValue());
185207
}
208+
unsetAndScheduleNextRenewal();
186209
}
187210

188211
private Message nextMessageToRenew(long threshold) {
@@ -211,39 +234,41 @@ private Message nextMessageToRenew(long threshold) {
211234
/**
212235
* Adds a new message for which the acknowledge deadline should be automatically renewed. The
213236
* message is identified by the subscription from which it was pulled and its acknowledge id.
214-
* Auto-renewal will take place until the message is removed (see {@link #remove(String, String)}
215-
* or {@link #remove(String, Iterable)}).
237+
* Auto-renewal will take place until the message is removed (see
238+
* {@link #remove(String, String)}).
216239
*
217240
* @param subscription the subscription from which the message has been pulled
218241
* @param ackId the message's acknowledge id
219242
*/
220243
void add(String subscription, String ackId) {
221244
synchronized (lock) {
222-
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
245+
long deadline = clock.millis() + MIN_DEADLINE_MILLIS;
223246
Message message = new Message(new MessageId(subscription, ackId), deadline);
224247
messageQueue.add(message);
225248
messageDeadlines.put(message.messageId(), deadline);
226249
}
250+
scheduleNextRenewal();
227251
}
228252

229253
/**
230254
* Adds new messages for which the acknowledge deadlined should be automatically renewed. The
231255
* messages are identified by the subscription from which they were pulled and their
232256
* acknowledge id. Auto-renewal will take place until the messages are removed (see
233-
* {@link #remove(String, String)} or {@link #remove(String, Iterable)}).
257+
* {@link #remove(String, String)}).
234258
*
235259
* @param subscription the subscription from which the messages have been pulled
236260
* @param ackIds the acknowledge ids of the messages
237261
*/
238262
void add(String subscription, Iterable<String> ackIds) {
239263
synchronized (lock) {
240-
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
264+
long deadline = clock.millis() + MIN_DEADLINE_MILLIS;
241265
for (String ackId : ackIds) {
242266
Message message = new Message(new MessageId(subscription, ackId), deadline);
243267
messageQueue.add(message);
244268
messageDeadlines.put(message.messageId(), deadline);
245269
}
246270
}
271+
scheduleNextRenewal();
247272
}
248273

249274
/**
@@ -262,7 +287,19 @@ void remove(String subscription, String ackId) {
262287

263288
@Override
264289
public void close() throws Exception {
265-
renewerFuture.cancel(false);
290+
if (closed) {
291+
return;
292+
}
293+
closed = true;
294+
synchronized (lock) {
295+
messageDeadlines.clear();
296+
messageQueue.clear();
297+
}
298+
synchronized (futureLock) {
299+
if (renewerFuture != null) {
300+
renewerFuture.cancel(true);
301+
}
302+
}
266303
executorFactory.release(executor);
267304
}
268305
}

0 commit comments

Comments
 (0)