Skip to content

Commit 97f6164

Browse files
AntonZarutskypongad
authored andcommitted
add-max-retry-property-support (#2062)
1 parent 5af4a46 commit 97f6164

2 files changed

Lines changed: 91 additions & 14 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,7 @@ public void onSuccess(PublishResponse result) {
344344
+ "the expected %s results. Please contact Cloud Pub/Sub support "
345345
+ "if this frequently occurs",
346346
result.getMessageIdsCount(), outstandingBatch.size()));
347-
for (OutstandingPublish oustandingMessage :
348-
outstandingBatch.outstandingPublishes) {
347+
for (OutstandingPublish oustandingMessage : outstandingBatch.outstandingPublishes) {
349348
oustandingMessage.publishResult.setException(t);
350349
}
351350
return;
@@ -368,9 +367,10 @@ public void onFailure(Throwable t) {
368367
computeNextBackoffDelayMs(outstandingBatch, retrySettings, longRandom);
369368

370369
if (!isRetryable(t)
370+
|| retrySettings.getMaxAttempts() > 0
371+
&& outstandingBatch.getAttempt() > retrySettings.getMaxAttempts()
371372
|| System.currentTimeMillis() + nextBackoffDelay
372-
> outstandingBatch.creationTime
373-
+ retrySettings.getTotalTimeout().toMillis()) {
373+
> outstandingBatch.creationTime + retrySettings.getTotalTimeout().toMillis()) {
374374
try {
375375
for (OutstandingPublish outstandingPublish :
376376
outstandingBatch.outstandingPublishes) {
@@ -408,6 +408,10 @@ private static final class OutstandingBatch {
408408
this.batchSizeBytes = batchSizeBytes;
409409
}
410410

411+
public int getAttempt() {
412+
return attempt;
413+
}
414+
411415
public int size() {
412416
return outstandingPublishes.size();
413417
}
@@ -506,7 +510,7 @@ interface LongRandom {
506510
* Constructs a new {@link Builder} using the given topic.
507511
*
508512
* <p>Example of creating a {@code Publisher}.
509-
* <pre> {@code
513+
* <pre>{@code
510514
* String projectName = "my_project";
511515
* String topicName = "my_topic";
512516
* TopicName topic = TopicName.create(projectName, topicName);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,8 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertFalse;
21-
import static org.junit.Assert.assertTrue;
22-
import static org.junit.Assert.fail;
23-
24-
import com.google.api.gax.batching.BatchingSettings;
2519
import com.google.api.core.ApiFuture;
20+
import com.google.api.gax.batching.BatchingSettings;
2621
import com.google.api.gax.batching.FlowControlSettings;
2722
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
2823
import com.google.api.gax.grpc.ChannelProvider;
@@ -40,14 +35,17 @@
4035
import io.grpc.inprocess.InProcessChannelBuilder;
4136
import io.grpc.inprocess.InProcessServerBuilder;
4237
import io.grpc.internal.ServerImpl;
43-
import java.util.concurrent.ExecutionException;
44-
import java.util.concurrent.Executor;
45-
import org.threeten.bp.Duration;
4638
import org.junit.After;
4739
import org.junit.Before;
4840
import org.junit.Test;
4941
import org.junit.runner.RunWith;
5042
import org.junit.runners.JUnit4;
43+
import org.threeten.bp.Duration;
44+
45+
import java.util.concurrent.ExecutionException;
46+
import java.util.concurrent.Executor;
47+
48+
import static org.junit.Assert.*;
5149

5250
@RunWith(JUnit4.class)
5351
public class PublisherImplTest {
@@ -279,6 +277,81 @@ public void testPublishFailureRetries() throws Exception {
279277
publisher.shutdown();
280278
}
281279

280+
@Test(expected = ExecutionException.class)
281+
public void testPublishFailureRetries_retriesDisabled() throws Exception {
282+
Publisher publisher =
283+
getTestPublisherBuilder()
284+
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
285+
.setRetrySettings(
286+
Publisher.Builder.DEFAULT_RETRY_SETTINGS
287+
.toBuilder()
288+
.setTotalTimeout(Duration.ofSeconds(10))
289+
.setMaxAttempts(1)
290+
.build())
291+
.build();
292+
293+
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
294+
295+
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
296+
297+
try {
298+
publishFuture1.get();
299+
} finally {
300+
assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1);
301+
publisher.shutdown();
302+
}
303+
}
304+
305+
@Test
306+
public void testPublishFailureRetries_maxRetriesSetup() throws Exception {
307+
Publisher publisher =
308+
getTestPublisherBuilder()
309+
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
310+
.setRetrySettings(
311+
Publisher.Builder.DEFAULT_RETRY_SETTINGS
312+
.toBuilder()
313+
.setTotalTimeout(Duration.ofSeconds(10))
314+
.setMaxAttempts(3)
315+
.build())
316+
.build();
317+
318+
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
319+
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
320+
testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));
321+
322+
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
323+
324+
assertEquals("1", publishFuture1.get());
325+
326+
assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
327+
publisher.shutdown();
328+
}
329+
330+
@Test
331+
public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception {
332+
Publisher publisher =
333+
getTestPublisherBuilder()
334+
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
335+
.setRetrySettings(
336+
Publisher.Builder.DEFAULT_RETRY_SETTINGS
337+
.toBuilder()
338+
.setTotalTimeout(Duration.ofSeconds(10))
339+
.setMaxAttempts(0)
340+
.build())
341+
.build();
342+
343+
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
344+
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
345+
testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));
346+
347+
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
348+
349+
assertEquals("1", publishFuture1.get());
350+
351+
assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
352+
publisher.shutdown();
353+
}
354+
282355
public void testPublishFailureRetries_exceededsRetryDuration() throws Exception {
283356
Publisher publisher =
284357
getTestPublisherBuilder()

0 commit comments

Comments
 (0)