2929import java .util .Map ;
3030import java .util .Objects ;
3131import java .util .Queue ;
32+ import java .util .concurrent .Future ;
3233import java .util .concurrent .ScheduledExecutorService ;
33- import java .util .concurrent .ScheduledFuture ;
3434import java .util .concurrent .TimeUnit ;
3535
3636/**
4141 */
4242class AckDeadlineRenewer implements AutoCloseable {
4343
44- private static final int MIN_DEADLINE_MILLISECONDS = 10_000 ;
45- private static final int RENEW_THRESHOLD_MILLISECONDS = 2_000 ;
44+ private static final int MIN_DEADLINE_MILLIS = 9_000 ;
45+ private static final int RENEW_THRESHOLD_MILLIS = 3_000 ;
46+ private static final int NEXT_RENEWAL_THRESHOLD_MILLIS = 1_000 ;
4647
4748 private final PubSub pubsub ;
4849 private final ScheduledExecutorService executor ;
4950 private final ExecutorFactory executorFactory ;
5051 private final Clock clock ;
5152 private final Queue <Message > messageQueue ;
5253 private final Map <MessageId , Long > messageDeadlines ;
53- private final ScheduledFuture <?> renewerFuture ;
5454 private final Object lock = new Object ();
55+ private final Object futureLock = new Object ();
56+ private Future <?> renewerFuture ;
57+ private boolean closed ;
5558
5659 /**
5760 * This class holds the identity of a message to renew: subscription and acknowledge id.
@@ -157,19 +160,38 @@ public String toString() {
157160 this .clock = options .clock ();
158161 this .messageQueue = new LinkedList <>();
159162 this .messageDeadlines = new HashMap <>();
160- this .renewerFuture = this .executor .scheduleWithFixedDelay (new Runnable () {
161- @ Override
162- public void run () {
163- renewAckDeadlines ();
163+ }
164+
165+ private void unsetAndScheduleNextRenewal () {
166+ synchronized (futureLock ) {
167+ renewerFuture = null ;
168+ scheduleNextRenewal ();
169+ }
170+ }
171+
172+ private void scheduleNextRenewal () {
173+ // Schedules next renewal if there are still messages to process and no renewals scheduled that
174+ // could handle them, otherwise does nothing
175+ synchronized (futureLock ) {
176+ Message nextMessage = messageQueue .peek ();
177+ if (renewerFuture == null && nextMessage != null ) {
178+ long delay =
179+ (nextMessage .expectedDeadline () - clock .millis ()) - NEXT_RENEWAL_THRESHOLD_MILLIS ;
180+ renewerFuture = executor .schedule (new Runnable () {
181+ @ Override
182+ public void run () {
183+ renewAckDeadlines ();
184+ }
185+ }, delay , TimeUnit .MILLISECONDS );
164186 }
165- }, 0 , 1 , TimeUnit . SECONDS );
187+ }
166188 }
167189
168190 private void renewAckDeadlines () {
169191 ListMultimap <String , String > messagesToRenewNext = LinkedListMultimap .create ();
170- // At every activation we renew all ack deadlines that will expier in the following
171- // RENEW_THRESHOLD_MILLISECONDS
172- long threshold = clock .millis () + RENEW_THRESHOLD_MILLISECONDS ;
192+ // At every activation we renew all ack deadlines that will expire in the following
193+ // RENEW_THRESHOLD_MILLIS
194+ long threshold = clock .millis () + RENEW_THRESHOLD_MILLIS ;
173195 Message message ;
174196 while ((message = nextMessageToRenew (threshold )) != null ) {
175197 // If the expected deadline is null the message was removed and we must ignore it, otherwise
@@ -180,9 +202,10 @@ private void renewAckDeadlines() {
180202 }
181203 for (Map .Entry <String , List <String >> entry : Multimaps .asMap (messagesToRenewNext ).entrySet ()) {
182204 // We send all ack deadline renewals for a subscription
183- pubsub .modifyAckDeadlineAsync (entry .getKey (), MIN_DEADLINE_MILLISECONDS ,
205+ pubsub .modifyAckDeadlineAsync (entry .getKey (), MIN_DEADLINE_MILLIS ,
184206 TimeUnit .MILLISECONDS , entry .getValue ());
185207 }
208+ unsetAndScheduleNextRenewal ();
186209 }
187210
188211 private Message nextMessageToRenew (long threshold ) {
@@ -211,39 +234,41 @@ private Message nextMessageToRenew(long threshold) {
211234 /**
212235 * Adds a new message for which the acknowledge deadline should be automatically renewed. The
213236 * message is identified by the subscription from which it was pulled and its acknowledge id.
214- * Auto-renewal will take place until the message is removed (see {@link #remove(String, String)}
215- * or {@link #remove(String, Iterable )}).
237+ * Auto-renewal will take place until the message is removed (see
238+ * {@link #remove(String, String )}).
216239 *
217240 * @param subscription the subscription from which the message has been pulled
218241 * @param ackId the message's acknowledge id
219242 */
220243 void add (String subscription , String ackId ) {
221244 synchronized (lock ) {
222- long deadline = clock .millis () + MIN_DEADLINE_MILLISECONDS ;
245+ long deadline = clock .millis () + MIN_DEADLINE_MILLIS ;
223246 Message message = new Message (new MessageId (subscription , ackId ), deadline );
224247 messageQueue .add (message );
225248 messageDeadlines .put (message .messageId (), deadline );
226249 }
250+ scheduleNextRenewal ();
227251 }
228252
229253 /**
230254 * Adds new messages for which the acknowledge deadlined should be automatically renewed. The
231255 * messages are identified by the subscription from which they were pulled and their
232256 * acknowledge id. Auto-renewal will take place until the messages are removed (see
233- * {@link #remove(String, String)} or {@link #remove(String, Iterable)} ).
257+ * {@link #remove(String, String)}).
234258 *
235259 * @param subscription the subscription from which the messages have been pulled
236260 * @param ackIds the acknowledge ids of the messages
237261 */
238262 void add (String subscription , Iterable <String > ackIds ) {
239263 synchronized (lock ) {
240- long deadline = clock .millis () + MIN_DEADLINE_MILLISECONDS ;
264+ long deadline = clock .millis () + MIN_DEADLINE_MILLIS ;
241265 for (String ackId : ackIds ) {
242266 Message message = new Message (new MessageId (subscription , ackId ), deadline );
243267 messageQueue .add (message );
244268 messageDeadlines .put (message .messageId (), deadline );
245269 }
246270 }
271+ scheduleNextRenewal ();
247272 }
248273
249274 /**
@@ -262,7 +287,19 @@ void remove(String subscription, String ackId) {
262287
263288 @ Override
264289 public void close () throws Exception {
265- renewerFuture .cancel (false );
290+ if (closed ) {
291+ return ;
292+ }
293+ closed = true ;
294+ synchronized (lock ) {
295+ messageDeadlines .clear ();
296+ messageQueue .clear ();
297+ }
298+ synchronized (futureLock ) {
299+ if (renewerFuture != null ) {
300+ renewerFuture .cancel (true );
301+ }
302+ }
266303 executorFactory .release (executor );
267304 }
268305}
0 commit comments