Skip to content

Commit 09172a6

Browse files
committed
Fixing race conditions in tests that for thread scheduling reason
manifest only certain github testing environments.
1 parent 347c661 commit 09172a6

2 files changed

Lines changed: 47 additions & 12 deletions

File tree

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.google.common.primitives.Ints;
2121
import com.google.common.util.concurrent.SettableFuture;
2222
import java.util.ArrayList;
23+
import java.util.Deque;
24+
import java.util.LinkedList;
2325
import java.util.List;
2426
import java.util.PriorityQueue;
2527
import java.util.concurrent.AbstractExecutorService;
@@ -79,6 +81,26 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
7981
Duration.millis(unit.toMillis(initialDelay)), command, PendingCallableType.FIXED_DELAY));
8082
}
8183

84+
private Deque<Duration> expectedWorkQueue = new LinkedList<>();
85+
86+
public void setupScheduleExpectation(Duration delay) {
87+
synchronized (expectedWorkQueue) {
88+
expectedWorkQueue.add(delay);
89+
}
90+
}
91+
92+
public void waitForExpectedWork() {
93+
synchronized (expectedWorkQueue) {
94+
while (!expectedWorkQueue.isEmpty()) {
95+
try {
96+
expectedWorkQueue.wait();
97+
} catch (InterruptedException e) {
98+
// Wait uninterruptibly
99+
}
100+
}
101+
}
102+
}
103+
82104
/**
83105
* This will advance the reference time of the executor and execute (in the same thread) any
84106
* outstanding callable which execution time has passed.
@@ -100,7 +122,7 @@ private void work() {
100122
callable = pendingCallables.poll();
101123
}
102124
if (callable != null) {
103-
try{
125+
try {
104126
callable.call();
105127
} catch (Exception e) {
106128
// We ignore any callable exception, which should be set to the future but not relevant to
@@ -182,6 +204,13 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
182204
pendingCallables.add(callable);
183205
}
184206
work();
207+
synchronized (expectedWorkQueue) {
208+
if (!expectedWorkQueue.isEmpty() && expectedWorkQueue.peek().equals(callable.delay)) {
209+
expectedWorkQueue.poll();
210+
}
211+
expectedWorkQueue.notifyAll();
212+
}
213+
185214
return callable.getScheduledFuture();
186215
}
187216

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,12 @@ public void testModifyAckDeadline() throws Exception {
284284
// Send messages to be acked
285285
List<String> testAckIdsBatch = ImmutableList.of("A", "B", "C");
286286
testReceiver.setExplicitAck(true);
287+
// A modify ack deadline should be schedule for the next 9s
288+
fakeExecutor.setupScheduleExpectation(Duration.standardSeconds(9));
287289
sendMessages(testAckIdsBatch);
290+
// To ensure first modify ack deadline got scheduled
291+
fakeExecutor.waitForExpectedWork();
288292

289-
// Trigger modify ack deadline sending - 10s initial stream ack deadline - 1 padding
290293
fakeExecutor.advanceTime(Duration.standardSeconds(9));
291294

292295
assertEquivalentWithTransformation(
@@ -299,17 +302,16 @@ public ModifyAckDeadline apply(String ack) {
299302
}
300303
});
301304

302-
// Trigger modify ack deadline sending - 2s of the renewed deadlines
303-
fakeExecutor.advanceTime(Duration.standardSeconds(2));
305+
fakeExecutor.advanceTime(Duration.standardSeconds(1));
304306

305307
assertEquivalentWithTransformation(
306308
testAckIdsBatch,
307309
fakeSubscriberServiceImpl.waitAndConsumeModifyAckDeadlines(3),
308310
new Function<String, ModifyAckDeadline>() {
309311
@Override
310312
public ModifyAckDeadline apply(String ack) {
311-
return new ModifyAckDeadline(ack, 2); // It is expected that the deadline is renewed
312-
// only two more seconds to not pass the max
313+
return new ModifyAckDeadline(ack, 3); // It is expected that the deadline is renewed
314+
// only three more seconds to not pass the max
313315
// ack deadline ext.
314316
}
315317
});
@@ -332,9 +334,13 @@ public void testModifyAckDeadline_defaultMaxExtensionPeriod() throws Exception {
332334
// Send messages to be acked
333335
List<String> testAckIdsBatch = ImmutableList.of("A", "B", "C");
334336
testReceiver.setExplicitAck(true);
337+
// A modify ack deadline should be schedule for the next 9s
338+
fakeExecutor.setupScheduleExpectation(Duration.standardSeconds(9));
335339
sendMessages(testAckIdsBatch);
340+
// To ensure the first modify ack deadlines got scheduled
341+
fakeExecutor.waitForExpectedWork();
336342

337-
// Trigger modify ack deadline sending - 10s initial stream ack deadline - 1 padding
343+
// Next modify ack deadline should be schedule in the next 1s
338344
fakeExecutor.advanceTime(Duration.standardSeconds(9));
339345

340346
assertEquivalentWithTransformation(
@@ -347,12 +353,12 @@ public ModifyAckDeadline apply(String ack) {
347353
}
348354
});
349355

350-
int timeIncrementSecs = INITIAL_ACK_DEADLINE_EXTENSION_SECS * 2; // Second time increment
356+
fakeExecutor.advanceTime(Duration.standardSeconds(1));
357+
int timeIncrementSecs = INITIAL_ACK_DEADLINE_EXTENSION_SECS; // Second time increment
351358

352359
// Check ack deadline extensions while the current time has not reached 60 minutes
353-
while (fakeExecutor.getClock().millisTime() + (timeIncrementSecs * 1000) < 1000 * 60 * 60) {
354-
fakeExecutor.advanceTime(Duration.standardSeconds(timeIncrementSecs));
355-
360+
while (fakeExecutor.getClock().millisTime() + timeIncrementSecs - 1 < 1000 * 60 * 60) {
361+
timeIncrementSecs *= 2;
356362
final int expectedIncrementSecs = Math.min(600, timeIncrementSecs);
357363
assertEquivalentWithTransformation(
358364
testAckIdsBatch,
@@ -363,7 +369,7 @@ public ModifyAckDeadline apply(String ack) {
363369
return new ModifyAckDeadline(ack, expectedIncrementSecs);
364370
}
365371
});
366-
timeIncrementSecs *= 2;
372+
fakeExecutor.advanceTime(Duration.standardSeconds(timeIncrementSecs - 1));
367373
}
368374

369375
// No more modify ack deadline extension should be triggered at this point

0 commit comments

Comments
 (0)