Skip to content

Commit 9033de8

Browse files
authored
---
yaml --- r: 8427 b: refs/heads/master c: 97d50c2 h: refs/heads/master i: 8425: b60bf63 8423: 963d61f
1 parent 60e67f8 commit 9033de8

2 files changed

Lines changed: 86 additions & 33 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: 1fefc4ca0c63ee3a4e5b6b35734c161c4faeb599
2+
refs/heads/master: 97d50c2753708d66c7bc8a47a093cffdcb63d887
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 3e16a39145437096333db5811e5c0292719c1823
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 85 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.util.concurrent.ScheduledExecutorService;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.atomic.AtomicLong;
42+
import java.util.concurrent.locks.Lock;
43+
import java.util.concurrent.locks.ReentrantLock;
4244
import java.util.logging.Level;
4345
import java.util.logging.Logger;
4446
import javax.annotation.Nullable;
@@ -53,14 +55,15 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
5355
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
5456
private static final int MAX_PER_REQUEST_CHANGES = 10000;
5557

56-
private final AtomicLong channelReconnectBackoffMillis =
57-
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
58-
5958
private final SubscriberStub asyncStub;
60-
6159
private final String subscription;
6260
private final ScheduledExecutorService executor;
6361
private final MessageDispatcher messageDispatcher;
62+
63+
private final AtomicLong channelReconnectBackoffMillis =
64+
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
65+
66+
private final Lock lock = new ReentrantLock();
6467
private ClientCallStreamObserver<StreamingPullRequest> requestObserver;
6568

6669
public StreamingSubscriberConnection(
@@ -104,22 +107,37 @@ protected void doStart() {
104107
@Override
105108
protected void doStop() {
106109
messageDispatcher.stop();
107-
requestObserver.onError(Status.CANCELLED.asException());
108-
notifyStopped();
110+
111+
lock.lock();
112+
try {
113+
requestObserver.onError(Status.CANCELLED.asException());
114+
} finally {
115+
lock.unlock();
116+
notifyStopped();
117+
}
109118
}
110119

111120
private class StreamingPullResponseObserver
112121
implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {
113122

114123
final SettableFuture<Void> errorFuture;
115124

125+
/**
126+
* When a batch finsihes processing, we want to request one more batch from the server. But by
127+
* the time this happens, our stream might have already errored, and new stream created. We
128+
* don't want to request more batches from the new stream -- that might pull more messages than
129+
* the user can deal with -- so we save the request observer this response observer is "paired
130+
* with". If the stream has already errored, requesting more messages is a no-op.
131+
*/
132+
ClientCallStreamObserver<StreamingPullRequest> thisRequestObserver;
133+
116134
StreamingPullResponseObserver(SettableFuture<Void> errorFuture) {
117135
this.errorFuture = errorFuture;
118136
}
119137

120138
@Override
121139
public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
122-
StreamingSubscriberConnection.this.requestObserver = requestObserver;
140+
thisRequestObserver = requestObserver;
123141
requestObserver.disableAutoInboundFlowControl();
124142
}
125143

@@ -131,9 +149,18 @@ public void onNext(StreamingPullResponse response) {
131149
new Runnable() {
132150
@Override
133151
public void run() {
134-
// Only if not shutdown we will request one more batches of messages to be delivered.
135-
if (isAlive()) {
136-
requestObserver.request(1);
152+
// Only request more if we're not shutdown.
153+
// If errorFuture is done, the stream has either failed or hung up,
154+
// and we don't need to request.
155+
if (isAlive() && !errorFuture.isDone()) {
156+
lock.lock();
157+
try {
158+
thisRequestObserver.request(1);
159+
} catch (Exception e) {
160+
logger.log(Level.WARNING, "cannot request more messages", e);
161+
} finally {
162+
lock.unlock();
163+
}
137164
}
138165
}
139166
});
@@ -169,6 +196,18 @@ private void initialize() {
169196
.build());
170197
requestObserver.request(1);
171198

199+
/**
200+
* Must make sure we do this after sending the subscription name and deadline. Otherwise, some
201+
* other thread might use this stream to do something else before we could send the first
202+
* request.
203+
*/
204+
lock.lock();
205+
try {
206+
this.requestObserver = requestObserver;
207+
} finally {
208+
lock.unlock();
209+
}
210+
172211
Futures.addCallback(
173212
errorFuture,
174213
new FutureCallback<Void>() {
@@ -191,24 +230,24 @@ public void onFailure(Throwable cause) {
191230
return;
192231
}
193232
logger.log(Level.WARNING, "Terminated streaming with exception", cause);
194-
if (StatusUtil.isRetryable(cause)) {
195-
long backoffMillis = channelReconnectBackoffMillis.get();
196-
long newBackoffMillis =
197-
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
198-
channelReconnectBackoffMillis.set(newBackoffMillis);
199-
200-
executor.schedule(
201-
new Runnable() {
202-
@Override
203-
public void run() {
204-
initialize();
205-
}
206-
},
207-
backoffMillis,
208-
TimeUnit.MILLISECONDS);
209-
} else {
233+
if (!StatusUtil.isRetryable(cause)) {
210234
notifyFailed(cause);
235+
return;
211236
}
237+
long backoffMillis = channelReconnectBackoffMillis.get();
238+
long newBackoffMillis =
239+
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
240+
channelReconnectBackoffMillis.set(newBackoffMillis);
241+
242+
executor.schedule(
243+
new Runnable() {
244+
@Override
245+
public void run() {
246+
initialize();
247+
}
248+
},
249+
backoffMillis,
250+
TimeUnit.MILLISECONDS);
212251
}
213252
},
214253
executor);
@@ -223,8 +262,15 @@ public void sendAckOperations(
223262
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
224263
List<StreamingPullRequest> requests =
225264
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES);
226-
for (StreamingPullRequest request : requests) {
227-
requestObserver.onNext(request);
265+
lock.lock();
266+
try {
267+
for (StreamingPullRequest request : requests) {
268+
requestObserver.onNext(request);
269+
}
270+
} catch (Exception e) {
271+
logger.log(Level.WARNING, "failed to send acks", e);
272+
} finally {
273+
lock.unlock();
228274
}
229275
}
230276

@@ -274,9 +320,16 @@ static List<StreamingPullRequest> partitionAckOperations(
274320

275321
public void updateStreamAckDeadline(int newAckDeadlineSeconds) {
276322
messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds);
277-
requestObserver.onNext(
278-
StreamingPullRequest.newBuilder()
279-
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
280-
.build());
323+
lock.lock();
324+
try {
325+
requestObserver.onNext(
326+
StreamingPullRequest.newBuilder()
327+
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
328+
.build());
329+
} catch (Exception e) {
330+
logger.log(Level.WARNING, "failed to set deadline", e);
331+
} finally {
332+
lock.unlock();
333+
}
281334
}
282335
}

0 commit comments

Comments
 (0)