3131import com .google .api .gax .core .FixedExecutorProvider ;
3232import com .google .api .gax .core .InstantiatingExecutorProvider ;
3333import com .google .api .gax .core .NoCredentialsProvider ;
34+ import com .google .api .gax .grpc .GrpcTransportChannel ;
3435import com .google .api .gax .grpc .testing .LocalChannelProvider ;
3536import com .google .api .gax .rpc .DataLossException ;
37+ import com .google .api .gax .rpc .FixedTransportChannelProvider ;
3638import com .google .api .gax .rpc .TransportChannelProvider ;
3739import com .google .cloud .pubsub .v1 .Publisher .Builder ;
3840import com .google .protobuf .ByteString ;
3941import com .google .pubsub .v1 .ProjectTopicName ;
4042import com .google .pubsub .v1 .PublishRequest ;
4143import com .google .pubsub .v1 .PublishResponse ;
4244import com .google .pubsub .v1 .PubsubMessage ;
45+ import io .grpc .ManagedChannel ;
4346import io .grpc .Server ;
4447import io .grpc .Status ;
4548import io .grpc .StatusException ;
49+ import io .grpc .inprocess .InProcessChannelBuilder ;
4650import io .grpc .inprocess .InProcessServerBuilder ;
4751import java .util .List ;
4852import java .util .concurrent .CountDownLatch ;
@@ -75,6 +79,8 @@ public class PublisherImplTest {
7579
7680 private FakePublisherServiceImpl testPublisherServiceImpl ;
7781
82+ private ManagedChannel testChannel ;
83+
7884 private Server testServer ;
7985
8086 @ Before
@@ -84,6 +90,7 @@ public void setUp() throws Exception {
8490 InProcessServerBuilder serverBuilder = InProcessServerBuilder .forName ("test-server" );
8591 serverBuilder .addService (testPublisherServiceImpl );
8692 testServer = serverBuilder .build ();
93+ testChannel = InProcessChannelBuilder .forName ("test-server" ).build ();
8794 testServer .start ();
8895
8996 fakeExecutor = new FakeScheduledExecutorService ();
@@ -92,6 +99,7 @@ public void setUp() throws Exception {
9299 @ After
93100 public void tearDown () throws Exception {
94101 testServer .shutdownNow ().awaitTermination ();
102+ testChannel .shutdown ();
95103 }
96104
97105 @ Test
@@ -122,8 +130,7 @@ public void testPublishByDuration() throws Exception {
122130 assertEquals ("2" , publishFuture2 .get ());
123131
124132 assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (0 ).getMessagesCount ());
125- publisher .shutdown ();
126- publisher .awaitTermination (1 , TimeUnit .MINUTES );
133+ shutdownTestPublisher (publisher );
127134 }
128135
129136 @ Test
@@ -160,8 +167,9 @@ public void testPublishByNumBatchedMessages() throws Exception {
160167
161168 assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (0 ).getMessagesCount ());
162169 assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (1 ).getMessagesCount ());
163- publisher .shutdown ();
164- publisher .awaitTermination (1 , TimeUnit .MINUTES );
170+
171+ fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
172+ shutdownTestPublisher (publisher );
165173 }
166174
167175 @ Test
@@ -195,8 +203,9 @@ public void testSinglePublishByNumBytes() throws Exception {
195203 assertEquals ("4" , publishFuture4 .get ());
196204
197205 assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().size ());
198- publisher .shutdown ();
199- publisher .awaitTermination (1 , TimeUnit .MINUTES );
206+
207+ fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
208+ shutdownTestPublisher (publisher );
200209 }
201210
202211 @ Test
@@ -219,15 +228,16 @@ public void testPublishByShutdown() throws Exception {
219228
220229 // Note we are not advancing time or reaching the count threshold but messages should
221230 // still get published by call to shutdown
222-
223231 publisher .shutdown ();
224- publisher .awaitTermination (1 , TimeUnit .MINUTES );
225232
226233 // Verify the publishes completed
227234 assertTrue (publishFuture1 .isDone ());
228235 assertTrue (publishFuture2 .isDone ());
229236 assertEquals ("1" , publishFuture1 .get ());
230237 assertEquals ("2" , publishFuture2 .get ());
238+
239+ fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
240+ publisher .awaitTermination (1 , TimeUnit .MINUTES );
231241 }
232242
233243 @ Test
@@ -269,8 +279,7 @@ public void testPublishMixedSizeAndDuration() throws Exception {
269279
270280 assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (0 ).getMessagesCount ());
271281 assertEquals (1 , testPublisherServiceImpl .getCapturedRequests ().get (1 ).getMessagesCount ());
272- publisher .shutdown ();
273- publisher .awaitTermination (1 , TimeUnit .MINUTES );
282+ shutdownTestPublisher (publisher );
274283 }
275284
276285 private ApiFuture <String > sendTestMessage (Publisher publisher , String data ) {
@@ -326,7 +335,9 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception {
326335 }
327336 }
328337 }
329- publisher .shutdown ();
338+
339+ fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
340+ shutdownTestPublisher (publisher );
330341 }
331342
332343 @ Test
@@ -389,7 +400,7 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
389400 }
390401 }
391402 }
392- publisher . shutdown ( );
403+ shutdownTestPublisher ( publisher );
393404 }
394405
395406 @ Test
@@ -418,7 +429,8 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception {
418429 // Verify that messages with "OrderB" were delivered in order.
419430 assertTrue (Integer .parseInt (publishFuture2 .get ()) < Integer .parseInt (publishFuture3 .get ()));
420431
421- publisher .shutdown ();
432+ fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
433+ shutdownTestPublisher (publisher );
422434 }
423435
424436 @ Test
@@ -431,7 +443,7 @@ public void testOrderingKeyWhenDisabled_throwsException() throws Exception {
431443 } catch (IllegalStateException expected ) {
432444 // expected
433445 }
434- publisher . shutdown ( );
446+ shutdownTestPublisher ( publisher );
435447 }
436448
437449 @ Test
@@ -461,6 +473,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {
461473
462474 assertEquals (4 , testPublisherServiceImpl .getCapturedRequests ().size ());
463475 publisher .shutdown ();
476+ assertTrue (publisher .awaitTermination (1 , TimeUnit .MINUTES ));
464477 }
465478
466479 @ Test
@@ -550,7 +563,7 @@ public void testResumePublish() throws Exception {
550563 Assert .assertEquals ("7" , future7 .get ());
551564 Assert .assertEquals ("8" , future8 .get ());
552565
553- publisher . shutdown ( );
566+ shutdownTestPublisher ( publisher );
554567 }
555568
556569 private ApiFuture <String > sendTestMessageWithOrderingKey (
@@ -604,8 +617,7 @@ public void testPublishFailureRetries() throws Exception {
604617 assertEquals ("1" , publishFuture1 .get ());
605618
606619 assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().size ());
607- publisher .shutdown ();
608- publisher .awaitTermination (1 , TimeUnit .MINUTES );
620+ shutdownTestPublisher (publisher );
609621 }
610622
611623 @ Test (expected = ExecutionException .class )
@@ -629,8 +641,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception {
629641 publishFuture1 .get ();
630642 } finally {
631643 assertSame (testPublisherServiceImpl .getCapturedRequests ().size (), 1 );
632- publisher .shutdown ();
633- publisher .awaitTermination (1 , TimeUnit .MINUTES );
644+ shutdownTestPublisher (publisher );
634645 }
635646 }
636647
@@ -656,8 +667,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception {
656667 assertEquals ("1" , publishFuture1 .get ());
657668
658669 assertEquals (3 , testPublisherServiceImpl .getCapturedRequests ().size ());
659- publisher .shutdown ();
660- publisher .awaitTermination (1 , TimeUnit .MINUTES );
670+ shutdownTestPublisher (publisher );
661671 }
662672
663673 @ Test
@@ -683,7 +693,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception
683693
684694 assertEquals (3 , testPublisherServiceImpl .getCapturedRequests ().size ());
685695 publisher .shutdown ();
686- publisher .awaitTermination (1 , TimeUnit .MINUTES );
696+ assertTrue ( publisher .awaitTermination (1 , TimeUnit .MINUTES ) );
687697 }
688698
689699 @ Test (expected = ExecutionException .class )
@@ -712,14 +722,15 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
712722 } finally {
713723 assertTrue (testPublisherServiceImpl .getCapturedRequests ().size () >= 1 );
714724 publisher .shutdown ();
715- publisher .awaitTermination (1 , TimeUnit .MINUTES );
725+ assertTrue ( publisher .awaitTermination (1 , TimeUnit .MINUTES ) );
716726 }
717727 }
718728
719729 @ Test
720730 public void testPublisherGetters () throws Exception {
721731 Publisher .Builder builder = Publisher .newBuilder (TEST_TOPIC );
722- builder .setChannelProvider (TEST_CHANNEL_PROVIDER );
732+ builder .setChannelProvider (
733+ FixedTransportChannelProvider .create (GrpcTransportChannel .create (testChannel )));
723734 builder .setExecutorProvider (SINGLE_THREAD_EXECUTOR );
724735 builder .setBatchingSettings (
725736 BatchingSettings .newBuilder ()
@@ -735,7 +746,7 @@ public void testPublisherGetters() throws Exception {
735746 assertEquals (Duration .ofMillis (11 ), publisher .getBatchingSettings ().getDelayThreshold ());
736747 assertEquals (12 , (long ) publisher .getBatchingSettings ().getElementCountThreshold ());
737748 publisher .shutdown ();
738- publisher .awaitTermination (1 , TimeUnit .MINUTES );
749+ assertTrue ( publisher .awaitTermination (1 , TimeUnit .MINUTES ) );
739750 }
740751
741752 @ Test
@@ -1115,7 +1126,14 @@ public void run() {
11151126 private Builder getTestPublisherBuilder () {
11161127 return Publisher .newBuilder (TEST_TOPIC )
11171128 .setExecutorProvider (FixedExecutorProvider .create (fakeExecutor ))
1118- .setChannelProvider (TEST_CHANNEL_PROVIDER )
1129+ .setChannelProvider (
1130+ FixedTransportChannelProvider .create (GrpcTransportChannel .create (testChannel )))
11191131 .setCredentialsProvider (NoCredentialsProvider .create ());
11201132 }
1133+
1134+ private void shutdownTestPublisher (Publisher publisher ) throws InterruptedException {
1135+ publisher .shutdown ();
1136+ fakeExecutor .advanceTime (Duration .ofSeconds (10 ));
1137+ assertTrue (publisher .awaitTermination (1 , TimeUnit .MINUTES ));
1138+ }
11211139}
0 commit comments