File tree Expand file tree Collapse file tree
main/java/com/google/cloud/pubsub/spi/v1
test/java/com/google/cloud/pubsub/spi/v1 Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -167,8 +167,12 @@ public void run() {
167167
168168 @ Override
169169 public void onFailure (Throwable cause ) {
170- if (!(cause instanceof StatusRuntimeException )
171- || isRetryable (((StatusRuntimeException ) cause ).getStatus ())) {
170+ if (!isAlive ()) {
171+ // we don't care about subscription failures when we're no longer running.
172+ logger .log (Level .FINE , "pull failure after service no longer running" , cause );
173+ return ;
174+ }
175+ if (StatusUtil .isRetryable (cause )) {
172176 logger .log (Level .SEVERE , "Failed to pull messages (recoverable): " , cause );
173177 executor .schedule (
174178 new Runnable () {
@@ -183,14 +187,18 @@ public void run() {
183187 },
184188 backoff .getMillis (),
185189 TimeUnit .MILLISECONDS );
186- return ;
190+ } else {
191+ messageDispatcher .stop ();
192+ notifyFailed (cause );
187193 }
188- messageDispatcher .stop ();
189- notifyFailed (cause );
190194 }
191195 });
192196 }
193197
198+ private boolean isAlive () {
199+ return state () == State .RUNNING || state () == State .STARTING ;
200+ }
201+
194202 @ Override
195203 public void sendAckOperations (
196204 List <String > acksToSend , List <PendingModifyAckDeadline > ackDeadlineExtensions ) {
Original file line number Diff line number Diff line change 1717package com .google .cloud .pubsub .spi .v1 ;
1818
1919import io .grpc .Status ;
20+ import io .grpc .StatusRuntimeException ;
2021
2122/** Utilities for handling gRPC {@link Status}. */
2223final class StatusUtil {
2324 private StatusUtil () {
24- // Static class, not instatiable .
25+ // Static class, not instantiable .
2526 }
2627
27- public static boolean isRetryable (Status status ) {
28- switch (status .getCode ()) {
28+ public static boolean isRetryable (Throwable error ) {
29+ if (!(error instanceof StatusRuntimeException )) {
30+ return true ;
31+ }
32+ StatusRuntimeException statusRuntimeException = (StatusRuntimeException ) error ;
33+ switch (statusRuntimeException .getStatus ().getCode ()) {
2934 case DEADLINE_EXCEEDED :
3035 case INTERNAL :
3136 case CANCELLED :
3237 case RESOURCE_EXHAUSTED :
33- case UNAVAILABLE :
3438 return true ;
39+ case UNAVAILABLE :
40+ if (statusRuntimeException .getMessage ().contains ("Server shutdownNow invoked" )) {
41+ return false ;
42+ } else {
43+ return true ;
44+ }
3545 default :
3646 return false ;
3747 }
Original file line number Diff line number Diff line change 1616
1717package com .google .cloud .pubsub .spi .v1 ;
1818
19- import static com .google .cloud .pubsub .spi .v1 .StatusUtil .isRetryable ;
20-
2119import com .google .api .gax .core .FlowController ;
2220import com .google .api .stats .Distribution ;
2321import com .google .auth .Credentials ;
@@ -179,9 +177,13 @@ public void onSuccess(@Nullable Void result) {
179177 }
180178
181179 @ Override
182- public void onFailure (Throwable t ) {
183- Status errorStatus = Status .fromThrowable (t );
184- if (isRetryable (errorStatus ) && isAlive ()) {
180+ public void onFailure (Throwable cause ) {
181+ if (!isAlive ()) {
182+ // we don't care about subscription failures when we're no longer running.
183+ logger .log (Level .FINE , "pull failure after service no longer running" , cause );
184+ return ;
185+ }
186+ if (StatusUtil .isRetryable (cause )) {
185187 long backoffMillis = channelReconnectBackoff .getMillis ();
186188 channelReconnectBackoff = channelReconnectBackoff .plus (backoffMillis );
187189 executor .schedule (
@@ -194,9 +196,7 @@ public void run() {
194196 backoffMillis ,
195197 TimeUnit .MILLISECONDS );
196198 } else {
197- if (isAlive ()) {
198- notifyFailed (t );
199- }
199+ notifyFailed (cause );
200200 }
201201 }
202202 },
Original file line number Diff line number Diff line change 1+ package com .google .cloud .pubsub .spi .v1 ;
2+
3+ import com .google .common .truth .Truth ;
4+ import io .grpc .Status ;
5+ import io .grpc .StatusRuntimeException ;
6+ import org .junit .Test ;
7+
8+ public class StatusUtilTest {
9+
10+ @ Test
11+ public void testIsRetryable () {
12+ Truth .assertThat (StatusUtil .isRetryable (new StatusRuntimeException (Status .UNAVAILABLE )))
13+ .isTrue ();
14+ Truth .assertThat (StatusUtil .isRetryable (new StatusRuntimeException (
15+ Status .UNAVAILABLE .withDescription ("Server shutdownNow invoked" ))))
16+ .isFalse ();
17+ }
18+ }
You can’t perform that action at this time.
0 commit comments