Skip to content

Commit fdde1a2

Browse files
authored
---
yaml --- r: 8483 b: refs/heads/master c: dadb6ff h: refs/heads/master i: 8481: d6b2839 8479: 3ccbaaa
1 parent f9c7eae commit fdde1a2

6 files changed

Lines changed: 235 additions & 242 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: a53f7a8420f63ee5c0bc20ef820ae3afd5eb0e57
2+
refs/heads/master: dadb6ff9e2cfdce1945a071f177fd0699ee2dd27
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 3e16a39145437096333db5811e5c0292719c1823
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 61 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,10 @@
3232
import java.util.ArrayList;
3333
import java.util.Collections;
3434
import java.util.Deque;
35-
import java.util.HashSet;
3635
import java.util.LinkedList;
3736
import java.util.List;
3837
import java.util.PriorityQueue;
39-
import java.util.Set;
38+
import java.util.concurrent.LinkedBlockingQueue;
4039
import java.util.concurrent.ScheduledExecutorService;
4140
import java.util.concurrent.ScheduledFuture;
4241
import java.util.concurrent.TimeUnit;
@@ -73,8 +72,9 @@ class MessageDispatcher {
7372
private final MessageWaiter messagesWaiter;
7473

7574
private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
76-
private final Set<String> pendingAcks;
77-
private final Set<String> pendingNacks;
75+
private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
76+
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
77+
private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<>();
7878

7979
private final Lock alarmsLock;
8080
// The deadline should be set by the subscriber connection before use,
@@ -185,47 +185,41 @@ private class AckHandler implements FutureCallback<AckReply> {
185185
receivedTimeMillis = clock.millisTime();
186186
}
187187

188+
private void onBoth(LinkedBlockingQueue<String> destination) {
189+
acked.getAndSet(true);
190+
destination.add(ackId);
191+
flowController.release(1, outstandingBytes);
192+
messagesWaiter.incrementPendingMessages(-1);
193+
processOutstandingBatches();
194+
}
195+
188196
@Override
189197
public void onFailure(Throwable t) {
190198
logger.log(
191199
Level.WARNING,
192200
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
193201
t);
194-
acked.getAndSet(true);
195-
synchronized (pendingNacks) {
196-
pendingNacks.add(ackId);
197-
}
198-
setupPendingAcksAlarm();
199-
flowController.release(1, outstandingBytes);
200-
messagesWaiter.incrementPendingMessages(-1);
201-
processOutstandingBatches();
202+
onBoth(pendingNacks);
202203
}
203204

204205
@Override
205206
public void onSuccess(AckReply reply) {
206-
acked.getAndSet(true);
207+
LinkedBlockingQueue<String> destination;
207208
switch (reply) {
208209
case ACK:
209-
synchronized (pendingAcks) {
210-
pendingAcks.add(ackId);
211-
}
210+
destination = pendingAcks;
212211
// Record the latency rounded to the next closest integer.
213212
ackLatencyDistribution.record(
214213
Ints.saturatedCast(
215214
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
216215
break;
217216
case NACK:
218-
synchronized (pendingNacks) {
219-
pendingNacks.add(ackId);
220-
}
217+
destination = pendingNacks;
221218
break;
222219
default:
223220
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
224221
}
225-
setupPendingAcksAlarm();
226-
flowController.release(1, outstandingBytes);
227-
messagesWaiter.incrementPendingMessages(-1);
228-
processOutstandingBatches();
222+
onBoth(destination);
229223
}
230224
}
231225

@@ -254,8 +248,6 @@ void sendAckOperations(
254248
this.flowController = flowController;
255249
this.outstandingMessageBatches = outstandingMessageBatches;
256250
outstandingAckHandlers = new PriorityQueue<>();
257-
pendingAcks = new HashSet<>();
258-
pendingNacks = new HashSet<>();
259251
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
260252
this.ackLatencyDistribution = ackLatencyDistribution;
261253
alarmsLock = new ReentrantLock();
@@ -264,6 +256,25 @@ void sendAckOperations(
264256
this.clock = clock;
265257
}
266258

259+
public void start() {
260+
pendingAcksAlarm =
261+
systemExecutor.scheduleWithFixedDelay(
262+
new Runnable() {
263+
@Override
264+
public void run() {
265+
try {
266+
processOutstandingAckOperations();
267+
} catch (Throwable t) {
268+
// Catch everything so that one run failing doesn't prevent subsequent runs.
269+
logger.log(Level.WARNING, "failed to send acks/nacks", t);
270+
}
271+
}
272+
},
273+
PENDING_ACKS_SEND_DELAY.toMillis(),
274+
PENDING_ACKS_SEND_DELAY.toMillis(),
275+
TimeUnit.MILLISECONDS);
276+
}
277+
267278
public void stop() {
268279
messagesWaiter.waitNoMessages();
269280
alarmsLock.lock();
@@ -272,6 +283,10 @@ public void stop() {
272283
ackDeadlineExtensionAlarm.cancel(true);
273284
ackDeadlineExtensionAlarm = null;
274285
}
286+
if (pendingAcksAlarm != null) {
287+
pendingAcksAlarm.cancel(false);
288+
pendingAcksAlarm = null;
289+
}
275290
} finally {
276291
alarmsLock.unlock();
277292
}
@@ -328,6 +343,9 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
328343
return;
329344
}
330345
messagesWaiter.incrementPendingMessages(messages.size());
346+
for (ReceivedMessage message : messages) {
347+
pendingReceipts.add(message.getAckId());
348+
}
331349

332350
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
333351
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
@@ -421,32 +439,6 @@ public void run() {
421439
}
422440
}
423441

424-
private void setupPendingAcksAlarm() {
425-
alarmsLock.lock();
426-
try {
427-
if (pendingAcksAlarm == null) {
428-
pendingAcksAlarm =
429-
systemExecutor.schedule(
430-
new Runnable() {
431-
@Override
432-
public void run() {
433-
alarmsLock.lock();
434-
try {
435-
pendingAcksAlarm = null;
436-
} finally {
437-
alarmsLock.unlock();
438-
}
439-
processOutstandingAckOperations();
440-
}
441-
},
442-
PENDING_ACKS_SEND_DELAY.toMillis(),
443-
TimeUnit.MILLISECONDS);
444-
}
445-
} finally {
446-
alarmsLock.unlock();
447-
}
448-
}
449-
450442
private class AckDeadlineAlarm implements Runnable {
451443
@Override
452444
public void run() {
@@ -574,31 +566,26 @@ private void processOutstandingAckOperations(
574566
List<PendingModifyAckDeadline> ackDeadlineExtensions) {
575567
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend =
576568
Lists.newArrayList(ackDeadlineExtensions);
577-
List<String> acksToSend = new ArrayList<>(pendingAcks.size());
578-
synchronized (pendingAcks) {
579-
if (!pendingAcks.isEmpty()) {
580-
try {
581-
acksToSend = new ArrayList<>(pendingAcks);
582-
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
583-
} finally {
584-
pendingAcks.clear();
585-
}
586-
}
587-
}
569+
570+
List<String> acksToSend = new ArrayList<>();
571+
pendingAcks.drainTo(acksToSend);
572+
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
573+
588574
PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
589-
synchronized (pendingNacks) {
590-
if (!pendingNacks.isEmpty()) {
591-
try {
592-
for (String ackId : pendingNacks) {
593-
nacksToSend.addAckId(ackId);
594-
}
595-
logger.log(Level.FINER, "Sending {0} nacks", pendingNacks.size());
596-
} finally {
597-
pendingNacks.clear();
598-
}
599-
modifyAckDeadlinesToSend.add(nacksToSend);
600-
}
575+
pendingNacks.drainTo(nacksToSend.ackIds);
576+
logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size());
577+
if (!nacksToSend.ackIds.isEmpty()) {
578+
modifyAckDeadlinesToSend.add(nacksToSend);
601579
}
580+
581+
PendingModifyAckDeadline receiptsToSend =
582+
new PendingModifyAckDeadline(getMessageDeadlineSeconds());
583+
pendingReceipts.drainTo(receiptsToSend.ackIds);
584+
logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size());
585+
if (!receiptsToSend.ackIds.isEmpty()) {
586+
modifyAckDeadlinesToSend.add(receiptsToSend);
587+
}
588+
602589
ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
603590
}
604591
}

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public PollingSubscriberConnection(
100100
@Override
101101
protected void doStart() {
102102
logger.config("Starting subscriber.");
103+
messageDispatcher.start();
103104
pullMessages(INITIAL_BACKOFF);
104105
notifyStarted();
105106
}

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public StreamingSubscriberConnection(
103103
@Override
104104
protected void doStart() {
105105
logger.config("Starting subscriber.");
106+
messageDispatcher.start();
106107
initialize();
107108
notifyStarted();
108109
}

trunk/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -295,46 +295,52 @@ public String waitForRegistedSubscription() throws InterruptedException {
295295

296296
public List<String> waitAndConsumeReceivedAcks(int expectedCount) throws InterruptedException {
297297
synchronized (acks) {
298-
while (acks.size() < expectedCount) {
299-
acks.wait();
300-
}
298+
waitAtLeast(acks, expectedCount);
301299
List<String> receivedAcksCopy = ImmutableList.copyOf(acks.subList(0, expectedCount));
302-
acks.removeAll(receivedAcksCopy);
300+
acks.subList(0, expectedCount).clear();
303301
return receivedAcksCopy;
304302
}
305303
}
306304

307305
public List<ModifyAckDeadline> waitAndConsumeModifyAckDeadlines(int expectedCount)
308306
throws InterruptedException {
309307
synchronized (modAckDeadlines) {
310-
while (modAckDeadlines.size() < expectedCount) {
311-
modAckDeadlines.wait();
312-
}
308+
waitAtLeast(modAckDeadlines, expectedCount);
313309
List<ModifyAckDeadline> modAckDeadlinesCopy =
314310
ImmutableList.copyOf(modAckDeadlines.subList(0, expectedCount));
315-
modAckDeadlines.removeAll(modAckDeadlinesCopy);
311+
modAckDeadlines.subList(0, expectedCount).clear();
316312
return modAckDeadlinesCopy;
317313
}
318314
}
319315

320316
public int waitForClosedStreams(int expectedCount) throws InterruptedException {
321317
synchronized (closedStreams) {
322-
while (closedStreams.size() < expectedCount) {
323-
closedStreams.wait();
324-
}
318+
waitAtLeast(closedStreams, expectedCount);
325319
return closedStreams.size();
326320
}
327321
}
328322

329323
public int waitForOpenedStreams(int expectedCount) throws InterruptedException {
330324
synchronized (openedStreams) {
331-
while (openedStreams.size() < expectedCount) {
332-
openedStreams.wait();
333-
}
325+
waitAtLeast(openedStreams, expectedCount);
334326
return openedStreams.size();
335327
}
336328
}
337329

330+
// wait until the collection has at least target number of elements.
331+
// caller MUST hold the monitor for the collection.
332+
private static void waitAtLeast(Collection<?> collection, int target)
333+
throws InterruptedException {
334+
long untilMillis = System.currentTimeMillis() + 20_000;
335+
while (collection.size() < target) {
336+
long now = System.currentTimeMillis();
337+
if (now >= untilMillis) {
338+
throw new IllegalStateException("timed out, last state: " + collection);
339+
}
340+
collection.wait(untilMillis - now);
341+
}
342+
}
343+
338344
public void waitForStreamAckDeadline(int expectedValue) throws InterruptedException {
339345
synchronized (messageAckDeadline) {
340346
while (messageAckDeadline.get() != expectedValue) {

0 commit comments

Comments
 (0)