Skip to content

Commit 6302963

Browse files
authored
---
yaml --- r: 8803 b: refs/heads/lesv-patch-1 c: 87dd418 h: refs/heads/master i: 8801: 3b922b0 8799: cd8fd8e
1 parent b827649 commit 6302963

4 files changed

Lines changed: 25 additions & 8 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ refs/tags/v0.22.0: 18b298fe4bfe8ec2f20b0e0bf7ffdcce5cc3c5fe
6666
refs/heads/vam-google-patch-1: d0c8fee3a4074d0bf7360ce8c4f7f7223d0ee7b9
6767
refs/heads/vam-google-patch-CODEOWNERS: 2ac1616e25229e51d08a984708ef1918f91a35ee
6868
refs/heads/danoscarmike-patch-1: 7342a9916bce4ed00002c7202e2a16c5d46afaea
69-
refs/heads/lesv-patch-1: 23bb30ea382d237c9eebf9758f89194f1cb0eb75
69+
refs/heads/lesv-patch-1: 87dd4186a9cd58fc405caf1135330d2e4c24b7c1
7070
refs/heads/ml-update-branch: 079dd6610017f5c51b9d1938c12d6d55b61513cf
7171
refs/heads/vkedia-patch-2: 7d8241388a9769a5c069334761b06c7012c878e7
7272
refs/heads/vkedia-patch-3: 4d128043acaa7db9160faf439d2ca6104e8a88cb

branches/lesv-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import com.google.api.gax.core.ExecutorProvider;
2424
import com.google.api.gax.core.InstantiatingExecutorProvider;
2525
import com.google.api.gax.grpc.ChannelProvider;
26+
import com.google.api.gax.grpc.GrpcApiExceptionFactory;
2627
import com.google.api.gax.retrying.RetrySettings;
28+
import com.google.api.gax.rpc.ApiException;
2729
import com.google.auth.Credentials;
2830
import com.google.auth.oauth2.GoogleCredentials;
2931
import com.google.common.annotations.VisibleForTesting;
@@ -367,9 +369,12 @@ public void onFailure(Throwable t) {
367369
|| System.currentTimeMillis() + nextBackoffDelay
368370
> outstandingBatch.creationTime + retrySettings.getTotalTimeout().toMillis()) {
369371
try {
372+
ApiException gaxException =
373+
GrpcApiExceptionFactory.createException(
374+
t, Status.fromThrowable(t).getCode(), false);
370375
for (OutstandingPublish outstandingPublish :
371376
outstandingBatch.outstandingPublishes) {
372-
outstandingPublish.publishResult.setException(t);
377+
outstandingPublish.publishResult.setException(gaxException);
373378
}
374379
} finally {
375380
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());

branches/lesv-patch-1/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.google.api.core.ApiClock;
2121
import com.google.api.gax.batching.FlowController;
2222
import com.google.api.gax.core.Distribution;
23+
import com.google.api.gax.grpc.GrpcApiExceptionFactory;
24+
import com.google.api.gax.rpc.ApiException;
2325
import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor;
2426
import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline;
2527
import com.google.common.annotations.VisibleForTesting;
@@ -230,8 +232,11 @@ public void onFailure(Throwable cause) {
230232
return;
231233
}
232234
if (!StatusUtil.isRetryable(cause)) {
233-
logger.log(Level.SEVERE, "terminated streaming with exception", cause);
234-
notifyFailed(cause);
235+
ApiException gaxException =
236+
GrpcApiExceptionFactory.createException(
237+
cause, Status.fromThrowable(cause).getCode(), false);
238+
logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
239+
notifyFailed(gaxException);
235240
return;
236241
}
237242
logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", cause);

branches/lesv-patch-1/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.api.gax.core.FixedExecutorProvider;
2424
import com.google.api.gax.core.InstantiatingExecutorProvider;
2525
import com.google.api.gax.grpc.FixedChannelProvider;
26+
import com.google.api.gax.grpc.GrpcStatusCode;
27+
import com.google.api.gax.rpc.ApiException;
2628
import com.google.cloud.pubsub.v1.FakeSubscriberServiceImpl.ModifyAckDeadline;
2729
import com.google.cloud.pubsub.v1.Subscriber.Builder;
2830
import com.google.common.base.Function;
@@ -38,7 +40,6 @@
3840
import io.grpc.Server;
3941
import io.grpc.Status;
4042
import io.grpc.StatusException;
41-
import io.grpc.StatusRuntimeException;
4243
import io.grpc.inprocess.InProcessChannelBuilder;
4344
import io.grpc.inprocess.InProcessServerBuilder;
4445
import java.util.ArrayList;
@@ -517,9 +518,15 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
517518
} finally {
518519
// The subscriber must finish with an state error because its FAILED status.
519520
assertEquals(Subscriber.State.FAILED, subscriber.state());
520-
assertEquals(
521-
Status.INVALID_ARGUMENT,
522-
((StatusRuntimeException) subscriber.failureCause()).getStatus());
521+
522+
Throwable t = subscriber.failureCause();
523+
assertTrue(t instanceof ApiException);
524+
525+
ApiException ex = (ApiException) (t);
526+
assertTrue(ex.getStatusCode() instanceof GrpcStatusCode);
527+
528+
GrpcStatusCode grpcCode = (GrpcStatusCode) ex.getStatusCode();
529+
assertEquals(Status.Code.INVALID_ARGUMENT, grpcCode.getCode());
523530
}
524531
}
525532

0 commit comments

Comments
 (0)