2929import com .google .common .util .concurrent .SettableFuture ;
3030import com .google .pubsub .v1 .StreamingPullRequest ;
3131import com .google .pubsub .v1 .StreamingPullResponse ;
32- import com .google .pubsub .v1 .SubscriberGrpc ;
33- import io .grpc .CallOptions ;
34- import io .grpc .Channel ;
32+ import com .google .pubsub .v1 .SubscriberGrpc .SubscriberStub ;
3533import io .grpc .Status ;
3634import io .grpc .stub .ClientCallStreamObserver ;
37- import io .grpc .stub .ClientCalls ;
3835import io .grpc .stub .ClientResponseObserver ;
3936import java .util .ArrayList ;
4037import java .util .List ;
@@ -55,7 +52,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
5552
5653 private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF ;
5754
58- private final Channel channel ;
55+ private final SubscriberStub asyncStub ;
5956
6057 private final String subscription ;
6158 private final ScheduledExecutorService executor ;
@@ -69,14 +66,14 @@ public StreamingSubscriberConnection(
6966 Duration maxAckExtensionPeriod ,
7067 int streamAckDeadlineSeconds ,
7168 Distribution ackLatencyDistribution ,
72- Channel channel ,
69+ SubscriberStub asyncStub ,
7370 FlowController flowController ,
7471 ScheduledExecutorService executor ,
7572 @ Nullable ScheduledExecutorService alarmsExecutor ,
7673 ApiClock clock ) {
7774 this .subscription = subscription ;
7875 this .executor = executor ;
79- this .channel = channel ;
76+ this .asyncStub = asyncStub ;
8077 this .messageDispatcher =
8178 new MessageDispatcher (
8279 receiver ,
@@ -101,8 +98,8 @@ protected void doStart() {
10198 @ Override
10299 protected void doStop () {
103100 messageDispatcher .stop ();
104- notifyStopped ();
105101 requestObserver .onError (Status .CANCELLED .asException ());
102+ notifyStopped ();
106103 }
107104
108105 private class StreamingPullResponseObserver
@@ -137,7 +134,6 @@ public void run() {
137134
138135 @ Override
139136 public void onError (Throwable t ) {
140- logger .log (Level .WARNING , "Terminated streaming with exception" , t );
141137 errorFuture .setException (t );
142138 }
143139
@@ -154,9 +150,7 @@ private void initialize() {
154150 new StreamingPullResponseObserver (errorFuture );
155151 final ClientCallStreamObserver <StreamingPullRequest > requestObserver =
156152 (ClientCallStreamObserver <StreamingPullRequest >)
157- (ClientCalls .asyncBidiStreamingCall (
158- channel .newCall (SubscriberGrpc .METHOD_STREAMING_PULL , CallOptions .DEFAULT ),
159- responseObserver ));
153+ (asyncStub .streamingPull (responseObserver ));
160154 logger .log (
161155 Level .FINER ,
162156 "Initializing stream to subscription {0} with deadline {1}" ,
@@ -173,6 +167,9 @@ private void initialize() {
173167 new FutureCallback <Void >() {
174168 @ Override
175169 public void onSuccess (@ Nullable Void result ) {
170+ if (!isAlive ()) {
171+ return ;
172+ }
176173 channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF ;
177174 // The stream was closed. And any case we want to reopen it to continue receiving
178175 // messages.
@@ -186,6 +183,7 @@ public void onFailure(Throwable cause) {
186183 logger .log (Level .FINE , "pull failure after service no longer running" , cause );
187184 return ;
188185 }
186+ logger .log (Level .WARNING , "Terminated streaming with exception" , cause );
189187 if (StatusUtil .isRetryable (cause )) {
190188 long backoffMillis = channelReconnectBackoff .toMillis ();
191189 channelReconnectBackoff = channelReconnectBackoff .plusMillis (backoffMillis );
0 commit comments