Skip to content

Commit b2b2f86

Browse files
fix: ensure proper cleanup of publisher in tests (#310)
* fix: better cleanup during publisher test * fix: format issues * fix: test timeouts should be a minute
1 parent 822005c commit b2b2f86

1 file changed

Lines changed: 44 additions & 26 deletions

File tree

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,22 @@
3131
import com.google.api.gax.core.FixedExecutorProvider;
3232
import com.google.api.gax.core.InstantiatingExecutorProvider;
3333
import com.google.api.gax.core.NoCredentialsProvider;
34+
import com.google.api.gax.grpc.GrpcTransportChannel;
3435
import com.google.api.gax.grpc.testing.LocalChannelProvider;
3536
import com.google.api.gax.rpc.DataLossException;
37+
import com.google.api.gax.rpc.FixedTransportChannelProvider;
3638
import com.google.api.gax.rpc.TransportChannelProvider;
3739
import com.google.cloud.pubsub.v1.Publisher.Builder;
3840
import com.google.protobuf.ByteString;
3941
import com.google.pubsub.v1.ProjectTopicName;
4042
import com.google.pubsub.v1.PublishRequest;
4143
import com.google.pubsub.v1.PublishResponse;
4244
import com.google.pubsub.v1.PubsubMessage;
45+
import io.grpc.ManagedChannel;
4346
import io.grpc.Server;
4447
import io.grpc.Status;
4548
import io.grpc.StatusException;
49+
import io.grpc.inprocess.InProcessChannelBuilder;
4650
import io.grpc.inprocess.InProcessServerBuilder;
4751
import java.util.List;
4852
import java.util.concurrent.CountDownLatch;
@@ -75,6 +79,8 @@ public class PublisherImplTest {
7579

7680
private FakePublisherServiceImpl testPublisherServiceImpl;
7781

82+
private ManagedChannel testChannel;
83+
7884
private Server testServer;
7985

8086
@Before
@@ -84,6 +90,7 @@ public void setUp() throws Exception {
8490
InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server");
8591
serverBuilder.addService(testPublisherServiceImpl);
8692
testServer = serverBuilder.build();
93+
testChannel = InProcessChannelBuilder.forName("test-server").build();
8794
testServer.start();
8895

8996
fakeExecutor = new FakeScheduledExecutorService();
@@ -92,6 +99,7 @@ public void setUp() throws Exception {
9299
@After
93100
public void tearDown() throws Exception {
94101
testServer.shutdownNow().awaitTermination();
102+
testChannel.shutdown();
95103
}
96104

97105
@Test
@@ -122,8 +130,7 @@ public void testPublishByDuration() throws Exception {
122130
assertEquals("2", publishFuture2.get());
123131

124132
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
125-
publisher.shutdown();
126-
publisher.awaitTermination(1, TimeUnit.MINUTES);
133+
shutdownTestPublisher(publisher);
127134
}
128135

129136
@Test
@@ -160,8 +167,9 @@ public void testPublishByNumBatchedMessages() throws Exception {
160167

161168
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
162169
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
163-
publisher.shutdown();
164-
publisher.awaitTermination(1, TimeUnit.MINUTES);
170+
171+
fakeExecutor.advanceTime(Duration.ofSeconds(100));
172+
shutdownTestPublisher(publisher);
165173
}
166174

167175
@Test
@@ -195,8 +203,9 @@ public void testSinglePublishByNumBytes() throws Exception {
195203
assertEquals("4", publishFuture4.get());
196204

197205
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
198-
publisher.shutdown();
199-
publisher.awaitTermination(1, TimeUnit.MINUTES);
206+
207+
fakeExecutor.advanceTime(Duration.ofSeconds(100));
208+
shutdownTestPublisher(publisher);
200209
}
201210

202211
@Test
@@ -219,15 +228,16 @@ public void testPublishByShutdown() throws Exception {
219228

220229
// Note we are not advancing time or reaching the count threshold but messages should
221230
// still get published by call to shutdown
222-
223231
publisher.shutdown();
224-
publisher.awaitTermination(1, TimeUnit.MINUTES);
225232

226233
// Verify the publishes completed
227234
assertTrue(publishFuture1.isDone());
228235
assertTrue(publishFuture2.isDone());
229236
assertEquals("1", publishFuture1.get());
230237
assertEquals("2", publishFuture2.get());
238+
239+
fakeExecutor.advanceTime(Duration.ofSeconds(100));
240+
publisher.awaitTermination(1, TimeUnit.MINUTES);
231241
}
232242

233243
@Test
@@ -269,8 +279,7 @@ public void testPublishMixedSizeAndDuration() throws Exception {
269279

270280
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
271281
assertEquals(1, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
272-
publisher.shutdown();
273-
publisher.awaitTermination(1, TimeUnit.MINUTES);
282+
shutdownTestPublisher(publisher);
274283
}
275284

276285
private ApiFuture<String> sendTestMessage(Publisher publisher, String data) {
@@ -326,7 +335,9 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception {
326335
}
327336
}
328337
}
329-
publisher.shutdown();
338+
339+
fakeExecutor.advanceTime(Duration.ofSeconds(100));
340+
shutdownTestPublisher(publisher);
330341
}
331342

332343
@Test
@@ -389,7 +400,7 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
389400
}
390401
}
391402
}
392-
publisher.shutdown();
403+
shutdownTestPublisher(publisher);
393404
}
394405

395406
@Test
@@ -418,7 +429,8 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception {
418429
// Verify that messages with "OrderB" were delivered in order.
419430
assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture3.get()));
420431

421-
publisher.shutdown();
432+
fakeExecutor.advanceTime(Duration.ofSeconds(100));
433+
shutdownTestPublisher(publisher);
422434
}
423435

424436
@Test
@@ -431,7 +443,7 @@ public void testOrderingKeyWhenDisabled_throwsException() throws Exception {
431443
} catch (IllegalStateException expected) {
432444
// expected
433445
}
434-
publisher.shutdown();
446+
shutdownTestPublisher(publisher);
435447
}
436448

437449
@Test
@@ -461,6 +473,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {
461473

462474
assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size());
463475
publisher.shutdown();
476+
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
464477
}
465478

466479
@Test
@@ -550,7 +563,7 @@ public void testResumePublish() throws Exception {
550563
Assert.assertEquals("7", future7.get());
551564
Assert.assertEquals("8", future8.get());
552565

553-
publisher.shutdown();
566+
shutdownTestPublisher(publisher);
554567
}
555568

556569
private ApiFuture<String> sendTestMessageWithOrderingKey(
@@ -604,8 +617,7 @@ public void testPublishFailureRetries() throws Exception {
604617
assertEquals("1", publishFuture1.get());
605618

606619
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
607-
publisher.shutdown();
608-
publisher.awaitTermination(1, TimeUnit.MINUTES);
620+
shutdownTestPublisher(publisher);
609621
}
610622

611623
@Test(expected = ExecutionException.class)
@@ -629,8 +641,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception {
629641
publishFuture1.get();
630642
} finally {
631643
assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1);
632-
publisher.shutdown();
633-
publisher.awaitTermination(1, TimeUnit.MINUTES);
644+
shutdownTestPublisher(publisher);
634645
}
635646
}
636647

@@ -656,8 +667,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception {
656667
assertEquals("1", publishFuture1.get());
657668

658669
assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
659-
publisher.shutdown();
660-
publisher.awaitTermination(1, TimeUnit.MINUTES);
670+
shutdownTestPublisher(publisher);
661671
}
662672

663673
@Test
@@ -683,7 +693,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception
683693

684694
assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
685695
publisher.shutdown();
686-
publisher.awaitTermination(1, TimeUnit.MINUTES);
696+
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
687697
}
688698

689699
@Test(expected = ExecutionException.class)
@@ -712,14 +722,15 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
712722
} finally {
713723
assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1);
714724
publisher.shutdown();
715-
publisher.awaitTermination(1, TimeUnit.MINUTES);
725+
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
716726
}
717727
}
718728

719729
@Test
720730
public void testPublisherGetters() throws Exception {
721731
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
722-
builder.setChannelProvider(TEST_CHANNEL_PROVIDER);
732+
builder.setChannelProvider(
733+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel)));
723734
builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
724735
builder.setBatchingSettings(
725736
BatchingSettings.newBuilder()
@@ -735,7 +746,7 @@ public void testPublisherGetters() throws Exception {
735746
assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold());
736747
assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold());
737748
publisher.shutdown();
738-
publisher.awaitTermination(1, TimeUnit.MINUTES);
749+
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
739750
}
740751

741752
@Test
@@ -1115,7 +1126,14 @@ public void run() {
11151126
private Builder getTestPublisherBuilder() {
11161127
return Publisher.newBuilder(TEST_TOPIC)
11171128
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
1118-
.setChannelProvider(TEST_CHANNEL_PROVIDER)
1129+
.setChannelProvider(
1130+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel)))
11191131
.setCredentialsProvider(NoCredentialsProvider.create());
11201132
}
1133+
1134+
private void shutdownTestPublisher(Publisher publisher) throws InterruptedException {
1135+
publisher.shutdown();
1136+
fakeExecutor.advanceTime(Duration.ofSeconds(10));
1137+
assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES));
1138+
}
11211139
}

0 commit comments

Comments
 (0)