|
15 | 15 | */ |
16 | 16 |
|
17 | 17 | import * as firestore from '@google-cloud/firestore'; |
18 | | -import {Duplex, Transform} from 'stream'; |
| 18 | +import {Duplex, Readable, Transform} from 'stream'; |
19 | 19 | import * as deepEqual from 'fast-deep-equal'; |
20 | 20 |
|
21 | 21 | import * as protos from '../protos/firestore_v1_proto_api'; |
@@ -55,7 +55,6 @@ import { |
55 | 55 | } from './validate'; |
56 | 56 | import {DocumentWatch, QueryWatch} from './watch'; |
57 | 57 | import {validateDocumentData, WriteBatch, WriteResult} from './write-batch'; |
58 | | - |
59 | 58 | import api = protos.google.firestore.v1; |
60 | 59 |
|
61 | 60 | /** |
@@ -1599,6 +1598,27 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> { |
1599 | 1598 | return new Query(this._firestore, options); |
1600 | 1599 | } |
1601 | 1600 |
|
| 1601 | + /** |
| 1602 | + * Returns a query that counts the documents in the result set of this |
| 1603 | + * query. |
| 1604 | + * |
| 1605 | + * The returned query, when executed, counts the documents in the result set |
| 1606 | + * of this query without actually downloading the documents. |
| 1607 | + * |
| 1608 | + * Using the returned query to count the documents is efficient because only |
| 1609 | + * the final count, not the documents' data, is downloaded. The returned |
| 1610 | + * query can even count the documents if the result set would be |
| 1611 | + * prohibitively large to download entirely (e.g. thousands of documents). |
| 1612 | + * |
| 1613 | + * @return a query that counts the documents in the result set of this |
| 1614 | + * query. The count can be retrieved from `snapshot.data().count`, where |
| 1615 | + * `snapshot` is the `AggregateQuerySnapshot` resulting from running the |
| 1616 | + * returned query. |
| 1617 | + */ |
| 1618 | + count(): AggregateQuery<{count: firestore.AggregateField<number>}> { |
| 1619 | + return new AggregateQuery(this, {count: {}}); |
| 1620 | + } |
| 1621 | + |
1602 | 1622 | /** |
1603 | 1623 | * Returns true if this `Query` is equal to the provided value. |
1604 | 1624 | * |
@@ -2832,6 +2852,307 @@ export class CollectionReference<T = firestore.DocumentData> |
2832 | 2852 | } |
2833 | 2853 | } |
2834 | 2854 |
|
| 2855 | +/** |
| 2856 | + * A query that calculates aggregations over an underlying query. |
| 2857 | + */ |
| 2858 | +export class AggregateQuery<T extends firestore.AggregateSpec> |
| 2859 | + implements firestore.AggregateQuery<T> |
| 2860 | +{ |
| 2861 | + /** |
| 2862 | + * @private |
| 2863 | + * @internal |
| 2864 | + * |
| 2865 | + * @param _query The query whose aggregations will be calculated by this |
| 2866 | + * object. |
| 2867 | + * @param _aggregates The aggregations that will be performed by this query. |
| 2868 | + */ |
| 2869 | + constructor( |
| 2870 | + // eslint-disable-next-line @typescript-eslint/no-explicit-any |
| 2871 | + private readonly _query: Query<any>, |
| 2872 | + private readonly _aggregates: T |
| 2873 | + ) {} |
| 2874 | + |
| 2875 | + /** The query whose aggregations will be calculated by this object. */ |
| 2876 | + get query(): firestore.Query<unknown> { |
| 2877 | + return this._query; |
| 2878 | + } |
| 2879 | + |
| 2880 | + /** |
| 2881 | + * Executes this query. |
| 2882 | + * |
| 2883 | + * @return A promise that will be resolved with the results of the query. |
| 2884 | + */ |
| 2885 | + get(): Promise<AggregateQuerySnapshot<T>> { |
| 2886 | + return this._get(); |
| 2887 | + } |
| 2888 | + |
| 2889 | + /** |
| 2890 | + * Internal get() method that accepts an optional transaction id. |
| 2891 | + * |
| 2892 | + * @private |
| 2893 | + * @internal |
| 2894 | + * @param {bytes=} transactionId A transaction ID. |
| 2895 | + */ |
| 2896 | + _get(transactionId?: Uint8Array): Promise<AggregateQuerySnapshot<T>> { |
| 2897 | + // Capture the error stack to preserve stack tracing across async calls. |
| 2898 | + const stack = Error().stack!; |
| 2899 | + |
| 2900 | + return new Promise((resolve, reject) => { |
| 2901 | + const stream = this._stream(transactionId); |
| 2902 | + stream.on('error', err => { |
| 2903 | + reject(wrapError(err, stack)); |
| 2904 | + }); |
| 2905 | + stream.once('data', result => { |
| 2906 | + stream.destroy(); |
| 2907 | + resolve(result); |
| 2908 | + }); |
| 2909 | + stream.on('end', () => { |
| 2910 | + reject('No AggregateQuery results'); |
| 2911 | + }); |
| 2912 | + }); |
| 2913 | + } |
| 2914 | + |
| 2915 | + /** |
| 2916 | + * Internal streaming method that accepts an optional transaction ID. |
| 2917 | + * |
| 2918 | + * @private |
| 2919 | + * @internal |
| 2920 | + * @param transactionId A transaction ID. |
| 2921 | + * @returns A stream of document results. |
| 2922 | + */ |
| 2923 | + _stream(transactionId?: Uint8Array): Readable { |
| 2924 | + const tag = requestTag(); |
| 2925 | + const firestore = this._query.firestore; |
| 2926 | + |
| 2927 | + const stream: Transform = new Transform({ |
| 2928 | + objectMode: true, |
| 2929 | + transform: (proto: api.IRunAggregationQueryResponse, enc, callback) => { |
| 2930 | + if (proto.result) { |
| 2931 | + const readTime = Timestamp.fromProto(proto.readTime!); |
| 2932 | + const data = this.decodeResult(proto.result); |
| 2933 | + callback( |
| 2934 | + undefined, |
| 2935 | + new AggregateQuerySnapshot<T>(this, readTime, data) |
| 2936 | + ); |
| 2937 | + } else { |
| 2938 | + callback(Error('RunAggregationQueryResponse is missing result')); |
| 2939 | + } |
| 2940 | + }, |
| 2941 | + }); |
| 2942 | + |
| 2943 | + firestore |
| 2944 | + .initializeIfNeeded(tag) |
| 2945 | + .then(async () => { |
| 2946 | + // `toProto()` might throw an exception. We rely on the behavior of an |
| 2947 | + // async function to convert this exception into the rejected Promise we |
| 2948 | + // catch below. |
| 2949 | + const request = this.toProto(transactionId); |
| 2950 | + |
| 2951 | + let streamActive: Deferred<boolean>; |
| 2952 | + do { |
| 2953 | + streamActive = new Deferred<boolean>(); |
| 2954 | + const backendStream = await firestore.requestStream( |
| 2955 | + 'runAggregationQuery', |
| 2956 | + /* bidirectional= */ false, |
| 2957 | + request, |
| 2958 | + tag |
| 2959 | + ); |
| 2960 | + stream.on('close', () => { |
| 2961 | + backendStream.resume(); |
| 2962 | + backendStream.end(); |
| 2963 | + }); |
| 2964 | + backendStream.on('error', err => { |
| 2965 | + backendStream.unpipe(stream); |
| 2966 | + // If a non-transactional query failed, attempt to restart. |
| 2967 | + // Transactional queries are retried via the transaction runner. |
| 2968 | + if ( |
| 2969 | + !transactionId && |
| 2970 | + !isPermanentRpcError(err, 'runAggregationQuery') |
| 2971 | + ) { |
| 2972 | + logger( |
| 2973 | + 'AggregateQuery._stream', |
| 2974 | + tag, |
| 2975 | + 'AggregateQuery failed with retryable stream error:', |
| 2976 | + err |
| 2977 | + ); |
| 2978 | + streamActive.resolve(/* active= */ true); |
| 2979 | + } else { |
| 2980 | + logger( |
| 2981 | + 'AggregateQuery._stream', |
| 2982 | + tag, |
| 2983 | + 'AggregateQuery failed with stream error:', |
| 2984 | + err |
| 2985 | + ); |
| 2986 | + stream.destroy(err); |
| 2987 | + streamActive.resolve(/* active= */ false); |
| 2988 | + } |
| 2989 | + }); |
| 2990 | + backendStream.resume(); |
| 2991 | + backendStream.pipe(stream); |
| 2992 | + } while (await streamActive.promise); |
| 2993 | + }) |
| 2994 | + .catch(e => stream.destroy(e)); |
| 2995 | + |
| 2996 | + return stream; |
| 2997 | + } |
| 2998 | + |
| 2999 | + /** |
| 3000 | + * Internal method to decode values within result. |
| 3001 | + * @private |
| 3002 | + */ |
| 3003 | + private decodeResult( |
| 3004 | + proto: api.IAggregationResult |
| 3005 | + ): firestore.AggregateSpecData<T> { |
| 3006 | + // eslint-disable-next-line @typescript-eslint/no-explicit-any |
| 3007 | + const data: any = {}; |
| 3008 | + const fields = proto.aggregateFields; |
| 3009 | + if (fields) { |
| 3010 | + const serializer = this._query.firestore._serializer!; |
| 3011 | + for (const prop of Object.keys(fields)) { |
| 3012 | + if (this._aggregates[prop] === undefined) { |
| 3013 | + throw new Error( |
| 3014 | + `Unexpected alias [${prop}] in result aggregate result` |
| 3015 | + ); |
| 3016 | + } |
| 3017 | + data[prop] = serializer.decodeValue(fields[prop]); |
| 3018 | + } |
| 3019 | + } |
| 3020 | + return data; |
| 3021 | + } |
| 3022 | + |
| 3023 | + /** |
| 3024 | + * Internal method for serializing a query to its RunAggregationQuery proto |
| 3025 | + * representation with an optional transaction id. |
| 3026 | + * |
| 3027 | + * @private |
| 3028 | + * @internal |
| 3029 | + * @returns Serialized JSON for the query. |
| 3030 | + */ |
| 3031 | + toProto(transactionId?: Uint8Array): api.IRunAggregationQueryRequest { |
| 3032 | + const queryProto = this._query.toProto(); |
| 3033 | + //TODO(tomandersen) inspect _query to build request - this is just hard |
| 3034 | + // coded count right now. |
| 3035 | + const runQueryRequest: api.IRunAggregationQueryRequest = { |
| 3036 | + parent: queryProto.parent, |
| 3037 | + structuredAggregationQuery: { |
| 3038 | + structuredQuery: queryProto.structuredQuery, |
| 3039 | + aggregations: [ |
| 3040 | + { |
| 3041 | + alias: 'count', |
| 3042 | + count: {}, |
| 3043 | + }, |
| 3044 | + ], |
| 3045 | + }, |
| 3046 | + }; |
| 3047 | + |
| 3048 | + if (transactionId instanceof Uint8Array) { |
| 3049 | + runQueryRequest.transaction = transactionId; |
| 3050 | + } |
| 3051 | + |
| 3052 | + return runQueryRequest; |
| 3053 | + } |
| 3054 | + |
| 3055 | + /** |
| 3056 | + * Compares this object with the given object for equality. |
| 3057 | + * |
| 3058 | + * This object is considered "equal" to the other object if and only if |
| 3059 | + * `other` performs the same aggregations as this `AggregateQuery` and |
| 3060 | + * the underlying Query of `other` compares equal to that of this object |
| 3061 | + * using `Query.isEqual()`. |
| 3062 | + * |
| 3063 | + * @param other The object to compare to this object for equality. |
| 3064 | + * @return `true` if this object is "equal" to the given object, as |
| 3065 | + * defined above, or `false` otherwise. |
| 3066 | + */ |
| 3067 | + isEqual(other: firestore.AggregateQuery<T>): boolean { |
| 3068 | + if (this === other) { |
| 3069 | + return true; |
| 3070 | + } |
| 3071 | + if (!(other instanceof AggregateQuery)) { |
| 3072 | + return false; |
| 3073 | + } |
| 3074 | + if (!this.query.isEqual(other.query)) { |
| 3075 | + return false; |
| 3076 | + } |
| 3077 | + return deepEqual(this._aggregates, other._aggregates); |
| 3078 | + } |
| 3079 | +} |
| 3080 | + |
| 3081 | +/** |
| 3082 | + * The results of executing an aggregation query. |
| 3083 | + */ |
| 3084 | +export class AggregateQuerySnapshot<T extends firestore.AggregateSpec> |
| 3085 | + implements firestore.AggregateQuerySnapshot<T> |
| 3086 | +{ |
| 3087 | + /** |
| 3088 | + * @private |
| 3089 | + * @internal |
| 3090 | + * |
| 3091 | + * @param _query The query that was executed to produce this result. |
| 3092 | + * @param _readTime The time this snapshot was read. |
| 3093 | + * @param _data The results of the aggregations performed over the underlying |
| 3094 | + * query. |
| 3095 | + */ |
| 3096 | + constructor( |
| 3097 | + private readonly _query: AggregateQuery<T>, |
| 3098 | + private readonly _readTime: Timestamp, |
| 3099 | + private readonly _data: firestore.AggregateSpecData<T> |
| 3100 | + ) {} |
| 3101 | + |
| 3102 | + /** The query that was executed to produce this result. */ |
| 3103 | + get query(): firestore.AggregateQuery<T> { |
| 3104 | + return this._query; |
| 3105 | + } |
| 3106 | + |
| 3107 | + /** The time this snapshot was read. */ |
| 3108 | + get readTime(): firestore.Timestamp { |
| 3109 | + return this._readTime; |
| 3110 | + } |
| 3111 | + |
| 3112 | + /** |
| 3113 | + * Returns the results of the aggregations performed over the underlying |
| 3114 | + * query. |
| 3115 | + * |
| 3116 | + * The keys of the returned object will be the same as those of the |
| 3117 | + * `AggregateSpec` object specified to the aggregation method, and the |
| 3118 | + * values will be the corresponding aggregation result. |
| 3119 | + * |
| 3120 | + * @returns The results of the aggregations performed over the underlying |
| 3121 | + * query. |
| 3122 | + */ |
| 3123 | + data(): firestore.AggregateSpecData<T> { |
| 3124 | + return this._data; |
| 3125 | + } |
| 3126 | + |
| 3127 | + /** |
| 3128 | + * Compares this object with the given object for equality. |
| 3129 | + * |
| 3130 | + * Two `AggregateQuerySnapshot` instances are considered "equal" if they |
| 3131 | + * have the same data and their underlying queries compare "equal" using |
| 3132 | + * `AggregateQuery.isEqual()`. |
| 3133 | + * |
| 3134 | + * @param other The object to compare to this object for equality. |
| 3135 | + * @return `true` if this object is "equal" to the given object, as |
| 3136 | + * defined above, or `false` otherwise. |
| 3137 | + */ |
| 3138 | + isEqual(other: firestore.AggregateQuerySnapshot<T>): boolean { |
| 3139 | + if (this === other) { |
| 3140 | + return true; |
| 3141 | + } |
| 3142 | + if (!(other instanceof AggregateQuerySnapshot)) { |
| 3143 | + return false; |
| 3144 | + } |
| 3145 | + // Since the read time is different on every read, we explicitly ignore all |
| 3146 | + // document metadata in this comparison, just like |
| 3147 | + // `DocumentSnapshot.isEqual()` does. |
| 3148 | + if (!this.query.isEqual(other.query)) { |
| 3149 | + return false; |
| 3150 | + } |
| 3151 | + |
| 3152 | + return deepEqual(this._data, other._data); |
| 3153 | + } |
| 3154 | +} |
| 3155 | + |
2835 | 3156 | /** |
2836 | 3157 | * Validates the input string as a field order direction. |
2837 | 3158 | * |
|
0 commit comments