Skip to content

Commit b79631a

Browse files
Fixing intermittent pubsub test failures
1 parent 619f1d8 commit b79631a

4 files changed

Lines changed: 53 additions & 17 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,12 @@ public void run() {
167167

168168
@Override
169169
public void onFailure(Throwable cause) {
170-
if (!(cause instanceof StatusRuntimeException)
171-
|| isRetryable(((StatusRuntimeException) cause).getStatus())) {
170+
if (!isAlive()) {
171+
// we don't care about subscription failures when we're no longer running.
172+
logger.log(Level.FINE, "pull failure after service no longer running", cause);
173+
return;
174+
}
175+
if (StatusUtil.isRetryable(cause)) {
172176
logger.log(Level.SEVERE, "Failed to pull messages (recoverable): ", cause);
173177
executor.schedule(
174178
new Runnable() {
@@ -183,14 +187,18 @@ public void run() {
183187
},
184188
backoff.getMillis(),
185189
TimeUnit.MILLISECONDS);
186-
return;
190+
} else {
191+
messageDispatcher.stop();
192+
notifyFailed(cause);
187193
}
188-
messageDispatcher.stop();
189-
notifyFailed(cause);
190194
}
191195
});
192196
}
193197

198+
private boolean isAlive() {
199+
return state() == State.RUNNING || state() == State.STARTING;
200+
}
201+
194202
@Override
195203
public void sendAckOperations(
196204
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StatusUtil.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,31 @@
1717
package com.google.cloud.pubsub.spi.v1;
1818

1919
import io.grpc.Status;
20+
import io.grpc.StatusRuntimeException;
2021

2122
/** Utilities for handling gRPC {@link Status}. */
2223
final class StatusUtil {
2324
private StatusUtil() {
24-
// Static class, not instatiable.
25+
// Static class, not instantiable.
2526
}
2627

27-
public static boolean isRetryable(Status status) {
28-
switch (status.getCode()) {
28+
public static boolean isRetryable(Throwable error) {
29+
if (!(error instanceof StatusRuntimeException)) {
30+
return true;
31+
}
32+
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) error;
33+
switch (statusRuntimeException.getStatus().getCode()) {
2934
case DEADLINE_EXCEEDED:
3035
case INTERNAL:
3136
case CANCELLED:
3237
case RESOURCE_EXHAUSTED:
33-
case UNAVAILABLE:
3438
return true;
39+
case UNAVAILABLE:
40+
if (statusRuntimeException.getMessage().contains("Server shutdownNow invoked")) {
41+
return false;
42+
} else {
43+
return true;
44+
}
3545
default:
3646
return false;
3747
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import static com.google.cloud.pubsub.spi.v1.StatusUtil.isRetryable;
20-
2119
import com.google.api.gax.core.FlowController;
2220
import com.google.api.stats.Distribution;
2321
import com.google.auth.Credentials;
@@ -179,9 +177,13 @@ public void onSuccess(@Nullable Void result) {
179177
}
180178

181179
@Override
182-
public void onFailure(Throwable t) {
183-
Status errorStatus = Status.fromThrowable(t);
184-
if (isRetryable(errorStatus) && isAlive()) {
180+
public void onFailure(Throwable cause) {
181+
if (!isAlive()) {
182+
// we don't care about subscription failures when we're no longer running.
183+
logger.log(Level.FINE, "pull failure after service no longer running", cause);
184+
return;
185+
}
186+
if (StatusUtil.isRetryable(cause)) {
185187
long backoffMillis = channelReconnectBackoff.getMillis();
186188
channelReconnectBackoff = channelReconnectBackoff.plus(backoffMillis);
187189
executor.schedule(
@@ -194,9 +196,7 @@ public void run() {
194196
backoffMillis,
195197
TimeUnit.MILLISECONDS);
196198
} else {
197-
if (isAlive()) {
198-
notifyFailed(t);
199-
}
199+
notifyFailed(cause);
200200
}
201201
}
202202
},
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.google.cloud.pubsub.spi.v1;
2+
3+
import com.google.common.truth.Truth;
4+
import io.grpc.Status;
5+
import io.grpc.StatusRuntimeException;
6+
import org.junit.Test;
7+
8+
public class StatusUtilTest {
9+
10+
@Test
11+
public void testIsRetryable() {
12+
Truth.assertThat(StatusUtil.isRetryable(new StatusRuntimeException(Status.UNAVAILABLE)))
13+
.isTrue();
14+
Truth.assertThat(StatusUtil.isRetryable(new StatusRuntimeException(
15+
Status.UNAVAILABLE.withDescription("Server shutdownNow invoked"))))
16+
.isFalse();
17+
}
18+
}

0 commit comments

Comments
 (0)