Skip to content

Commit ee3f8c0

Browse files
fix: Prevent watch stream from emitting events after close. (#1471)
* Prevent watch stream from emitting events after close. * Cleanup * Copyright * Make classes final and hide visibility. * Add comments * Hand roll spy * Add unit test, rename class, add comments. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Copyright * Comment * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Comments --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent c1f478f commit ee3f8c0

File tree

8 files changed

+542
-45
lines changed

8 files changed

+542
-45
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
916916
bulkWriterExecutor);
917917
} else {
918918
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
919-
logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
919+
logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
920920
bulkWriterExecutor.schedule(
921921
() -> {
922922
synchronized (lock) {
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.firestore;
18+
19+
import com.google.api.gax.rpc.BidiStreamObserver;
20+
import com.google.api.gax.rpc.ClientStream;
21+
import com.google.api.gax.rpc.StreamController;
22+
import java.util.function.Function;
23+
import java.util.logging.Logger;
24+
25+
/**
26+
* Conditionally pass through callbacks to wrapped `BidiStreamObserver`.
27+
*
28+
* <p>Due to the asynchronous nature of a stream, there can be a delay between closing a stream and
29+
* the upstream no longer sending responses. Receiving callbacks after closing upstream can have
30+
* undesirable consequences.
31+
*
32+
* <p>The underlying `ClientStream` can be called through the `SilenceableBidiStream`. Methods such
33+
* as `send()` and `closeSend()` are exposed.
34+
*
35+
* <p>The `SilenceableBidiStream` wraps a `BidiStreamObserver`. This is helpful for situations where
36+
* the observer should be detached from a stream. Instead of calling the `closeSend()` method, the
37+
* `closeSendAndSilence()` method will silence the stream by preventing further callbacks including
38+
* `onError` and `onComplete`.
39+
*
40+
* <p>If silenced, the observer could be safely attached to a new stream. This is useful for error
41+
* handling where upstream must be stopped, but a new stream can continue to service the observer.
42+
* In these cases, the old stream cannot be allowed to send more responses, and especially cannot be
43+
* allowed to send `onError` or `onComplete` since that would signal the downstream that the stream
44+
* is finished.
45+
*/
46+
final class SilenceableBidiStream<RequestT, ResponseT>
47+
implements BidiStreamObserver<RequestT, ResponseT> {
48+
49+
private final ClientStream<RequestT> stream;
50+
private final BidiStreamObserver<RequestT, ResponseT> delegate;
51+
private boolean silence = false;
52+
private static final Logger LOGGER = Logger.getLogger(Watch.class.getName());
53+
54+
SilenceableBidiStream(
55+
BidiStreamObserver<RequestT, ResponseT> responseObserverT,
56+
Function<BidiStreamObserver<RequestT, ResponseT>, ClientStream<RequestT>> streamSupplier) {
57+
this.delegate = responseObserverT;
58+
stream = streamSupplier.apply(this);
59+
}
60+
61+
public boolean isSilenced() {
62+
return silence;
63+
}
64+
65+
public void send(RequestT request) {
66+
LOGGER.info(stream.toString());
67+
stream.send(request);
68+
}
69+
70+
public void closeSend() {
71+
LOGGER.info(stream::toString);
72+
stream.closeSend();
73+
}
74+
75+
public void closeSendAndSilence() {
76+
LOGGER.info(stream::toString);
77+
silence = true;
78+
stream.closeSend();
79+
}
80+
81+
@Override
82+
public void onReady(ClientStream<RequestT> stream) {
83+
if (silence) {
84+
LOGGER.info(() -> String.format("Silenced: %s", stream));
85+
} else {
86+
delegate.onReady(stream);
87+
}
88+
}
89+
90+
@Override
91+
public void onStart(StreamController controller) {
92+
if (silence) {
93+
LOGGER.info(() -> String.format("Silenced: %s", stream));
94+
} else {
95+
delegate.onStart(controller);
96+
}
97+
}
98+
99+
@Override
100+
public void onResponse(ResponseT response) {
101+
if (silence) {
102+
LOGGER.info(() -> String.format("Silenced: %s", stream));
103+
} else {
104+
delegate.onResponse(response);
105+
}
106+
}
107+
108+
@Override
109+
public void onError(Throwable t) {
110+
if (silence) {
111+
LOGGER.info(() -> String.format("Silenced: %s", stream));
112+
} else {
113+
delegate.onError(t);
114+
}
115+
}
116+
117+
@Override
118+
public void onComplete() {
119+
if (silence) {
120+
LOGGER.info(() -> String.format("Silenced: %s", stream));
121+
} else {
122+
delegate.onComplete();
123+
}
124+
}
125+
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.ScheduledExecutorService;
5050
import java.util.concurrent.TimeUnit;
5151
import java.util.concurrent.atomic.AtomicBoolean;
52+
import java.util.logging.Logger;
5253
import javax.annotation.Nullable;
5354

5455
/**
@@ -59,7 +60,7 @@
5960
* It synchronizes on its own instance so it is advisable not to use this class for external
6061
* synchronization.
6162
*/
62-
class Watch implements BidiStreamObserver<ListenRequest, ListenResponse> {
63+
final class Watch implements BidiStreamObserver<ListenRequest, ListenResponse> {
6364
/**
6465
* Target ID used by watch. Watch uses a fixed target id since we only support one target per
6566
* stream. The actual target ID we use is arbitrary.
@@ -73,7 +74,7 @@ class Watch implements BidiStreamObserver<ListenRequest, ListenResponse> {
7374
private final ExponentialRetryAlgorithm backoff;
7475
private final Target target;
7576
private TimedAttemptSettings nextAttempt;
76-
private ClientStream<ListenRequest> stream;
77+
private SilenceableBidiStream<ListenRequest, ListenResponse> stream;
7778

7879
/** The sorted tree of DocumentSnapshots as sent in the last snapshot. */
7980
private DocumentSet documentSet;
@@ -115,6 +116,8 @@ static class ChangeSet {
115116
List<QueryDocumentSnapshot> updates = new ArrayList<>();
116117
}
117118

119+
private static final Logger LOGGER = Logger.getLogger(Watch.class.getName());
120+
118121
/**
119122
* @param firestore The Firestore Database client.
120123
* @param query The query that is used to order the document snapshots returned by this watch.
@@ -246,7 +249,16 @@ && affectsTarget(change.getTargetIdsList(), WATCH_TARGET_ID)) {
246249
changeMap.put(ResourcePath.create(listenResponse.getDocumentRemove().getDocument()), null);
247250
break;
248251
case FILTER:
249-
if (listenResponse.getFilter().getCount() != currentSize()) {
252+
// Keep copy of counts for producing log message.
253+
// The method currentSize() is computationally expensive, and should only be run once.
254+
int filterCount = listenResponse.getFilter().getCount();
255+
int currentSize = currentSize();
256+
if (filterCount != currentSize) {
257+
LOGGER.info(
258+
() ->
259+
String.format(
260+
"filter: count mismatch filter count %d != current size %d",
261+
filterCount, currentSize));
250262
// We need to remove all the current results.
251263
resetDocs();
252264
// The filter didn't match, so re-issue the query.
@@ -318,7 +330,7 @@ private void resetDocs() {
318330
resumeToken = null;
319331

320332
for (DocumentSnapshot snapshot : documentSet) {
321-
// Mark each document as deleted. If documents are not deleted, they will be send again by
333+
// Mark each document as deleted. If documents are not deleted, they will be sent again by
322334
// the server.
323335
changeMap.put(snapshot.getReference().getResourcePath(), null);
324336
}
@@ -329,7 +341,7 @@ private void resetDocs() {
329341
/** Closes the stream and calls onError() if the stream is still active. */
330342
private void closeStream(final Throwable throwable) {
331343
if (stream != null) {
332-
stream.closeSend();
344+
stream.closeSendAndSilence();
333345
stream = null;
334346
}
335347

@@ -371,7 +383,7 @@ private void maybeReopenStream(Throwable throwable) {
371383
/** Helper to restart the outgoing stream to the backend. */
372384
private void resetStream() {
373385
if (stream != null) {
374-
stream.closeSend();
386+
stream.closeSendAndSilence();
375387
stream = null;
376388
}
377389

@@ -398,7 +410,12 @@ private void initStream() {
398410
nextAttempt = backoff.createNextAttempt(nextAttempt);
399411

400412
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
401-
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
413+
stream =
414+
new SilenceableBidiStream<>(
415+
Watch.this,
416+
observer ->
417+
firestore.streamRequest(
418+
observer, firestore.getClient().listenCallable()));
402419

403420
ListenRequest.Builder request = ListenRequest.newBuilder();
404421
request.setDatabase(firestore.getDatabaseName());
@@ -459,6 +476,7 @@ private void pushSnapshot(final Timestamp readTime, ByteString nextResumeToken)
459476
if (!hasPushed || !changes.isEmpty()) {
460477
final QuerySnapshot querySnapshot =
461478
QuerySnapshot.withChanges(query, readTime, documentSet, changes);
479+
LOGGER.info(querySnapshot.toString());
462480
userCallbackExecutor.execute(() -> listener.onEvent(querySnapshot, null));
463481
hasPushed = true;
464482
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.firestore;
18+
19+
import com.google.api.gax.rpc.BidiStreamObserver;
20+
import com.google.api.gax.rpc.BidiStreamingCallable;
21+
import com.google.api.gax.rpc.ClientStream;
22+
23+
public final class FirestoreSpy {
24+
25+
public final FirestoreImpl spy;
26+
public BidiStreamObserver streamRequestBidiStreamObserver;
27+
28+
public FirestoreSpy(FirestoreOptions firestoreOptions) {
29+
spy =
30+
new FirestoreImpl(firestoreOptions) {
31+
@Override
32+
public <RequestT, ResponseT> ClientStream<RequestT> streamRequest(
33+
BidiStreamObserver<RequestT, ResponseT> responseObserverT,
34+
BidiStreamingCallable<RequestT, ResponseT> callable) {
35+
streamRequestBidiStreamObserver = responseObserverT;
36+
return super.streamRequest(responseObserverT, callable);
37+
}
38+
};
39+
}
40+
}

0 commit comments

Comments
 (0)