Skip to content

Commit feb1921

Browse files
fix: respect total request timeout for Query retries (#806)
1 parent 6c71649 commit feb1921

File tree

5 files changed

+167
-22
lines changed

5 files changed

+167
-22
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import com.google.api.core.ApiClock;
1920
import com.google.api.core.ApiFuture;
21+
import com.google.api.core.NanoClock;
2022
import com.google.api.core.SettableApiFuture;
2123
import com.google.api.gax.rpc.ApiStreamObserver;
2224
import com.google.api.gax.rpc.BidiStreamingCallable;
@@ -42,6 +44,7 @@
4244
import java.util.Random;
4345
import javax.annotation.Nonnull;
4446
import javax.annotation.Nullable;
47+
import org.threeten.bp.Duration;
4548

4649
/**
4750
* Main implementation of the Firestore client. This is the entry point for all Firestore
@@ -408,6 +411,16 @@ public FirestoreRpc getClient() {
408411
return firestoreClient;
409412
}
410413

414+
@Override
415+
public Duration getTotalRequestTimeout() {
416+
return firestoreOptions.getRetrySettings().getTotalTimeout();
417+
}
418+
419+
@Override
420+
public ApiClock getClock() {
421+
return NanoClock.getDefaultClock();
422+
}
423+
411424
/** Request funnel for all read/write requests. */
412425
@Override
413426
public <RequestT, ResponseT> ApiFuture<ResponseT> sendRequest(

google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreRpcContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import com.google.api.core.ApiClock;
1920
import com.google.api.core.ApiFuture;
2021
import com.google.api.core.InternalApi;
2122
import com.google.api.core.InternalExtensionOnly;
@@ -24,6 +25,7 @@
2425
import com.google.api.gax.rpc.ServerStreamingCallable;
2526
import com.google.api.gax.rpc.UnaryCallable;
2627
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
28+
import org.threeten.bp.Duration;
2729

2830
@InternalApi
2931
@InternalExtensionOnly
@@ -37,6 +39,10 @@ interface FirestoreRpcContext<FS extends Firestore> {
3739

3840
FirestoreRpc getClient();
3941

42+
Duration getTotalRequestTimeout();
43+
44+
ApiClock getClock();
45+
4046
<RequestT, ResponseT> ApiFuture<ResponseT> sendRequest(
4147
RequestT requestT, UnaryCallable<RequestT, ResponseT> callable);
4248

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.concurrent.atomic.AtomicReference;
6868
import javax.annotation.Nonnull;
6969
import javax.annotation.Nullable;
70+
import org.threeten.bp.Duration;
7071

7172
/**
7273
* A Query which you can read or listen to. You can also construct refined Query objects by adding
@@ -1342,6 +1343,7 @@ public void onCompleted() {
13421343
responseObserver.onCompleted();
13431344
}
13441345
},
1346+
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
13451347
/* transactionId= */ null,
13461348
/* readTime= */ null);
13471349
}
@@ -1478,6 +1480,7 @@ Timestamp getReadTime() {
14781480

14791481
private void internalStream(
14801482
final QuerySnapshotObserver documentObserver,
1483+
final long startTimeNanos,
14811484
@Nullable final ByteString transactionId,
14821485
@Nullable final Timestamp readTime) {
14831486
RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
@@ -1533,30 +1536,20 @@ public void onNext(RunQueryResponse response) {
15331536

15341537
@Override
15351538
public void onError(Throwable throwable) {
1536-
// If a non-transactional query failed, attempt to restart.
1537-
// Transactional queries are retried via the transaction runner.
1538-
if (transactionId == null && isRetryableError(throwable)) {
1539+
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
1540+
if (shouldRetry(cursor, throwable)) {
15391541
Tracing.getTracer()
15401542
.getCurrentSpan()
15411543
.addAnnotation("Firestore.Query: Retryable Error");
15421544

1543-
// Restart the query but use the last document we received as the
1544-
// query cursor. Note that this it is ok to not use backoff here
1545-
// since we are requiring at least a single document result.
1546-
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
1547-
if (cursor != null) {
1548-
if (options.getRequireConsistency()) {
1549-
Query.this
1550-
.startAfter(cursor)
1551-
.internalStream(
1552-
documentObserver, /* transactionId= */ null, cursor.getReadTime());
1553-
} else {
1554-
Query.this
1555-
.startAfter(cursor)
1556-
.internalStream(
1557-
documentObserver, /* transactionId= */ null, /* readTime= */ null);
1558-
}
1559-
}
1545+
Query.this
1546+
.startAfter(cursor)
1547+
.internalStream(
1548+
documentObserver,
1549+
startTimeNanos,
1550+
/* transactionId= */ null,
1551+
options.getRequireConsistency() ? cursor.getReadTime() : null);
1552+
15601553
} else {
15611554
Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error");
15621555
documentObserver.onError(throwable);
@@ -1573,6 +1566,30 @@ public void onCompleted() {
15731566
"numDocuments", AttributeValue.longAttributeValue(numDocuments)));
15741567
documentObserver.onCompleted(readTime);
15751568
}
1569+
1570+
boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) {
1571+
if (transactionId != null) {
1572+
// Transactional queries are retried via the transaction runner.
1573+
return false;
1574+
}
1575+
1576+
if (lastDocument == null) {
1577+
// Only retry if we have received a single result. Retries for RPCs with initial
1578+
// failure are handled by Google Gax, which also implements backoff.
1579+
return false;
1580+
}
1581+
1582+
if (!isRetryableError(t)) {
1583+
return false;
1584+
}
1585+
1586+
if (rpcContext.getTotalRequestTimeout().isZero()) {
1587+
return true;
1588+
}
1589+
1590+
Duration duration = Duration.ofNanos(rpcContext.getClock().nanoTime() - startTimeNanos);
1591+
return duration.compareTo(rpcContext.getTotalRequestTimeout()) < 0;
1592+
}
15761593
};
15771594

15781595
rpcContext.streamRequest(request.build(), observer, rpcContext.getClient().runQueryCallable());
@@ -1642,6 +1659,7 @@ public void onCompleted() {
16421659
result.set(querySnapshot);
16431660
}
16441661
},
1662+
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
16451663
transactionId,
16461664
/* readTime= */ null);
16471665

google-cloud-firestore/src/test/java/com/google/cloud/firestore/QueryTest.java

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@
3434
import static com.google.cloud.firestore.LocalFirestoreHelper.unaryFilter;
3535
import static org.junit.Assert.assertEquals;
3636
import static org.junit.Assert.assertFalse;
37+
import static org.junit.Assert.assertThrows;
3738
import static org.junit.Assert.assertTrue;
3839
import static org.junit.Assert.fail;
3940
import static org.mockito.Mockito.doAnswer;
41+
import static org.mockito.Mockito.doReturn;
4042

43+
import com.google.api.core.ApiClock;
4144
import com.google.api.gax.rpc.ApiStreamObserver;
4245
import com.google.api.gax.rpc.ServerStreamingCallable;
4346
import com.google.cloud.Timestamp;
@@ -60,6 +63,7 @@
6063
import java.util.Collections;
6164
import java.util.Iterator;
6265
import java.util.List;
66+
import java.util.concurrent.ExecutionException;
6367
import java.util.concurrent.Semaphore;
6468
import org.junit.Before;
6569
import org.junit.Test;
@@ -71,10 +75,29 @@
7175
import org.mockito.Spy;
7276
import org.mockito.runners.MockitoJUnitRunner;
7377
import org.mockito.stubbing.Answer;
78+
import org.threeten.bp.Duration;
7479

7580
@RunWith(MockitoJUnitRunner.class)
7681
public class QueryTest {
7782

83+
static class MockClock implements ApiClock {
84+
long nanoTime = 0;
85+
86+
public void advance(long nanos) {
87+
nanoTime += nanos;
88+
}
89+
90+
@Override
91+
public long nanoTime() {
92+
return nanoTime;
93+
}
94+
95+
@Override
96+
public long millisTime() {
97+
return nanoTime / 1000000;
98+
}
99+
}
100+
78101
@Spy
79102
private final FirestoreImpl firestoreMock =
80103
new FirestoreImpl(
@@ -87,8 +110,14 @@ public class QueryTest {
87110

88111
private Query query;
89112

113+
private MockClock clock;
114+
90115
@Before
91116
public void before() {
117+
clock = new MockClock();
118+
doReturn(clock).when(firestoreMock).getClock();
119+
doReturn(Duration.ZERO).when(firestoreMock).getTotalRequestTimeout();
120+
92121
query = firestoreMock.collection(COLLECTION_ID);
93122
}
94123

@@ -1025,7 +1054,86 @@ public void onCompleted() {}
10251054
semaphore.acquire();
10261055

10271056
// Verify the request count
1028-
List<RunQueryRequest> requests = runQuery.getAllValues();
1057+
assertEquals(1, runQuery.getAllValues().size());
1058+
}
1059+
1060+
@Test
1061+
public void onlyRetriesWhenResultSent() throws Exception {
1062+
doAnswer(
1063+
queryResponse(
1064+
FirestoreException.forServerRejection(
1065+
Status.DEADLINE_EXCEEDED, "Simulated test failure")))
1066+
.when(firestoreMock)
1067+
.streamRequest(
1068+
runQuery.capture(),
1069+
streamObserverCapture.capture(),
1070+
Matchers.<ServerStreamingCallable>any());
1071+
1072+
assertThrows(ExecutionException.class, () -> query.get().get());
1073+
1074+
// Verify the request count
1075+
assertEquals(1, runQuery.getAllValues().size());
1076+
}
1077+
1078+
@Test
1079+
public void retriesWithoutTimeout() throws Exception {
1080+
final boolean[] returnError = new boolean[] {true};
1081+
1082+
doAnswer(
1083+
(Answer<RunQueryResponse>)
1084+
invocation -> {
1085+
// Advance clock by an hour
1086+
clock.advance(Duration.ofHours(1).toNanos());
1087+
1088+
if (returnError[0]) {
1089+
returnError[0] = false;
1090+
return queryResponse(
1091+
FirestoreException.forServerRejection(
1092+
Status.DEADLINE_EXCEEDED, "Simulated test failure"),
1093+
DOCUMENT_NAME + "1")
1094+
.answer(invocation);
1095+
} else {
1096+
return queryResponse(DOCUMENT_NAME + "2").answer(invocation);
1097+
}
1098+
})
1099+
.when(firestoreMock)
1100+
.streamRequest(
1101+
runQuery.capture(),
1102+
streamObserverCapture.capture(),
1103+
Matchers.<ServerStreamingCallable>any());
1104+
1105+
query.get().get();
1106+
1107+
// Verify the request count
1108+
assertEquals(2, runQuery.getAllValues().size());
1109+
}
1110+
1111+
@Test
1112+
public void doesNotRetryWithTimeout() {
1113+
doReturn(Duration.ofMinutes(1)).when(firestoreMock).getTotalRequestTimeout();
1114+
1115+
doAnswer(
1116+
(Answer<RunQueryResponse>)
1117+
invocation -> {
1118+
// Advance clock by an hour
1119+
clock.advance(Duration.ofHours(1).toNanos());
1120+
1121+
return queryResponse(
1122+
FirestoreException.forServerRejection(
1123+
Status.DEADLINE_EXCEEDED, "Simulated test failure"),
1124+
DOCUMENT_NAME + "1",
1125+
DOCUMENT_NAME + "2")
1126+
.answer(invocation);
1127+
})
1128+
.when(firestoreMock)
1129+
.streamRequest(
1130+
runQuery.capture(),
1131+
streamObserverCapture.capture(),
1132+
Matchers.<ServerStreamingCallable>any());
1133+
1134+
assertThrows(ExecutionException.class, () -> query.get().get());
1135+
1136+
// Verify the request count
10291137
assertEquals(1, runQuery.getAllValues().size());
10301138
}
10311139

google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1769,7 +1769,7 @@ public void testRecursiveDeleteWithCustomBulkWriterInstance() throws Exception {
17691769
}
17701770

17711771
@Test
1772-
public void testEnforcesTimeouts() throws Exception {
1772+
public void testEnforcesTimeouts() {
17731773
FirestoreOptions firestoreOptions =
17741774
FirestoreOptions.newBuilder()
17751775
.setRetrySettings(

0 commit comments

Comments
 (0)