Skip to content

Commit 1a8a438

Browse files
Using fake clock in AckDeadlineRenewerTest (#1413)
This reduces the running time of the test from 3-4 minutes down to 0.03 seconds.
1 parent 3f3e7c0 commit 1a8a438

3 files changed

Lines changed: 261 additions & 15 deletions

File tree

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020

2121
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
2222
import com.google.common.collect.ImmutableList;
23-
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Future;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.ScheduledFuture;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicLong;
2430
import org.easymock.EasyMock;
2531
import org.easymock.IAnswer;
2632
import org.junit.After;
@@ -29,16 +35,10 @@
2935
import org.junit.Test;
3036
import org.junit.rules.Timeout;
3137

32-
import java.util.concurrent.CountDownLatch;
33-
import java.util.concurrent.Future;
34-
import java.util.concurrent.ScheduledExecutorService;
35-
import java.util.concurrent.ScheduledFuture;
36-
import java.util.concurrent.TimeUnit;
37-
import java.util.concurrent.atomic.AtomicLong;
38-
3938
public class AckDeadlineRenewerTest {
4039

4140
private static final int MIN_DEADLINE_MILLIS = 10_000;
41+
private static final int TIME_ADVANCE = 9_000;
4242

4343
private static final String SUBSCRIPTION1 = "subscription1";
4444
private static final String SUBSCRIPTION2 = "subscription2";
@@ -47,16 +47,31 @@ public class AckDeadlineRenewerTest {
4747
private static final String ACK_ID3 = "ack-id3";
4848

4949
private PubSub pubsub;
50+
private FakeScheduledExecutorService executorService;
5051
private AckDeadlineRenewer ackDeadlineRenewer;
52+
private final FakeClock clock = new FakeClock();
5153

5254
@Rule
5355
public Timeout globalTimeout = Timeout.seconds(60);
5456

5557
@Before
5658
public void setUp() {
5759
pubsub = EasyMock.createStrictMock(PubSub.class);
60+
executorService = new FakeScheduledExecutorService(4, clock);
61+
ExecutorFactory executorFactory = new ExecutorFactory() {
62+
@Override
63+
public ExecutorService get() {
64+
return executorService;
65+
}
66+
@Override
67+
public void release(ExecutorService executor) {
68+
executorService.shutdown();
69+
}
70+
};
5871
PubSubOptions options = PubSubOptions.newBuilder()
5972
.setProjectId("projectId")
73+
.setExecutorFactory(executorFactory)
74+
.setClock(clock)
6075
.build();
6176
EasyMock.expect(pubsub.getOptions()).andReturn(options);
6277
EasyMock.replay(pubsub);
@@ -69,13 +84,13 @@ public void tearDown() throws Exception {
6984
ackDeadlineRenewer.close();
7085
}
7186

72-
private static IAnswer<Future<Void>> createAnswer(final CountDownLatch latch,
87+
private IAnswer<Future<Void>> createAnswer(final CountDownLatch latch,
7388
final AtomicLong renewal) {
7489
return new IAnswer<Future<Void>>() {
7590
@Override
7691
public Future<Void> answer() throws Throwable {
7792
latch.countDown();
78-
renewal.set(System.currentTimeMillis());
93+
renewal.set(clock.millis());
7994
return null;
8095
}
8196
};
@@ -95,10 +110,12 @@ public void testAddOneMessage() throws InterruptedException {
95110
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
96111
.andAnswer(createAnswer(secondLatch, secondRenewal));
97112
EasyMock.replay(pubsub);
98-
long addTime = System.currentTimeMillis();
113+
long addTime = clock.millis();
99114
ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1);
115+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
100116
firstLatch.await();
101117
assertTrue(firstRenewal.get() < (addTime + MIN_DEADLINE_MILLIS));
118+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
102119
secondLatch.await();
103120
assertTrue(secondRenewal.get() < (firstRenewal.get() + MIN_DEADLINE_MILLIS));
104121
}
@@ -125,13 +142,15 @@ public void testAddMessages() throws InterruptedException {
125142
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID3)))
126143
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
127144
EasyMock.replay(pubsub);
128-
long addTime1 = System.currentTimeMillis();
145+
long addTime1 = clock.millis();
129146
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
130147
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
148+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
131149
firstLatch.await();
132150
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
133151
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
134152
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID3);
153+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
135154
secondLatch.await();
136155
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
137156
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
@@ -159,13 +178,15 @@ public void testAddExistingMessage() throws InterruptedException {
159178
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
160179
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
161180
EasyMock.replay(pubsub);
162-
long addTime1 = System.currentTimeMillis();
181+
long addTime1 = clock.millis();
163182
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
164183
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
184+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
165185
firstLatch.await();
166186
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
167187
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
168188
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
189+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
169190
secondLatch.await();
170191
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
171192
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
@@ -193,13 +214,15 @@ public void testRemoveNonExistingMessage() throws InterruptedException {
193214
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
194215
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
195216
EasyMock.replay(pubsub);
196-
long addTime1 = System.currentTimeMillis();
217+
long addTime1 = clock.millis();
197218
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
198219
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
220+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
199221
firstLatch.await();
200222
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
201223
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
202224
ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID3);
225+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
203226
secondLatch.await();
204227
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
205228
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
@@ -227,13 +250,15 @@ public void testRemoveMessage() throws InterruptedException {
227250
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
228251
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
229252
EasyMock.replay(pubsub);
230-
long addTime1 = System.currentTimeMillis();
253+
long addTime1 = clock.millis();
231254
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
232255
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
256+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
233257
firstLatch.await();
234258
assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
235259
assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
236260
ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID2);
261+
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
237262
secondLatch.await();
238263
assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
239264
assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import com.google.cloud.Clock;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
/**
24+
* A Clock to help with testing time-based logic.
25+
*/
26+
class FakeClock extends Clock {
27+
28+
private final AtomicLong millis = new AtomicLong();
29+
30+
// Advances the clock value by {@code time} in {@code timeUnit}.
31+
void advance(long time, TimeUnit timeUnit) {
32+
millis.addAndGet(timeUnit.toMillis(time));
33+
}
34+
35+
@Override
36+
public long millis() {
37+
return millis.get();
38+
}
39+
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import java.util.ArrayList;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.concurrent.Callable;
23+
import java.util.concurrent.Delayed;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledFuture;
27+
import java.util.concurrent.ScheduledThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
30+
31+
/**
32+
* A ScheduledExecutorService to help with testing.
33+
*/
34+
class FakeScheduledExecutorService extends ScheduledThreadPoolExecutor {
35+
private final FakeClock clock;
36+
private final List<FakeScheduledFuture> futures = new ArrayList<>();
37+
38+
public FakeScheduledExecutorService(int corePoolSize, FakeClock clock) {
39+
super(corePoolSize);
40+
this.clock = clock;
41+
}
42+
43+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
44+
synchronized (this) {
45+
long runAtMillis = clock.millis() + unit.toMillis(delay);
46+
FakeScheduledFuture scheduledFuture =
47+
new FakeScheduledFuture(command, delay, unit, runAtMillis);
48+
futures.add(scheduledFuture);
49+
return scheduledFuture;
50+
}
51+
}
52+
53+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
54+
throw new UnsupportedOperationException(
55+
"FakeScheduledExecutorService.schedule(Callable, long, TimeUnit) not supported");
56+
}
57+
58+
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
59+
TimeUnit unit) {
60+
throw new UnsupportedOperationException(
61+
"FakeScheduledExecutorService.scheduleAtFixedRate not supported");
62+
}
63+
64+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
65+
TimeUnit unit) {
66+
throw new UnsupportedOperationException(
67+
"FakeScheduledExecutorService.scheduleAtFixedRate not supported");
68+
}
69+
70+
public void tick(long time, TimeUnit unit) {
71+
List<FakeScheduledFuture> runnablesToGo = new ArrayList<>();
72+
synchronized (this) {
73+
clock.advance(time, unit);
74+
Iterator<FakeScheduledFuture> iter = futures.iterator();
75+
while (iter.hasNext()) {
76+
FakeScheduledFuture scheduledFuture = iter.next();
77+
if (scheduledFuture.runAtMillis <= clock.millis()) {
78+
runnablesToGo.add(scheduledFuture);
79+
iter.remove();
80+
}
81+
}
82+
}
83+
for (FakeScheduledFuture scheduledFuture : runnablesToGo) {
84+
scheduledFuture.run();
85+
}
86+
}
87+
88+
private boolean cancel(FakeScheduledFuture toCancel) {
89+
synchronized (this) {
90+
Iterator<FakeScheduledFuture> iter = futures.iterator();
91+
while (iter.hasNext()) {
92+
FakeScheduledFuture scheduledFuture = iter.next();
93+
if (scheduledFuture == toCancel) {
94+
iter.remove();
95+
return true;
96+
}
97+
}
98+
return false;
99+
}
100+
}
101+
102+
private class FakeScheduledFuture implements ScheduledFuture<Object> {
103+
final Callable<Object> callable;
104+
final long delay;
105+
final TimeUnit unit;
106+
final long runAtMillis;
107+
108+
volatile boolean isDone;
109+
volatile boolean isCancelled;
110+
volatile Exception exception;
111+
volatile Object result;
112+
113+
FakeScheduledFuture(Runnable runnable, long delay, TimeUnit unit, long runAtMillis) {
114+
this.callable = Executors.callable(runnable);
115+
this.delay = delay;
116+
this.unit = unit;
117+
this.runAtMillis = runAtMillis;
118+
}
119+
120+
@Override
121+
public long getDelay(TimeUnit requestedUnit) {
122+
return unit.convert(delay, requestedUnit);
123+
}
124+
125+
@Override
126+
public int compareTo(Delayed other) {
127+
return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
128+
}
129+
130+
@Override
131+
public boolean cancel(boolean var1) {
132+
if (isCancelled) {
133+
return isCancelled;
134+
}
135+
isCancelled = FakeScheduledExecutorService.this.cancel(this);
136+
return isCancelled;
137+
}
138+
139+
@Override
140+
public boolean isCancelled() {
141+
return isCancelled;
142+
}
143+
144+
@Override
145+
public boolean isDone() {
146+
return isDone;
147+
}
148+
149+
@Override
150+
public Object get() throws InterruptedException, ExecutionException {
151+
if (!isDone()) {
152+
throw new UnsupportedOperationException("FakeScheduledFuture: blocking get not supported");
153+
}
154+
155+
if (exception != null) {
156+
throw new ExecutionException(exception);
157+
}
158+
159+
return result;
160+
}
161+
162+
@Override
163+
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
164+
TimeoutException {
165+
return get();
166+
}
167+
168+
public void run() {
169+
if (isDone()) {
170+
throw new UnsupportedOperationException("FakeScheduledFuture already done.");
171+
}
172+
173+
try {
174+
result = callable.call();
175+
} catch (Exception e) {
176+
exception = e;
177+
}
178+
179+
isDone = true;
180+
}
181+
}
182+
}

0 commit comments

Comments
 (0)