Skip to content

Commit 0707971

Browse files
committed
fix race in mock publisher
FakePublisherServiceImpl::publish has a race. If the call to publish happen before a response can be placed, the server will try to respond with a null object. The fix is to either - always set the response before calling - make publish wait for the response For propriety, this commit does both. This fix reveals another flake. Publisher uses exponential backoff with jitter. The jitter randomly picks a number between 0 and a maximum. If we pick low values too many times, it will retry too often and the server will run out of canned transient errors to respond back with. The test still passed since it expected any Throwable. This commit fixed the test to expect FakeException, set the jitter to random in range (max/2, max), and increases the number of canned errors to compensate. Retrying can still causes random test failures, independently of above changes. If a request fails due to DEADLINE_EXCEEDED, the future is completed with a corresponding error. However, the last RPC might not have been successfully cancelled. When a new test starts, it gives canned response to the server. The server might use some of these responses to respond to RPCs of previous tests. Consequently, a misbehaving test can fail every test that comes after it. This commit changes the test setup code so that it creates a new fake server for every test to avoid this problem.
1 parent c68968b commit 0707971

3 files changed

Lines changed: 54 additions & 69 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ private static long computeNextBackoffDelayMs(
501501
* Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBundle.attempt - 1));
502502
delayMillis = Math.min(retrySettings.getMaxRetryDelay().getMillis(), delayMillis);
503503
outstandingBundle.attempt++;
504-
return ThreadLocalRandom.current().nextLong(0, delayMillis);
504+
return ThreadLocalRandom.current().nextLong(delayMillis / 2, delayMillis);
505505
}
506506

507507
private boolean isRetryable(Throwable t) {

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakePublisherServiceImpl.java

Lines changed: 32 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,98 +21,84 @@
2121
import com.google.pubsub.v1.PublishResponse;
2222
import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
2323
import io.grpc.stub.StreamObserver;
24-
import java.util.Queue;
2524
import java.util.concurrent.LinkedBlockingQueue;
2625

2726
/**
28-
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a
29-
* Cloud Pub/Sub Publisher.
27+
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud
28+
* Pub/Sub Publisher.
3029
*/
3130
class FakePublisherServiceImpl extends PublisherImplBase {
3231

33-
private final Queue<Response> publishResponses = new LinkedBlockingQueue<>();
32+
private final LinkedBlockingQueue<Response> publishResponses = new LinkedBlockingQueue<>();
3433

35-
/**
36-
* Class used to save the state of a possible response.
37-
*/
34+
/** Class used to save the state of a possible response. */
3835
private static class Response {
3936
Optional<PublishResponse> publishResponse;
4037
Optional<Throwable> error;
41-
38+
4239
public Response(PublishResponse publishResponse) {
4340
this.publishResponse = Optional.of(publishResponse);
4441
this.error = Optional.absent();
4542
}
46-
43+
4744
public Response(Throwable exception) {
4845
this.publishResponse = Optional.absent();
4946
this.error = Optional.of(exception);
5047
}
51-
48+
5249
public PublishResponse getPublishResponse() {
5350
return publishResponse.get();
5451
}
55-
52+
5653
public Throwable getError() {
5754
return error.get();
5855
}
59-
56+
6057
boolean isError() {
6158
return error.isPresent();
6259
}
60+
61+
@Override
62+
public String toString() {
63+
if (isError()) {
64+
return error.get().toString();
65+
}
66+
return publishResponse.get().toString();
67+
}
6368
}
6469

6570
@Override
6671
public void publish(PublishRequest request, StreamObserver<PublishResponse> responseObserver) {
67-
Response response = null;
68-
synchronized (publishResponses) {
69-
response = publishResponses.poll();
70-
try {
71-
if (response.isError()) {
72-
responseObserver.onError(response.getError());
73-
return;
74-
}
75-
76-
responseObserver.onNext(response.getPublishResponse());
77-
responseObserver.onCompleted();
78-
} finally {
79-
publishResponses.notifyAll();
80-
}
72+
Response response;
73+
try {
74+
response = publishResponses.take();
75+
} catch (InterruptedException e) {
76+
throw new IllegalArgumentException(e);
77+
}
78+
if (response.isError()) {
79+
responseObserver.onError(response.getError());
80+
} else {
81+
responseObserver.onNext(response.getPublishResponse());
82+
responseObserver.onCompleted();
8183
}
8284
}
8385

8486
public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) {
85-
synchronized (publishResponses) {
86-
publishResponses.add(new Response(publishResponse));
87-
}
87+
publishResponses.add(new Response(publishResponse));
8888
return this;
8989
}
9090

9191
public FakePublisherServiceImpl addPublishResponse(
9292
PublishResponse.Builder publishResponseBuilder) {
93-
addPublishResponse(publishResponseBuilder.build());
94-
return this;
93+
return addPublishResponse(publishResponseBuilder.build());
9594
}
9695

9796
public FakePublisherServiceImpl addPublishError(Throwable error) {
98-
synchronized (publishResponses) {
99-
publishResponses.add(new Response(error));
100-
}
97+
publishResponses.add(new Response(error));
10198
return this;
10299
}
103100

104101
public void reset() {
105-
synchronized (publishResponses) {
106-
publishResponses.clear();
107-
publishResponses.notifyAll();
108-
}
109-
}
110-
111-
public void waitForNoOutstandingResponses() throws InterruptedException {
112-
synchronized (publishResponses) {
113-
while (!publishResponses.isEmpty()) {
114-
publishResponses.wait();
115-
}
116-
}
102+
publishResponses.clear();
117103
}
118104
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,8 @@
4444
import io.grpc.stub.StreamObserver;
4545
import java.util.concurrent.ExecutionException;
4646
import org.joda.time.Duration;
47-
import org.junit.AfterClass;
47+
import org.junit.After;
4848
import org.junit.Before;
49-
import org.junit.BeforeClass;
5049
import org.junit.Test;
5150
import org.junit.runner.RunWith;
5251
import org.junit.runners.JUnit4;
@@ -64,20 +63,22 @@ public class PublisherImplTest {
6463
private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
6564
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
6665

67-
private static InProcessChannelBuilder testChannelBuilder;
66+
private InProcessChannelBuilder testChannelBuilder;
6867

6968
@Captor private ArgumentCaptor<PublishRequest> requestCaptor;
7069

7170
private FakeScheduledExecutorService fakeExecutor;
7271

7372
private FakeCredentials testCredentials;
7473

75-
private static FakePublisherServiceImpl testPublisherServiceImpl;
74+
private FakePublisherServiceImpl testPublisherServiceImpl;
7675

77-
private static ServerImpl testServer;
76+
private ServerImpl testServer;
7877

79-
@BeforeClass
80-
public static void setUpClass() throws Exception {
78+
class FakeException extends Exception {}
79+
80+
@Before
81+
public void setUp() throws Exception {
8182
testPublisherServiceImpl = Mockito.spy(new FakePublisherServiceImpl());
8283

8384
InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server");
@@ -86,19 +87,16 @@ public static void setUpClass() throws Exception {
8687
serverBuilder.addService(testPublisherServiceImpl);
8788
testServer = serverBuilder.build();
8889
testServer.start();
89-
}
9090

91-
@Before
92-
public void setUp() throws Exception {
9391
MockitoAnnotations.initMocks(this);
9492
testPublisherServiceImpl.reset();
9593
Mockito.reset(testPublisherServiceImpl);
9694
fakeExecutor = new FakeScheduledExecutorService();
9795
testCredentials = new FakeCredentials();
9896
}
9997

100-
@AfterClass
101-
public static void tearDownClass() throws Exception {
98+
@After
99+
public void tearDown() throws Exception {
102100
testServer.shutdownNow().awaitTermination();
103101
}
104102

@@ -272,19 +270,18 @@ public void testPublishFailureRetries() throws Exception {
272270
.build())
273271
.build(); // To demonstrate that reaching duration will trigger publish
274272

275-
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
276-
277273
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
278274
testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));
279275

276+
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
277+
280278
assertEquals("1", publishFuture1.get());
281279

282280
Mockito.verify(testPublisherServiceImpl, times(2))
283281
.publish(Mockito.<PublishRequest>any(), Mockito.<StreamObserver<PublishResponse>>any());
284282
publisher.shutdown();
285283
}
286284

287-
@Test(expected = Throwable.class)
288285
public void testPublishFailureRetries_exceededsRetryDuration() throws Exception {
289286
Publisher publisher =
290287
getTestPublisherBuilder()
@@ -302,15 +299,18 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception
302299
.build())
303300
.build(); // To demonstrate that reaching duration will trigger publish
304301

305-
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
306-
307-
// With exponential backoff, starting at 5ms we should have no more than 11 retries
308-
for (int i = 0; i < 11; ++i) {
309-
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
302+
// We use exponential backoff with randomness. 30 should be more than enough.
303+
for (int i = 0; i < 30; ++i) {
304+
testPublisherServiceImpl.addPublishError(new FakeException());
310305
}
306+
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
311307

312308
try {
313309
publishFuture1.get();
310+
} catch (ExecutionException e) {
311+
if (!(e.getCause() instanceof FakeException)) {
312+
throw new IllegalStateException("unexpected exception", e);
313+
}
314314
} finally {
315315
Mockito.verify(testPublisherServiceImpl, atLeast(10))
316316
.publish(Mockito.<PublishRequest>any(), Mockito.<StreamObserver<PublishResponse>>any());
@@ -336,9 +336,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
336336
.build())
337337
.build(); // To demonstrate that reaching duration will trigger publish
338338

339-
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
340-
341339
testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
340+
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
342341

343342
try {
344343
publishFuture1.get();

0 commit comments

Comments
 (0)