Skip to content

Commit 8b60612

Browse files
authored
feat: Count queries (not available for use yet) (#1033)
1 parent 84423f4 commit 8b60612

File tree

13 files changed

+1345
-0
lines changed

13 files changed

+1345
-0
lines changed

google-cloud-firestore/clirr-ignored-differences.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,4 +279,11 @@
279279
<className>com/google/cloud/firestore/Firestore</className>
280280
<method>com.google.api.core.ApiFuture recursiveDelete(*)</method>
281281
</difference>
282+
283+
<!-- Aggregate Queries -->
284+
<difference>
285+
<differenceType>7012</differenceType>
286+
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
287+
<method>com.google.api.gax.rpc.ServerStreamingCallable runAggregationQueryCallable()</method>
288+
</difference>
282289
</differences>
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.firestore;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.InternalExtensionOnly;
21+
import com.google.api.core.SettableApiFuture;
22+
import com.google.api.gax.rpc.ResponseObserver;
23+
import com.google.api.gax.rpc.ServerStreamingCallable;
24+
import com.google.api.gax.rpc.StreamController;
25+
import com.google.cloud.Timestamp;
26+
import com.google.firestore.v1.RunAggregationQueryRequest;
27+
import com.google.firestore.v1.RunAggregationQueryResponse;
28+
import com.google.firestore.v1.RunQueryRequest;
29+
import com.google.firestore.v1.StructuredAggregationQuery;
30+
import com.google.firestore.v1.Value;
31+
import com.google.protobuf.ByteString;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import javax.annotation.Nonnull;
34+
import javax.annotation.Nullable;
35+
36+
// TODO(count) Make this class public
37+
@InternalExtensionOnly
38+
class AggregateQuery {
39+
40+
/**
41+
* The "alias" to specify in the {@link RunAggregationQueryRequest} proto when running a count
42+
* query. The actual value is not meaningful, but will be used to get the count out of the {@link
43+
* RunAggregationQueryResponse}.
44+
*/
45+
private static final String ALIAS_COUNT = "count";
46+
47+
@Nonnull private final Query query;
48+
49+
AggregateQuery(@Nonnull Query query) {
50+
this.query = query;
51+
}
52+
53+
@Nonnull
54+
public Query getQuery() {
55+
return query;
56+
}
57+
58+
@Nonnull
59+
public ApiFuture<AggregateQuerySnapshot> get() {
60+
return get(null);
61+
}
62+
63+
@Nonnull
64+
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
65+
RunAggregationQueryRequest request = toProto(transactionId);
66+
AggregateQueryResponseObserver responseObserver = new AggregateQueryResponseObserver();
67+
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
68+
query.rpcContext.getClient().runAggregationQueryCallable();
69+
70+
query.rpcContext.streamRequest(request, responseObserver, callable);
71+
72+
return responseObserver.getFuture();
73+
}
74+
75+
private final class AggregateQueryResponseObserver
76+
implements ResponseObserver<RunAggregationQueryResponse> {
77+
78+
private final SettableApiFuture<AggregateQuerySnapshot> future = SettableApiFuture.create();
79+
private final AtomicBoolean isFutureNotified = new AtomicBoolean(false);
80+
private StreamController streamController;
81+
82+
SettableApiFuture<AggregateQuerySnapshot> getFuture() {
83+
return future;
84+
}
85+
86+
@Override
87+
public void onStart(StreamController streamController) {
88+
this.streamController = streamController;
89+
}
90+
91+
@Override
92+
public void onResponse(RunAggregationQueryResponse response) {
93+
if (!isFutureNotified.compareAndSet(false, true)) {
94+
return;
95+
}
96+
97+
Timestamp readTime = Timestamp.fromProto(response.getReadTime());
98+
Value value = response.getResult().getAggregateFieldsMap().get(ALIAS_COUNT);
99+
if (value == null) {
100+
throw new IllegalArgumentException(
101+
"RunAggregationQueryResponse is missing required alias: " + ALIAS_COUNT);
102+
} else if (value.getValueTypeCase() != Value.ValueTypeCase.INTEGER_VALUE) {
103+
throw new IllegalArgumentException(
104+
"RunAggregationQueryResponse alias "
105+
+ ALIAS_COUNT
106+
+ " has incorrect type: "
107+
+ value.getValueTypeCase());
108+
}
109+
long count = value.getIntegerValue();
110+
111+
future.set(new AggregateQuerySnapshot(AggregateQuery.this, readTime, count));
112+
113+
// Close the stream to avoid it dangling, since we're not expecting any more responses.
114+
streamController.cancel();
115+
}
116+
117+
@Override
118+
public void onError(Throwable throwable) {
119+
if (!isFutureNotified.compareAndSet(false, true)) {
120+
return;
121+
}
122+
123+
future.setException(throwable);
124+
}
125+
126+
@Override
127+
public void onComplete() {}
128+
}
129+
130+
@Nonnull
131+
public RunAggregationQueryRequest toProto() {
132+
return toProto(null);
133+
}
134+
135+
@Nonnull
136+
RunAggregationQueryRequest toProto(@Nullable final ByteString transactionId) {
137+
RunQueryRequest runQueryRequest = query.toProto();
138+
139+
RunAggregationQueryRequest.Builder request = RunAggregationQueryRequest.newBuilder();
140+
request.setParent(runQueryRequest.getParent());
141+
if (transactionId != null) {
142+
request.setTransaction(transactionId);
143+
}
144+
145+
StructuredAggregationQuery.Builder structuredAggregationQuery =
146+
request.getStructuredAggregationQueryBuilder();
147+
structuredAggregationQuery.setStructuredQuery(runQueryRequest.getStructuredQuery());
148+
149+
StructuredAggregationQuery.Aggregation.Builder aggregation =
150+
StructuredAggregationQuery.Aggregation.newBuilder();
151+
aggregation.setCount(StructuredAggregationQuery.Aggregation.Count.getDefaultInstance());
152+
aggregation.setAlias(ALIAS_COUNT);
153+
structuredAggregationQuery.addAggregations(aggregation);
154+
155+
return request.build();
156+
}
157+
158+
@Nonnull
159+
public static AggregateQuery fromProto(Firestore firestore, RunAggregationQueryRequest proto) {
160+
RunQueryRequest runQueryRequest =
161+
RunQueryRequest.newBuilder()
162+
.setParent(proto.getParent())
163+
.setStructuredQuery(proto.getStructuredAggregationQuery().getStructuredQuery())
164+
.build();
165+
Query query = Query.fromProto(firestore, runQueryRequest);
166+
return new AggregateQuery(query);
167+
}
168+
169+
@Override
170+
public int hashCode() {
171+
return query.hashCode();
172+
}
173+
174+
@Override
175+
public boolean equals(Object obj) {
176+
if (obj == this) {
177+
return true;
178+
} else if (!(obj instanceof AggregateQuery)) {
179+
return false;
180+
}
181+
AggregateQuery other = (AggregateQuery) obj;
182+
return query.equals(other.query);
183+
}
184+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.firestore;
18+
19+
import com.google.api.core.InternalExtensionOnly;
20+
import com.google.cloud.Timestamp;
21+
import java.util.Objects;
22+
import javax.annotation.Nonnull;
23+
24+
// TODO(count) Make this class public
25+
@InternalExtensionOnly
26+
class AggregateQuerySnapshot {
27+
28+
@Nonnull private final AggregateQuery query;
29+
@Nonnull private final Timestamp readTime;
30+
private final long count;
31+
32+
AggregateQuerySnapshot(@Nonnull AggregateQuery query, @Nonnull Timestamp readTime, long count) {
33+
this.query = query;
34+
this.readTime = readTime;
35+
this.count = count;
36+
}
37+
38+
@Nonnull
39+
public AggregateQuery getQuery() {
40+
return query;
41+
}
42+
43+
@Nonnull
44+
public Timestamp getReadTime() {
45+
return readTime;
46+
}
47+
48+
public long getCount() {
49+
return count;
50+
}
51+
52+
@Override
53+
public boolean equals(Object obj) {
54+
if (obj == this) {
55+
return true;
56+
} else if (!(obj instanceof AggregateQuerySnapshot)) {
57+
return false;
58+
}
59+
60+
AggregateQuerySnapshot other = (AggregateQuerySnapshot) obj;
61+
return query.equals(other.query) && readTime.equals(other.readTime) && count == other.count;
62+
}
63+
64+
@Override
65+
public int hashCode() {
66+
return Objects.hash(query, readTime, count);
67+
}
68+
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1846,6 +1846,12 @@ private boolean isRetryableError(Throwable throwable) {
18461846
return false;
18471847
}
18481848

1849+
// TODO(count) Make this method public
1850+
@Nonnull
1851+
AggregateQuery count() {
1852+
return new AggregateQuery(this);
1853+
}
1854+
18491855
/**
18501856
* Returns true if this Query is equal to the provided object.
18511857
*

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,18 @@ public ApiFuture<QuerySnapshot> get(@Nonnull Query query) {
190190

191191
return query.get(transactionId);
192192
}
193+
194+
// TODO(count) Make this method public
195+
/**
196+
* Returns the result from the provided aggregate query. Holds a pessimistic lock on all accessed
197+
* documents.
198+
*
199+
* @return The result of the aggregation.
200+
*/
201+
@Nonnull
202+
ApiFuture<AggregateQuerySnapshot> get(@Nonnull AggregateQuery query) {
203+
Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG);
204+
205+
return query.get(transactionId);
206+
}
193207
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.google.firestore.v1.ListenResponse;
3838
import com.google.firestore.v1.PartitionQueryRequest;
3939
import com.google.firestore.v1.RollbackRequest;
40+
import com.google.firestore.v1.RunAggregationQueryRequest;
41+
import com.google.firestore.v1.RunAggregationQueryResponse;
4042
import com.google.firestore.v1.RunQueryRequest;
4143
import com.google.firestore.v1.RunQueryResponse;
4244
import com.google.protobuf.Empty;
@@ -60,6 +62,10 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {
6062
/** Runs a query. */
6163
ServerStreamingCallable<RunQueryRequest, RunQueryResponse> runQueryCallable();
6264

65+
/** Runs an aggregation query. */
66+
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse>
67+
runAggregationQueryCallable();
68+
6369
/** Starts a new transaction. */
6470
UnaryCallable<BeginTransactionRequest, BeginTransactionResponse> beginTransactionCallable();
6571

google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import com.google.firestore.v1.ListenResponse;
5757
import com.google.firestore.v1.PartitionQueryRequest;
5858
import com.google.firestore.v1.RollbackRequest;
59+
import com.google.firestore.v1.RunAggregationQueryRequest;
60+
import com.google.firestore.v1.RunAggregationQueryResponse;
5961
import com.google.firestore.v1.RunQueryRequest;
6062
import com.google.firestore.v1.RunQueryResponse;
6163
import com.google.protobuf.Empty;
@@ -208,6 +210,12 @@ public ServerStreamingCallable<RunQueryRequest, RunQueryResponse> runQueryCallab
208210
return firestoreStub.runQueryCallable();
209211
}
210212

213+
@Override
214+
public ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse>
215+
runAggregationQueryCallable() {
216+
return firestoreStub.runAggregationQueryCallable();
217+
}
218+
211219
@Override
212220
public UnaryCallable<BeginTransactionRequest, BeginTransactionResponse>
213221
beginTransactionCallable() {

0 commit comments

Comments
 (0)