|
16 | 16 |
|
17 | 17 | package com.google.cloud.pubsub.spi.v1; |
18 | 18 |
|
19 | | -import static org.junit.Assert.assertEquals; |
20 | | -import static org.junit.Assert.assertFalse; |
21 | | -import static org.junit.Assert.assertTrue; |
22 | | -import static org.junit.Assert.fail; |
23 | | - |
24 | | -import com.google.api.gax.batching.BatchingSettings; |
25 | 19 | import com.google.api.core.ApiFuture; |
| 20 | +import com.google.api.gax.batching.BatchingSettings; |
26 | 21 | import com.google.api.gax.batching.FlowControlSettings; |
27 | 22 | import com.google.api.gax.batching.FlowController.LimitExceededBehavior; |
28 | 23 | import com.google.api.gax.grpc.ChannelProvider; |
|
40 | 35 | import io.grpc.inprocess.InProcessChannelBuilder; |
41 | 36 | import io.grpc.inprocess.InProcessServerBuilder; |
42 | 37 | import io.grpc.internal.ServerImpl; |
43 | | -import java.util.concurrent.ExecutionException; |
44 | | -import java.util.concurrent.Executor; |
45 | | -import org.threeten.bp.Duration; |
46 | 38 | import org.junit.After; |
47 | 39 | import org.junit.Before; |
48 | 40 | import org.junit.Test; |
49 | 41 | import org.junit.runner.RunWith; |
50 | 42 | import org.junit.runners.JUnit4; |
| 43 | +import org.threeten.bp.Duration; |
| 44 | + |
| 45 | +import java.util.concurrent.ExecutionException; |
| 46 | +import java.util.concurrent.Executor; |
| 47 | + |
| 48 | +import static org.junit.Assert.*; |
51 | 49 |
|
52 | 50 | @RunWith(JUnit4.class) |
53 | 51 | public class PublisherImplTest { |
@@ -279,6 +277,81 @@ public void testPublishFailureRetries() throws Exception { |
279 | 277 | publisher.shutdown(); |
280 | 278 | } |
281 | 279 |
|
| 280 | + @Test(expected = ExecutionException.class) |
| 281 | + public void testPublishFailureRetries_retriesDisabled() throws Exception { |
| 282 | + Publisher publisher = |
| 283 | + getTestPublisherBuilder() |
| 284 | + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) |
| 285 | + .setRetrySettings( |
| 286 | + Publisher.Builder.DEFAULT_RETRY_SETTINGS |
| 287 | + .toBuilder() |
| 288 | + .setTotalTimeout(Duration.ofSeconds(10)) |
| 289 | + .setMaxAttempts(1) |
| 290 | + .build()) |
| 291 | + .build(); |
| 292 | + |
| 293 | + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); |
| 294 | + |
| 295 | + ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A"); |
| 296 | + |
| 297 | + try { |
| 298 | + publishFuture1.get(); |
| 299 | + } finally { |
| 300 | + assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1); |
| 301 | + publisher.shutdown(); |
| 302 | + } |
| 303 | + } |
| 304 | + |
| 305 | + @Test |
| 306 | + public void testPublishFailureRetries_maxRetriesSetup() throws Exception { |
| 307 | + Publisher publisher = |
| 308 | + getTestPublisherBuilder() |
| 309 | + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) |
| 310 | + .setRetrySettings( |
| 311 | + Publisher.Builder.DEFAULT_RETRY_SETTINGS |
| 312 | + .toBuilder() |
| 313 | + .setTotalTimeout(Duration.ofSeconds(10)) |
| 314 | + .setMaxAttempts(3) |
| 315 | + .build()) |
| 316 | + .build(); |
| 317 | + |
| 318 | + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); |
| 319 | + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); |
| 320 | + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); |
| 321 | + |
| 322 | + ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A"); |
| 323 | + |
| 324 | + assertEquals("1", publishFuture1.get()); |
| 325 | + |
| 326 | + assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); |
| 327 | + publisher.shutdown(); |
| 328 | + } |
| 329 | + |
| 330 | + @Test |
| 331 | + public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception { |
| 332 | + Publisher publisher = |
| 333 | + getTestPublisherBuilder() |
| 334 | + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) |
| 335 | + .setRetrySettings( |
| 336 | + Publisher.Builder.DEFAULT_RETRY_SETTINGS |
| 337 | + .toBuilder() |
| 338 | + .setTotalTimeout(Duration.ofSeconds(10)) |
| 339 | + .setMaxAttempts(0) |
| 340 | + .build()) |
| 341 | + .build(); |
| 342 | + |
| 343 | + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); |
| 344 | + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); |
| 345 | + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); |
| 346 | + |
| 347 | + ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A"); |
| 348 | + |
| 349 | + assertEquals("1", publishFuture1.get()); |
| 350 | + |
| 351 | + assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); |
| 352 | + publisher.shutdown(); |
| 353 | + } |
| 354 | + |
282 | 355 | public void testPublishFailureRetries_exceededsRetryDuration() throws Exception { |
283 | 356 | Publisher publisher = |
284 | 357 | getTestPublisherBuilder() |
|
0 commit comments