Skip to content

Commit 18c3022

Browse files
authored
---
yaml --- r: 8605 b: refs/heads/master c: f6ed7fc h: refs/heads/master i: 8603: 70e3d0b
1 parent 669e2d3 commit 18c3022

3 files changed

Lines changed: 53 additions & 13 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: 931fb9a808d9c6f8c83d2effcae778f55aedfc7f
2+
refs/heads/master: f6ed7fc6ea10b05fc6a4013fd99bc9d56177e5f0
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 6daca92127d91b7c2c99490080ecf8a13fa94cde
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import com.google.api.gax.batching.FlowController.FlowControlException;
2323
import com.google.api.gax.core.Distribution;
2424
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
25+
import com.google.common.collect.ArrayListMultimap;
2526
import com.google.common.primitives.Ints;
2627
import com.google.common.util.concurrent.FutureCallback;
2728
import com.google.common.util.concurrent.Futures;
2829
import com.google.common.util.concurrent.SettableFuture;
2930
import com.google.pubsub.v1.PubsubMessage;
3031
import com.google.pubsub.v1.ReceivedMessage;
3132
import java.util.ArrayList;
33+
import java.util.Arrays;
34+
import java.util.Collection;
3235
import java.util.Collections;
3336
import java.util.Deque;
3437
import java.util.Iterator;
@@ -50,6 +53,7 @@
5053
import java.util.logging.Logger;
5154
import org.threeten.bp.Duration;
5255
import org.threeten.bp.Instant;
56+
import org.threeten.bp.temporal.ChronoUnit;
5357

5458
/**
5559
* Dispatches messages to a message receiver while handling the messages acking and lease
@@ -98,11 +102,12 @@ static class PendingModifyAckDeadline {
98102
final int deadlineExtensionSeconds;
99103

100104
PendingModifyAckDeadline(int deadlineExtensionSeconds, String... ackIds) {
101-
this.ackIds = new ArrayList<String>();
105+
this(deadlineExtensionSeconds, Arrays.asList(ackIds));
106+
}
107+
108+
private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection<String> ackIds) {
109+
this.ackIds = new ArrayList<String>(ackIds);
102110
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
103-
for (String ackId : ackIds) {
104-
addAckId(ackId);
105-
}
106111
}
107112

108113
public void addAckId(String ackId) {
@@ -426,21 +431,39 @@ int computeDeadlineSeconds() {
426431

427432
@InternalApi
428433
void extendDeadlines() {
429-
List<String> acksToSend = Collections.<String>emptyList();
430-
PendingModifyAckDeadline modack = new PendingModifyAckDeadline(getMessageDeadlineSeconds());
434+
int extendSeconds = getMessageDeadlineSeconds();
435+
List<PendingModifyAckDeadline> modacks = new ArrayList<>();
436+
PendingModifyAckDeadline modack = new PendingModifyAckDeadline(extendSeconds);
431437
Instant now = now();
438+
Instant extendTo = now.plusSeconds(extendSeconds);
432439

440+
int count = 0;
433441
Iterator<Map.Entry<String, Instant>> it = pendingMessages.entrySet().iterator();
434442
while (it.hasNext()) {
435443
Map.Entry<String, Instant> entry = it.next();
436-
if (entry.getValue().isBefore(now)) {
437-
it.remove();
438-
} else {
439-
modack.ackIds.add(entry.getKey());
444+
String ackId = entry.getKey();
445+
Instant totalExpiration = entry.getValue();
446+
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
447+
// since one modack RPC only takes one expiration.
448+
// Whenever we delete polling pull, we should also delete PendingModifyAckDeadline,
449+
// and just construct StreamingPullRequest directly.
450+
if (totalExpiration.isAfter(extendTo)) {
451+
modack.ackIds.add(ackId);
452+
count++;
453+
continue;
454+
}
455+
it.remove();
456+
if (totalExpiration.isAfter(now)) {
457+
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
458+
modacks.add(new PendingModifyAckDeadline(sec, ackId));
459+
count++;
440460
}
441461
}
462+
modacks.add(modack);
463+
logger.log(Level.FINER, "Sending {0} modacks", count);
442464

443-
ackProcessor.sendAckOperations(acksToSend, Collections.singletonList(modack));
465+
List<String> acksToSend = Collections.<String>emptyList();
466+
ackProcessor.sendAckOperations(acksToSend, modacks);
444467
}
445468

446469
@InternalApi

trunk/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,30 @@ public void testExtension() throws Exception {
155155
}
156156

157157
@Test
158-
public void testExtension_GiveUp() throws Exception {
158+
public void testExtension_Close() throws Exception {
159159
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
160160
dispatcher.extendDeadlines();
161161
assertThat(sentModAcks)
162162
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
163+
sentModAcks.clear();
164+
165+
// Default total expiration is an hour (60*60 seconds). We normally would extend by 10s.
166+
// However, only extend by 5s here, since there's only 5s left before total expiration.
167+
clock.advance(60 * 60 - 5, TimeUnit.SECONDS);
168+
dispatcher.extendDeadlines();
169+
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 5));
170+
}
163171

172+
@Test
173+
public void testExtension_GiveUp() throws Exception {
174+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
175+
dispatcher.extendDeadlines();
176+
assertThat(sentModAcks)
177+
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
164178
sentModAcks.clear();
179+
180+
// If we run extendDeadlines after totalExpiration, we shouldn't send anything.
181+
// In particular, don't send negative modacks.
165182
clock.advance(1, TimeUnit.DAYS);
166183
dispatcher.extendDeadlines();
167184
assertThat(sentModAcks).isEmpty();

0 commit comments

Comments
 (0)