3939import java .util .concurrent .ScheduledExecutorService ;
4040import java .util .concurrent .TimeUnit ;
4141import java .util .concurrent .atomic .AtomicLong ;
42+ import java .util .concurrent .locks .Lock ;
43+ import java .util .concurrent .locks .ReentrantLock ;
4244import java .util .logging .Level ;
4345import java .util .logging .Logger ;
4446import javax .annotation .Nullable ;
@@ -53,14 +55,15 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
5355 private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration .ofSeconds (10 );
5456 private static final int MAX_PER_REQUEST_CHANGES = 10000 ;
5557
56- private final AtomicLong channelReconnectBackoffMillis =
57- new AtomicLong (INITIAL_CHANNEL_RECONNECT_BACKOFF .toMillis ());
58-
5958 private final SubscriberStub asyncStub ;
60-
6159 private final String subscription ;
6260 private final ScheduledExecutorService executor ;
6361 private final MessageDispatcher messageDispatcher ;
62+
63+ private final AtomicLong channelReconnectBackoffMillis =
64+ new AtomicLong (INITIAL_CHANNEL_RECONNECT_BACKOFF .toMillis ());
65+
66+ private final Lock lock = new ReentrantLock ();
6467 private ClientCallStreamObserver <StreamingPullRequest > requestObserver ;
6568
6669 public StreamingSubscriberConnection (
@@ -104,22 +107,37 @@ protected void doStart() {
104107 @ Override
105108 protected void doStop () {
106109 messageDispatcher .stop ();
107- requestObserver .onError (Status .CANCELLED .asException ());
108- notifyStopped ();
110+
111+ lock .lock ();
112+ try {
113+ requestObserver .onError (Status .CANCELLED .asException ());
114+ } finally {
115+ lock .unlock ();
116+ notifyStopped ();
117+ }
109118 }
110119
111120 private class StreamingPullResponseObserver
112121 implements ClientResponseObserver <StreamingPullRequest , StreamingPullResponse > {
113122
114123 final SettableFuture <Void > errorFuture ;
115124
125+ /**
126+ * When a batch finsihes processing, we want to request one more batch from the server. But by
127+ * the time this happens, our stream might have already errored, and new stream created. We
128+ * don't want to request more batches from the new stream -- that might pull more messages than
129+ * the user can deal with -- so we save the request observer this response observer is "paired
130+ * with". If the stream has already errored, requesting more messages is a no-op.
131+ */
132+ ClientCallStreamObserver <StreamingPullRequest > thisRequestObserver ;
133+
116134 StreamingPullResponseObserver (SettableFuture <Void > errorFuture ) {
117135 this .errorFuture = errorFuture ;
118136 }
119137
120138 @ Override
121139 public void beforeStart (ClientCallStreamObserver <StreamingPullRequest > requestObserver ) {
122- StreamingSubscriberConnection . this . requestObserver = requestObserver ;
140+ thisRequestObserver = requestObserver ;
123141 requestObserver .disableAutoInboundFlowControl ();
124142 }
125143
@@ -131,9 +149,18 @@ public void onNext(StreamingPullResponse response) {
131149 new Runnable () {
132150 @ Override
133151 public void run () {
134- // Only if not shutdown we will request one more batches of messages to be delivered.
135- if (isAlive ()) {
136- requestObserver .request (1 );
152+ // Only request more if we're not shutdown.
153+ // If errorFuture is done, the stream has either failed or hung up,
154+ // and we don't need to request.
155+ if (isAlive () && !errorFuture .isDone ()) {
156+ lock .lock ();
157+ try {
158+ thisRequestObserver .request (1 );
159+ } catch (Exception e ) {
160+ logger .log (Level .WARNING , "cannot request more messages" , e );
161+ } finally {
162+ lock .unlock ();
163+ }
137164 }
138165 }
139166 });
@@ -169,6 +196,18 @@ private void initialize() {
169196 .build ());
170197 requestObserver .request (1 );
171198
199+ /**
200+ * Must make sure we do this after sending the subscription name and deadline. Otherwise, some
201+ * other thread might use this stream to do something else before we could send the first
202+ * request.
203+ */
204+ lock .lock ();
205+ try {
206+ this .requestObserver = requestObserver ;
207+ } finally {
208+ lock .unlock ();
209+ }
210+
172211 Futures .addCallback (
173212 errorFuture ,
174213 new FutureCallback <Void >() {
@@ -191,24 +230,24 @@ public void onFailure(Throwable cause) {
191230 return ;
192231 }
193232 logger .log (Level .WARNING , "Terminated streaming with exception" , cause );
194- if (StatusUtil .isRetryable (cause )) {
195- long backoffMillis = channelReconnectBackoffMillis .get ();
196- long newBackoffMillis =
197- Math .min (backoffMillis * 2 , MAX_CHANNEL_RECONNECT_BACKOFF .toMillis ());
198- channelReconnectBackoffMillis .set (newBackoffMillis );
199-
200- executor .schedule (
201- new Runnable () {
202- @ Override
203- public void run () {
204- initialize ();
205- }
206- },
207- backoffMillis ,
208- TimeUnit .MILLISECONDS );
209- } else {
233+ if (!StatusUtil .isRetryable (cause )) {
210234 notifyFailed (cause );
235+ return ;
211236 }
237+ long backoffMillis = channelReconnectBackoffMillis .get ();
238+ long newBackoffMillis =
239+ Math .min (backoffMillis * 2 , MAX_CHANNEL_RECONNECT_BACKOFF .toMillis ());
240+ channelReconnectBackoffMillis .set (newBackoffMillis );
241+
242+ executor .schedule (
243+ new Runnable () {
244+ @ Override
245+ public void run () {
246+ initialize ();
247+ }
248+ },
249+ backoffMillis ,
250+ TimeUnit .MILLISECONDS );
212251 }
213252 },
214253 executor );
@@ -223,8 +262,15 @@ public void sendAckOperations(
223262 List <String > acksToSend , List <PendingModifyAckDeadline > ackDeadlineExtensions ) {
224263 List <StreamingPullRequest > requests =
225264 partitionAckOperations (acksToSend , ackDeadlineExtensions , MAX_PER_REQUEST_CHANGES );
226- for (StreamingPullRequest request : requests ) {
227- requestObserver .onNext (request );
265+ lock .lock ();
266+ try {
267+ for (StreamingPullRequest request : requests ) {
268+ requestObserver .onNext (request );
269+ }
270+ } catch (Exception e ) {
271+ logger .log (Level .WARNING , "failed to send acks" , e );
272+ } finally {
273+ lock .unlock ();
228274 }
229275 }
230276
@@ -274,9 +320,16 @@ static List<StreamingPullRequest> partitionAckOperations(
274320
275321 public void updateStreamAckDeadline (int newAckDeadlineSeconds ) {
276322 messageDispatcher .setMessageDeadlineSeconds (newAckDeadlineSeconds );
277- requestObserver .onNext (
278- StreamingPullRequest .newBuilder ()
279- .setStreamAckDeadlineSeconds (newAckDeadlineSeconds )
280- .build ());
323+ lock .lock ();
324+ try {
325+ requestObserver .onNext (
326+ StreamingPullRequest .newBuilder ()
327+ .setStreamAckDeadlineSeconds (newAckDeadlineSeconds )
328+ .build ());
329+ } catch (Exception e ) {
330+ logger .log (Level .WARNING , "failed to set deadline" , e );
331+ } finally {
332+ lock .unlock ();
333+ }
281334 }
282335}
0 commit comments