Skip to content

Commit 3c23da1

Browse files
authored
---
yaml --- r: 7797 b: refs/heads/tswast-patch-1 c: 2c858ee h: refs/heads/master i: 7795: 93f2edf
1 parent ef18c6f commit 3c23da1

3 files changed

Lines changed: 24 additions & 19 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ refs/tags/v0.18.0: 9d193c4c4b9d1c6f21515dd8e50836b9194ec9bb
5757
refs/tags/v0.19.0: e67b56e4d8dad5f9a7b38c9b2107c23c828f2ed5
5858
refs/tags/v0.20.0: 839f7fb7156535146aa1cb2c5aadd8d375d854e8
5959
refs/tags/v0.20.1: 370471f437f1f4f68a11e068df5cd6bf39edb1fa
60-
refs/heads/tswast-patch-1: 4e45df5efe48002700227f98fa13ea76ce3cf174
60+
refs/heads/tswast-patch-1: 2c858ee0383f7b9063758ce9e401aab7fba1b94f
6161
refs/heads/pubsub-streaming-pull: 19262b752ee874eb2ca3b950eb2aef44d5a5267b

branches/tswast-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
272272
}
273273
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
274274
logger.debug("Received {} messages at {}", responseMessages.size(), now);
275-
setupNextAckDeadlineExtensionAlarm(expiration);
276275

277276
messagesWaiter.incrementPendingMessages(responseMessages.size());
278277
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
@@ -288,15 +287,11 @@ public void run() {
288287
});
289288
}
290289

291-
// There is a race condition. setupNextAckDeadlineExtensionAlarm might set
292-
// an alarm that fires before this block can run.
293-
// The fix is to move setup below this block, but doing so aggravates another
294-
// race condition.
295-
// TODO(pongad): Fix both races.
296290
synchronized (outstandingAckHandlers) {
297291
outstandingAckHandlers.add(
298292
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
299293
}
294+
setupNextAckDeadlineExtensionAlarm(expiration);
300295

301296
try {
302297
flowController.reserve(receivedMessagesCount, totalByteCount);

branches/tswast-patch-1/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,32 +79,41 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
7979
Duration.millis(unit.toMillis(initialDelay)), command, PendingCallableType.FIXED_DELAY));
8080
}
8181

82-
public void tick(long time, TimeUnit unit) {
83-
advanceTime(Duration.millis(unit.toMillis(time)));
84-
}
85-
8682
/**
8783
* This will advance the reference time of the executor and execute (in the same thread) any
8884
* outstanding callable which execution time has passed.
8985
*/
9086
public void advanceTime(Duration toAdvance) {
9187
clock.advance(toAdvance.getMillis(), TimeUnit.MILLISECONDS);
88+
work();
89+
}
90+
91+
private void work() {
9292
DateTime cmpTime = new DateTime(clock.millis());
9393

94-
synchronized (pendingCallables) {
95-
while (!pendingCallables.isEmpty()
96-
&& pendingCallables.peek().getScheduledTime().compareTo(cmpTime) <= 0) {
97-
try {
98-
pendingCallables.poll().call();
99-
if (shutdown.get() && pendingCallables.isEmpty()) {
100-
pendingCallables.notifyAll();
101-
}
94+
for (;;) {
95+
PendingCallable<?> callable = null;
96+
synchronized (pendingCallables) {
97+
if (pendingCallables.isEmpty() || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
98+
break;
99+
}
100+
callable = pendingCallables.poll();
101+
}
102+
if (callable != null) {
103+
try{
104+
callable.call();
102105
} catch (Exception e) {
103106
// We ignore any callable exception, which should be set to the future but not relevant to
104107
// advanceTime.
105108
}
106109
}
107110
}
111+
112+
synchronized (pendingCallables) {
113+
if (shutdown.get() && pendingCallables.isEmpty()) {
114+
pendingCallables.notifyAll();
115+
}
116+
}
108117
}
109118

110119
@Override
@@ -172,6 +181,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
172181
synchronized (pendingCallables) {
173182
pendingCallables.add(callable);
174183
}
184+
work();
175185
return callable.getScheduledFuture();
176186
}
177187

0 commit comments

Comments
 (0)