3838
3939import java .util .concurrent .CountDownLatch ;
4040import java .util .concurrent .ExecutorService ;
41+ import java .util .concurrent .Future ;
4142
4243public class MessageConsumerImplTest {
4344
@@ -213,20 +214,33 @@ public void testMessageConsumerMultipleCallsAck() throws Exception {
213214 PullResponse response1 = PullResponse .newBuilder ()
214215 .addReceivedMessages (MESSAGE1_PB )
215216 .build ();
216- PullResponse response2 = PullResponse .newBuilder ()
217+ final PullResponse response2 = PullResponse .newBuilder ()
217218 .addReceivedMessages (MESSAGE2_PB )
218219 .build ();
219220 EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
220221 EasyMock .expect (options .service ()).andReturn (pubsub );
221222 EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
223+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
222224 final CountDownLatch latch = new CountDownLatch (2 );
223225 EasyMock .expect (pubsub .options ()).andReturn (options );
224- EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
226+ EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andAnswer (new IAnswer <Future <Void >>() {
227+ @ Override
228+ public Future <Void > answer () throws Throwable {
229+ nextPullLatch .await ();
230+ return null ;
231+ }
232+ });
225233 EasyMock .expect (pubsub .options ()).andReturn (options );
226234 EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
227235 EasyMock .replay (pubsub );
228236 EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (response1 ));
229- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (response2 ));
237+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
238+ @ Override
239+ public PullFuture answer () throws Throwable {
240+ nextPullLatch .countDown ();
241+ return new TestPullFuture (response2 );
242+ }
243+ });
230244 EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
231245 .andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
232246 renewer .add (SUBSCRIPTION , ACK_ID1 );
@@ -253,20 +267,33 @@ public void testMessageConsumerMultipleCallsNack() throws Exception {
253267 PullResponse response1 = PullResponse .newBuilder ()
254268 .addReceivedMessages (MESSAGE1_PB )
255269 .build ();
256- PullResponse response2 = PullResponse .newBuilder ()
270+ final PullResponse response2 = PullResponse .newBuilder ()
257271 .addReceivedMessages (MESSAGE2_PB )
258272 .build ();
259273 EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
260274 EasyMock .expect (options .service ()).andReturn (pubsub );
261275 EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
276+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
262277 final CountDownLatch latch = new CountDownLatch (2 );
263278 EasyMock .expect (pubsub .options ()).andReturn (options );
264- EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
279+ EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andAnswer (new IAnswer <Future <Void >>() {
280+ @ Override
281+ public Future <Void > answer () throws Throwable {
282+ nextPullLatch .await ();
283+ return null ;
284+ }
285+ });
265286 EasyMock .expect (pubsub .options ()).andReturn (options );
266287 EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
267288 EasyMock .replay (pubsub );
268289 EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (response1 ));
269- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (response2 ));
290+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
291+ @ Override
292+ public PullFuture answer () throws Throwable {
293+ nextPullLatch .countDown ();
294+ return new TestPullFuture (response2 );
295+ }
296+ });
270297 EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
271298 .andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
272299 renewer .add (SUBSCRIPTION , ACK_ID1 );
@@ -289,22 +316,35 @@ public void testMessageConsumerMultipleCallsNack() throws Exception {
289316 @ Test
290317 public void testMessageConsumerMaxCallbacksAck () throws Exception {
291318 PullRequest request1 = pullRequest (2 );
292- PullRequest request2 = pullRequest (2 );
293- PullResponse otherPullResponse = PullResponse .newBuilder ()
319+ PullRequest request2 = pullRequest (1 );
320+ final PullResponse otherPullResponse = PullResponse .newBuilder ()
294321 .addReceivedMessages (MESSAGE1_PB )
295322 .build ();
296323 EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
297324 EasyMock .expect (options .service ()).andReturn (pubsub );
298325 EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
299326 EasyMock .expect (pubsub .options ()).andReturn (options ).times (2 );
327+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
300328 final CountDownLatch latch = new CountDownLatch (3 );
301329 EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
302- EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
330+ EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID2 )).andAnswer (new IAnswer <Future <Void >>() {
331+ @ Override
332+ public Future <Void > answer () throws Throwable {
333+ nextPullLatch .await ();
334+ return null ;
335+ }
336+ });
303337 EasyMock .expect (pubsub .options ()).andReturn (options );
304338 EasyMock .expect (pubsub .ackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
305339 EasyMock .replay (pubsub );
306340 EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (PULL_RESPONSE ));
307- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (otherPullResponse ));
341+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
342+ @ Override
343+ public PullFuture answer () throws Throwable {
344+ nextPullLatch .countDown ();
345+ return new TestPullFuture (otherPullResponse );
346+ }
347+ });
308348 EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
309349 .andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
310350 renewer .add (SUBSCRIPTION , ACK_ID1 );
@@ -331,22 +371,35 @@ public void testMessageConsumerMaxCallbacksAck() throws Exception {
331371 @ Test
332372 public void testMessageConsumerMaxCallbacksNack () throws Exception {
333373 PullRequest request1 = pullRequest (2 );
334- PullRequest request2 = pullRequest (2 );
335- PullResponse otherPullResponse = PullResponse .newBuilder ()
374+ PullRequest request2 = pullRequest (1 );
375+ final PullResponse otherPullResponse = PullResponse .newBuilder ()
336376 .addReceivedMessages (MESSAGE1_PB )
337377 .build ();
338378 EasyMock .expect (options .rpc ()).andReturn (pubsubRpc );
339379 EasyMock .expect (options .service ()).andReturn (pubsub );
340380 EasyMock .expect (options .projectId ()).andReturn (PROJECT ).anyTimes ();
341381 EasyMock .expect (pubsub .options ()).andReturn (options ).times (2 );
382+ final CountDownLatch nextPullLatch = new CountDownLatch (1 );
342383 final CountDownLatch latch = new CountDownLatch (3 );
343384 EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
344- EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID2 )).andReturn (null );
385+ EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID2 )).andAnswer (new IAnswer <Future <Void >>() {
386+ @ Override
387+ public Future <Void > answer () throws Throwable {
388+ nextPullLatch .await ();
389+ return null ;
390+ }
391+ });
345392 EasyMock .expect (pubsub .options ()).andReturn (options );
346393 EasyMock .expect (pubsub .nackAsync (SUBSCRIPTION , ACK_ID1 )).andReturn (null );
347394 EasyMock .replay (pubsub );
348395 EasyMock .expect (pubsubRpc .pull (request1 )).andReturn (new TestPullFuture (PULL_RESPONSE ));
349- EasyMock .expect (pubsubRpc .pull (request2 )).andReturn (new TestPullFuture (otherPullResponse ));
396+ EasyMock .expect (pubsubRpc .pull (request2 )).andAnswer (new IAnswer <PullFuture >() {
397+ @ Override
398+ public PullFuture answer () throws Throwable {
399+ nextPullLatch .countDown ();
400+ return new TestPullFuture (otherPullResponse );
401+ }
402+ });
350403 EasyMock .expect (pubsubRpc .pull (EasyMock .<PullRequest >anyObject ()))
351404 .andReturn (new TestPullFuture (EMPTY_RESPONSE )).anyTimes ();
352405 renewer .add (SUBSCRIPTION , ACK_ID1 );
0 commit comments