Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit bcaecb4

Browse files
tom-andersendconeybegcf-owl-bot[bot]
authored
feat: COUNT Queries (#1774)
Co-authored-by: Denver Coneybeare <[email protected]> Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 5ba8df0 commit bcaecb4

8 files changed

Lines changed: 775 additions & 15 deletions

File tree

dev/src/reference.ts

Lines changed: 323 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import * as firestore from '@google-cloud/firestore';
18-
import {Duplex, Transform} from 'stream';
18+
import {Duplex, Readable, Transform} from 'stream';
1919
import * as deepEqual from 'fast-deep-equal';
2020

2121
import * as protos from '../protos/firestore_v1_proto_api';
@@ -55,7 +55,6 @@ import {
5555
} from './validate';
5656
import {DocumentWatch, QueryWatch} from './watch';
5757
import {validateDocumentData, WriteBatch, WriteResult} from './write-batch';
58-
5958
import api = protos.google.firestore.v1;
6059

6160
/**
@@ -1599,6 +1598,27 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
15991598
return new Query(this._firestore, options);
16001599
}
16011600

1601+
/**
1602+
* Returns a query that counts the documents in the result set of this
1603+
* query.
1604+
*
1605+
* The returned query, when executed, counts the documents in the result set
1606+
* of this query without actually downloading the documents.
1607+
*
1608+
* Using the returned query to count the documents is efficient because only
1609+
* the final count, not the documents' data, is downloaded. The returned
1610+
* query can even count the documents if the result set would be
1611+
* prohibitively large to download entirely (e.g. thousands of documents).
1612+
*
1613+
* @return a query that counts the documents in the result set of this
1614+
* query. The count can be retrieved from `snapshot.data().count`, where
1615+
* `snapshot` is the `AggregateQuerySnapshot` resulting from running the
1616+
* returned query.
1617+
*/
1618+
count(): AggregateQuery<{count: firestore.AggregateField<number>}> {
1619+
return new AggregateQuery(this, {count: {}});
1620+
}
1621+
16021622
/**
16031623
* Returns true if this `Query` is equal to the provided value.
16041624
*
@@ -2832,6 +2852,307 @@ export class CollectionReference<T = firestore.DocumentData>
28322852
}
28332853
}
28342854

2855+
/**
2856+
* A query that calculates aggregations over an underlying query.
2857+
*/
2858+
export class AggregateQuery<T extends firestore.AggregateSpec>
2859+
implements firestore.AggregateQuery<T>
2860+
{
2861+
/**
2862+
* @private
2863+
* @internal
2864+
*
2865+
* @param _query The query whose aggregations will be calculated by this
2866+
* object.
2867+
* @param _aggregates The aggregations that will be performed by this query.
2868+
*/
2869+
constructor(
2870+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
2871+
private readonly _query: Query<any>,
2872+
private readonly _aggregates: T
2873+
) {}
2874+
2875+
/** The query whose aggregations will be calculated by this object. */
2876+
get query(): firestore.Query<unknown> {
2877+
return this._query;
2878+
}
2879+
2880+
/**
2881+
* Executes this query.
2882+
*
2883+
* @return A promise that will be resolved with the results of the query.
2884+
*/
2885+
get(): Promise<AggregateQuerySnapshot<T>> {
2886+
return this._get();
2887+
}
2888+
2889+
/**
2890+
* Internal get() method that accepts an optional transaction id.
2891+
*
2892+
* @private
2893+
* @internal
2894+
* @param {bytes=} transactionId A transaction ID.
2895+
*/
2896+
_get(transactionId?: Uint8Array): Promise<AggregateQuerySnapshot<T>> {
2897+
// Capture the error stack to preserve stack tracing across async calls.
2898+
const stack = Error().stack!;
2899+
2900+
return new Promise((resolve, reject) => {
2901+
const stream = this._stream(transactionId);
2902+
stream.on('error', err => {
2903+
reject(wrapError(err, stack));
2904+
});
2905+
stream.once('data', result => {
2906+
stream.destroy();
2907+
resolve(result);
2908+
});
2909+
stream.on('end', () => {
2910+
reject('No AggregateQuery results');
2911+
});
2912+
});
2913+
}
2914+
2915+
/**
2916+
* Internal streaming method that accepts an optional transaction ID.
2917+
*
2918+
* @private
2919+
* @internal
2920+
* @param transactionId A transaction ID.
2921+
* @returns A stream of document results.
2922+
*/
2923+
_stream(transactionId?: Uint8Array): Readable {
2924+
const tag = requestTag();
2925+
const firestore = this._query.firestore;
2926+
2927+
const stream: Transform = new Transform({
2928+
objectMode: true,
2929+
transform: (proto: api.IRunAggregationQueryResponse, enc, callback) => {
2930+
if (proto.result) {
2931+
const readTime = Timestamp.fromProto(proto.readTime!);
2932+
const data = this.decodeResult(proto.result);
2933+
callback(
2934+
undefined,
2935+
new AggregateQuerySnapshot<T>(this, readTime, data)
2936+
);
2937+
} else {
2938+
callback(Error('RunAggregationQueryResponse is missing result'));
2939+
}
2940+
},
2941+
});
2942+
2943+
firestore
2944+
.initializeIfNeeded(tag)
2945+
.then(async () => {
2946+
// `toProto()` might throw an exception. We rely on the behavior of an
2947+
// async function to convert this exception into the rejected Promise we
2948+
// catch below.
2949+
const request = this.toProto(transactionId);
2950+
2951+
let streamActive: Deferred<boolean>;
2952+
do {
2953+
streamActive = new Deferred<boolean>();
2954+
const backendStream = await firestore.requestStream(
2955+
'runAggregationQuery',
2956+
/* bidirectional= */ false,
2957+
request,
2958+
tag
2959+
);
2960+
stream.on('close', () => {
2961+
backendStream.resume();
2962+
backendStream.end();
2963+
});
2964+
backendStream.on('error', err => {
2965+
backendStream.unpipe(stream);
2966+
// If a non-transactional query failed, attempt to restart.
2967+
// Transactional queries are retried via the transaction runner.
2968+
if (
2969+
!transactionId &&
2970+
!isPermanentRpcError(err, 'runAggregationQuery')
2971+
) {
2972+
logger(
2973+
'AggregateQuery._stream',
2974+
tag,
2975+
'AggregateQuery failed with retryable stream error:',
2976+
err
2977+
);
2978+
streamActive.resolve(/* active= */ true);
2979+
} else {
2980+
logger(
2981+
'AggregateQuery._stream',
2982+
tag,
2983+
'AggregateQuery failed with stream error:',
2984+
err
2985+
);
2986+
stream.destroy(err);
2987+
streamActive.resolve(/* active= */ false);
2988+
}
2989+
});
2990+
backendStream.resume();
2991+
backendStream.pipe(stream);
2992+
} while (await streamActive.promise);
2993+
})
2994+
.catch(e => stream.destroy(e));
2995+
2996+
return stream;
2997+
}
2998+
2999+
/**
3000+
* Internal method to decode values within result.
3001+
* @private
3002+
*/
3003+
private decodeResult(
3004+
proto: api.IAggregationResult
3005+
): firestore.AggregateSpecData<T> {
3006+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
3007+
const data: any = {};
3008+
const fields = proto.aggregateFields;
3009+
if (fields) {
3010+
const serializer = this._query.firestore._serializer!;
3011+
for (const prop of Object.keys(fields)) {
3012+
if (this._aggregates[prop] === undefined) {
3013+
throw new Error(
3014+
`Unexpected alias [${prop}] in result aggregate result`
3015+
);
3016+
}
3017+
data[prop] = serializer.decodeValue(fields[prop]);
3018+
}
3019+
}
3020+
return data;
3021+
}
3022+
3023+
/**
3024+
* Internal method for serializing a query to its RunAggregationQuery proto
3025+
* representation with an optional transaction id.
3026+
*
3027+
* @private
3028+
* @internal
3029+
* @returns Serialized JSON for the query.
3030+
*/
3031+
toProto(transactionId?: Uint8Array): api.IRunAggregationQueryRequest {
3032+
const queryProto = this._query.toProto();
3033+
//TODO(tomandersen) inspect _query to build request - this is just hard
3034+
// coded count right now.
3035+
const runQueryRequest: api.IRunAggregationQueryRequest = {
3036+
parent: queryProto.parent,
3037+
structuredAggregationQuery: {
3038+
structuredQuery: queryProto.structuredQuery,
3039+
aggregations: [
3040+
{
3041+
alias: 'count',
3042+
count: {},
3043+
},
3044+
],
3045+
},
3046+
};
3047+
3048+
if (transactionId instanceof Uint8Array) {
3049+
runQueryRequest.transaction = transactionId;
3050+
}
3051+
3052+
return runQueryRequest;
3053+
}
3054+
3055+
/**
3056+
* Compares this object with the given object for equality.
3057+
*
3058+
* This object is considered "equal" to the other object if and only if
3059+
* `other` performs the same aggregations as this `AggregateQuery` and
3060+
* the underlying Query of `other` compares equal to that of this object
3061+
* using `Query.isEqual()`.
3062+
*
3063+
* @param other The object to compare to this object for equality.
3064+
* @return `true` if this object is "equal" to the given object, as
3065+
* defined above, or `false` otherwise.
3066+
*/
3067+
isEqual(other: firestore.AggregateQuery<T>): boolean {
3068+
if (this === other) {
3069+
return true;
3070+
}
3071+
if (!(other instanceof AggregateQuery)) {
3072+
return false;
3073+
}
3074+
if (!this.query.isEqual(other.query)) {
3075+
return false;
3076+
}
3077+
return deepEqual(this._aggregates, other._aggregates);
3078+
}
3079+
}
3080+
3081+
/**
3082+
* The results of executing an aggregation query.
3083+
*/
3084+
export class AggregateQuerySnapshot<T extends firestore.AggregateSpec>
3085+
implements firestore.AggregateQuerySnapshot<T>
3086+
{
3087+
/**
3088+
* @private
3089+
* @internal
3090+
*
3091+
* @param _query The query that was executed to produce this result.
3092+
* @param _readTime The time this snapshot was read.
3093+
* @param _data The results of the aggregations performed over the underlying
3094+
* query.
3095+
*/
3096+
constructor(
3097+
private readonly _query: AggregateQuery<T>,
3098+
private readonly _readTime: Timestamp,
3099+
private readonly _data: firestore.AggregateSpecData<T>
3100+
) {}
3101+
3102+
/** The query that was executed to produce this result. */
3103+
get query(): firestore.AggregateQuery<T> {
3104+
return this._query;
3105+
}
3106+
3107+
/** The time this snapshot was read. */
3108+
get readTime(): firestore.Timestamp {
3109+
return this._readTime;
3110+
}
3111+
3112+
/**
3113+
* Returns the results of the aggregations performed over the underlying
3114+
* query.
3115+
*
3116+
* The keys of the returned object will be the same as those of the
3117+
* `AggregateSpec` object specified to the aggregation method, and the
3118+
* values will be the corresponding aggregation result.
3119+
*
3120+
* @returns The results of the aggregations performed over the underlying
3121+
* query.
3122+
*/
3123+
data(): firestore.AggregateSpecData<T> {
3124+
return this._data;
3125+
}
3126+
3127+
/**
3128+
* Compares this object with the given object for equality.
3129+
*
3130+
* Two `AggregateQuerySnapshot` instances are considered "equal" if they
3131+
* have the same data and their underlying queries compare "equal" using
3132+
* `AggregateQuery.isEqual()`.
3133+
*
3134+
* @param other The object to compare to this object for equality.
3135+
* @return `true` if this object is "equal" to the given object, as
3136+
* defined above, or `false` otherwise.
3137+
*/
3138+
isEqual(other: firestore.AggregateQuerySnapshot<T>): boolean {
3139+
if (this === other) {
3140+
return true;
3141+
}
3142+
if (!(other instanceof AggregateQuerySnapshot)) {
3143+
return false;
3144+
}
3145+
// Since the read time is different on every read, we explicitly ignore all
3146+
// document metadata in this comparison, just like
3147+
// `DocumentSnapshot.isEqual()` does.
3148+
if (!this.query.isEqual(other.query)) {
3149+
return false;
3150+
}
3151+
3152+
return deepEqual(this._data, other._data);
3153+
}
3154+
}
3155+
28353156
/**
28363157
* Validates the input string as a field order direction.
28373158
*

0 commit comments

Comments
 (0)