2020
2121import com .google .cloud .GrpcServiceOptions .ExecutorFactory ;
2222import com .google .common .collect .ImmutableList ;
23-
23+ import java .util .concurrent .CountDownLatch ;
24+ import java .util .concurrent .ExecutorService ;
25+ import java .util .concurrent .Future ;
26+ import java .util .concurrent .ScheduledExecutorService ;
27+ import java .util .concurrent .ScheduledFuture ;
28+ import java .util .concurrent .TimeUnit ;
29+ import java .util .concurrent .atomic .AtomicLong ;
2430import org .easymock .EasyMock ;
2531import org .easymock .IAnswer ;
2632import org .junit .After ;
2935import org .junit .Test ;
3036import org .junit .rules .Timeout ;
3137
32- import java .util .concurrent .CountDownLatch ;
33- import java .util .concurrent .Future ;
34- import java .util .concurrent .ScheduledExecutorService ;
35- import java .util .concurrent .ScheduledFuture ;
36- import java .util .concurrent .TimeUnit ;
37- import java .util .concurrent .atomic .AtomicLong ;
38-
3938public class AckDeadlineRenewerTest {
4039
4140 private static final int MIN_DEADLINE_MILLIS = 10_000 ;
41+ private static final int TIME_ADVANCE = 9_000 ;
4242
4343 private static final String SUBSCRIPTION1 = "subscription1" ;
4444 private static final String SUBSCRIPTION2 = "subscription2" ;
@@ -47,16 +47,31 @@ public class AckDeadlineRenewerTest {
4747 private static final String ACK_ID3 = "ack-id3" ;
4848
4949 private PubSub pubsub ;
50+ private FakeScheduledExecutorService executorService ;
5051 private AckDeadlineRenewer ackDeadlineRenewer ;
52+ private final FakeClock clock = new FakeClock ();
5153
5254 @ Rule
5355 public Timeout globalTimeout = Timeout .seconds (60 );
5456
5557 @ Before
5658 public void setUp () {
5759 pubsub = EasyMock .createStrictMock (PubSub .class );
60+ executorService = new FakeScheduledExecutorService (4 , clock );
61+ ExecutorFactory executorFactory = new ExecutorFactory () {
62+ @ Override
63+ public ExecutorService get () {
64+ return executorService ;
65+ }
66+ @ Override
67+ public void release (ExecutorService executor ) {
68+ executorService .shutdown ();
69+ }
70+ };
5871 PubSubOptions options = PubSubOptions .newBuilder ()
5972 .setProjectId ("projectId" )
73+ .setExecutorFactory (executorFactory )
74+ .setClock (clock )
6075 .build ();
6176 EasyMock .expect (pubsub .getOptions ()).andReturn (options );
6277 EasyMock .replay (pubsub );
@@ -69,13 +84,13 @@ public void tearDown() throws Exception {
6984 ackDeadlineRenewer .close ();
7085 }
7186
72- private static IAnswer <Future <Void >> createAnswer (final CountDownLatch latch ,
87+ private IAnswer <Future <Void >> createAnswer (final CountDownLatch latch ,
7388 final AtomicLong renewal ) {
7489 return new IAnswer <Future <Void >>() {
7590 @ Override
7691 public Future <Void > answer () throws Throwable {
7792 latch .countDown ();
78- renewal .set (System . currentTimeMillis ());
93+ renewal .set (clock . millis ());
7994 return null ;
8095 }
8196 };
@@ -95,10 +110,12 @@ public void testAddOneMessage() throws InterruptedException {
95110 TimeUnit .MILLISECONDS , ImmutableList .of (ACK_ID1 )))
96111 .andAnswer (createAnswer (secondLatch , secondRenewal ));
97112 EasyMock .replay (pubsub );
98- long addTime = System . currentTimeMillis ();
113+ long addTime = clock . millis ();
99114 ackDeadlineRenewer .add (SUBSCRIPTION1 , ACK_ID1 );
115+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
100116 firstLatch .await ();
101117 assertTrue (firstRenewal .get () < (addTime + MIN_DEADLINE_MILLIS ));
118+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
102119 secondLatch .await ();
103120 assertTrue (secondRenewal .get () < (firstRenewal .get () + MIN_DEADLINE_MILLIS ));
104121 }
@@ -125,13 +142,15 @@ public void testAddMessages() throws InterruptedException {
125142 TimeUnit .MILLISECONDS , ImmutableList .of (ACK_ID1 , ACK_ID3 )))
126143 .andAnswer (createAnswer (secondLatch , secondRenewalSub2 ));
127144 EasyMock .replay (pubsub );
128- long addTime1 = System . currentTimeMillis ();
145+ long addTime1 = clock . millis ();
129146 ackDeadlineRenewer .add (SUBSCRIPTION1 , ImmutableList .of (ACK_ID1 , ACK_ID2 ));
130147 ackDeadlineRenewer .add (SUBSCRIPTION2 , ACK_ID1 );
148+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
131149 firstLatch .await ();
132150 assertTrue (firstRenewalSub1 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
133151 assertTrue (firstRenewalSub2 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
134152 ackDeadlineRenewer .add (SUBSCRIPTION2 , ACK_ID3 );
153+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
135154 secondLatch .await ();
136155 assertTrue (secondRenewalSub1 .get () < (firstRenewalSub1 .get () + MIN_DEADLINE_MILLIS ));
137156 assertTrue (secondRenewalSub2 .get () < (firstRenewalSub2 .get () + MIN_DEADLINE_MILLIS ));
@@ -159,13 +178,15 @@ public void testAddExistingMessage() throws InterruptedException {
159178 TimeUnit .MILLISECONDS , ImmutableList .of (ACK_ID1 )))
160179 .andAnswer (createAnswer (secondLatch , secondRenewalSub2 ));
161180 EasyMock .replay (pubsub );
162- long addTime1 = System . currentTimeMillis ();
181+ long addTime1 = clock . millis ();
163182 ackDeadlineRenewer .add (SUBSCRIPTION1 , ImmutableList .of (ACK_ID1 , ACK_ID2 ));
164183 ackDeadlineRenewer .add (SUBSCRIPTION2 , ACK_ID1 );
184+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
165185 firstLatch .await ();
166186 assertTrue (firstRenewalSub1 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
167187 assertTrue (firstRenewalSub2 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
168188 ackDeadlineRenewer .add (SUBSCRIPTION2 , ACK_ID1 );
189+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
169190 secondLatch .await ();
170191 assertTrue (secondRenewalSub1 .get () < (firstRenewalSub1 .get () + MIN_DEADLINE_MILLIS ));
171192 assertTrue (secondRenewalSub2 .get () < (firstRenewalSub2 .get () + MIN_DEADLINE_MILLIS ));
@@ -193,13 +214,15 @@ public void testRemoveNonExistingMessage() throws InterruptedException {
193214 TimeUnit .MILLISECONDS , ImmutableList .of (ACK_ID1 )))
194215 .andAnswer (createAnswer (secondLatch , secondRenewalSub2 ));
195216 EasyMock .replay (pubsub );
196- long addTime1 = System . currentTimeMillis ();
217+ long addTime1 = clock . millis ();
197218 ackDeadlineRenewer .add (SUBSCRIPTION1 , ImmutableList .of (ACK_ID1 , ACK_ID2 ));
198219 ackDeadlineRenewer .add (SUBSCRIPTION2 , ACK_ID1 );
220+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
199221 firstLatch .await ();
200222 assertTrue (firstRenewalSub1 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
201223 assertTrue (firstRenewalSub2 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
202224 ackDeadlineRenewer .remove (SUBSCRIPTION1 , ACK_ID3 );
225+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
203226 secondLatch .await ();
204227 assertTrue (secondRenewalSub1 .get () < (firstRenewalSub1 .get () + MIN_DEADLINE_MILLIS ));
205228 assertTrue (secondRenewalSub2 .get () < (firstRenewalSub2 .get () + MIN_DEADLINE_MILLIS ));
@@ -227,13 +250,15 @@ public void testRemoveMessage() throws InterruptedException {
227250 TimeUnit .MILLISECONDS , ImmutableList .of (ACK_ID1 )))
228251 .andAnswer (createAnswer (secondLatch , secondRenewalSub2 ));
229252 EasyMock .replay (pubsub );
230- long addTime1 = System . currentTimeMillis ();
253+ long addTime1 = clock . millis ();
231254 ackDeadlineRenewer .add (SUBSCRIPTION1 , ImmutableList .of (ACK_ID1 , ACK_ID2 ));
232255 ackDeadlineRenewer .add (SUBSCRIPTION2 , ACK_ID1 );
256+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
233257 firstLatch .await ();
234258 assertTrue (firstRenewalSub1 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
235259 assertTrue (firstRenewalSub2 .get () < (addTime1 + MIN_DEADLINE_MILLIS ));
236260 ackDeadlineRenewer .remove (SUBSCRIPTION1 , ACK_ID2 );
261+ executorService .tick (TIME_ADVANCE , TimeUnit .MILLISECONDS );
237262 secondLatch .await ();
238263 assertTrue (secondRenewalSub1 .get () < (firstRenewalSub1 .get () + MIN_DEADLINE_MILLIS ));
239264 assertTrue (secondRenewalSub2 .get () < (firstRenewalSub2 .get () + MIN_DEADLINE_MILLIS ));
0 commit comments