Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit d85b0e9

Browse files
feat: Use REST (#1698)
To use REST transport when possible, pass `{preferRest: true}` to the constructor: ```ts const db = new firestore.Firestore({preferRest: true}); ```
1 parent 6ba6751 commit d85b0e9

16 files changed

Lines changed: 314 additions & 134 deletions

dev/src/bulk-writer.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import * as firestore from '@google-cloud/firestore';
1717

1818
import * as assert from 'assert';
19-
import {GoogleError} from 'google-gax';
19+
import type {GoogleError} from 'google-gax';
2020

2121
import {google} from '../protos/firestore_v1_proto_api';
2222
import {FieldPath, Firestore} from '.';
@@ -285,9 +285,10 @@ class BulkCommitBatch extends WriteBatch {
285285
);
286286
this.pendingOps[i].onSuccess(new WriteResult(updateTime));
287287
} else {
288-
const error = new (require('google-gax').GoogleError)(
289-
status.message || undefined
290-
);
288+
const error =
289+
new (require('google-gax/build/src/fallback').GoogleError)(
290+
status.message || undefined
291+
);
291292
error.code = status.code as number;
292293
this.pendingOps[i].onError(wrapError(error, stack));
293294
}

dev/src/collection-group.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ export class CollectionGroup<T = firestore.DocumentData>
9696

9797
const stream = await this.firestore.requestStream(
9898
'partitionQueryStream',
99+
/* bidirectional= */ false,
99100
request,
100101
tag
101102
);

dev/src/document-reader.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ export class DocumentReader<T> {
112112
try {
113113
const stream = await this.firestore.requestStream(
114114
'batchGetDocuments',
115+
/* bidirectional= */ false,
115116
request,
116117
requestTag
117118
);

dev/src/index.ts

Lines changed: 87 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
import * as firestore from '@google-cloud/firestore';
1818

19-
import {CallOptions} from 'google-gax';
19+
import type {CallOptions} from 'google-gax';
20+
import type * as googleGax from 'google-gax';
21+
import type * as googleGaxFallback from 'google-gax/build/src/fallback';
2022
import {Duplex, PassThrough, Transform} from 'stream';
2123

2224
import {URL} from 'url';
@@ -393,6 +395,16 @@ export class Firestore implements firestore.Firestore {
393395
*/
394396
private _clientPool: ClientPool<GapicClient>;
395397

398+
/**
399+
* Preloaded instance of google-gax (full module, with gRPC support).
400+
*/
401+
private _gax?: typeof googleGax;
402+
403+
/**
404+
* Preloaded instance of google-gax HTTP fallback implementation (no gRPC).
405+
*/
406+
private _gaxFallback?: typeof googleGaxFallback;
407+
396408
/**
397409
* The configuration options for the GAPIC client.
398410
* @private
@@ -534,19 +546,48 @@ export class Firestore implements firestore.Firestore {
534546
this._clientPool = new ClientPool<GapicClient>(
535547
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
536548
maxIdleChannels,
537-
/* clientFactory= */ () => {
549+
/* clientFactory= */ (requiresGrpc: boolean) => {
538550
let client: GapicClient;
539551

552+
// Use the rest fallback if enabled and if the method does not require GRPC
553+
const useFallback =
554+
!this._settings.preferRest || requiresGrpc ? false : 'rest';
555+
556+
let gax: typeof googleGax | typeof googleGaxFallback;
557+
if (useFallback) {
558+
if (!this._gaxFallback) {
559+
gax = this._gaxFallback = require('google-gax/build/src/fallback');
560+
} else {
561+
gax = this._gaxFallback;
562+
}
563+
} else {
564+
if (!this._gax) {
565+
gax = this._gax = require('google-gax');
566+
} else {
567+
gax = this._gax;
568+
}
569+
}
570+
540571
if (this._settings.ssl === false) {
541572
const grpcModule = this._settings.grpc ?? require('google-gax').grpc;
542573
const sslCreds = grpcModule.credentials.createInsecure();
543574

544-
client = new module.exports.v1({
545-
sslCreds,
546-
...this._settings,
547-
});
575+
client = new module.exports.v1(
576+
{
577+
sslCreds,
578+
...this._settings,
579+
fallback: useFallback,
580+
},
581+
gax
582+
);
548583
} else {
549-
client = new module.exports.v1(this._settings);
584+
client = new module.exports.v1(
585+
{
586+
...this._settings,
587+
fallback: useFallback,
588+
},
589+
gax
590+
);
550591
}
551592

552593
logger('Firestore', null, 'Initialized Firestore GAPIC Client');
@@ -1379,8 +1420,10 @@ export class Firestore implements firestore.Firestore {
13791420

13801421
if (this._projectId === undefined) {
13811422
try {
1382-
this._projectId = await this._clientPool.run(requestTag, gapicClient =>
1383-
gapicClient.getProjectId()
1423+
this._projectId = await this._clientPool.run(
1424+
requestTag,
1425+
/* requiresGrpc= */ false,
1426+
gapicClient => gapicClient.getProjectId()
13841427
);
13851428
logger(
13861429
'Firestore.initializeIfNeeded',
@@ -1421,10 +1464,11 @@ export class Firestore implements firestore.Firestore {
14211464

14221465
if (retryCodes) {
14231466
const retryParams = getRetryParams(methodName);
1424-
callOptions.retry = new (require('google-gax').RetryOptions)(
1425-
retryCodes,
1426-
retryParams
1427-
);
1467+
callOptions.retry =
1468+
new (require('google-gax/build/src/fallback').RetryOptions)(
1469+
retryCodes,
1470+
retryParams
1471+
);
14281472
}
14291473

14301474
return callOptions;
@@ -1627,24 +1671,33 @@ export class Firestore implements firestore.Firestore {
16271671
): Promise<Resp> {
16281672
const callOptions = this.createCallOptions(methodName, retryCodes);
16291673

1630-
return this._clientPool.run(requestTag, async gapicClient => {
1631-
try {
1632-
logger('Firestore.request', requestTag, 'Sending request: %j', request);
1633-
const [result] = await (
1634-
gapicClient[methodName] as UnaryMethod<Req, Resp>
1635-
)(request, callOptions);
1636-
logger(
1637-
'Firestore.request',
1638-
requestTag,
1639-
'Received response: %j',
1640-
result
1641-
);
1642-
return result;
1643-
} catch (err) {
1644-
logger('Firestore.request', requestTag, 'Received error:', err);
1645-
return Promise.reject(err);
1674+
return this._clientPool.run(
1675+
requestTag,
1676+
/* requiresGrpc= */ false,
1677+
async gapicClient => {
1678+
try {
1679+
logger(
1680+
'Firestore.request',
1681+
requestTag,
1682+
'Sending request: %j',
1683+
request
1684+
);
1685+
const [result] = await (
1686+
gapicClient[methodName] as UnaryMethod<Req, Resp>
1687+
)(request, callOptions);
1688+
logger(
1689+
'Firestore.request',
1690+
requestTag,
1691+
'Received response: %j',
1692+
result
1693+
);
1694+
return result;
1695+
} catch (err) {
1696+
logger('Firestore.request', requestTag, 'Received error:', err);
1697+
return Promise.reject(err);
1698+
}
16461699
}
1647-
});
1700+
);
16481701
}
16491702

16501703
/**
@@ -1658,12 +1711,15 @@ export class Firestore implements firestore.Firestore {
16581711
* @internal
16591712
* @param methodName Name of the streaming Veneer API endpoint that
16601713
* takes a request and GAX options.
1714+
* @param bidrectional Whether the request is bidirectional (true) or
1715+
* unidirectional (false_
16611716
* @param request The Protobuf request to send.
16621717
* @param requestTag A unique client-assigned identifier for this request.
16631718
* @returns A Promise with the resulting read-only stream.
16641719
*/
16651720
requestStream(
16661721
methodName: FirestoreStreamingMethod,
1722+
bidrectional: boolean,
16671723
request: {},
16681724
requestTag: string
16691725
): Promise<Duplex> {
@@ -1674,7 +1730,7 @@ export class Firestore implements firestore.Firestore {
16741730
return this._retry(methodName, requestTag, () => {
16751731
const result = new Deferred<Duplex>();
16761732

1677-
this._clientPool.run(requestTag, async gapicClient => {
1733+
this._clientPool.run(requestTag, bidrectional, async gapicClient => {
16781734
logger(
16791735
'Firestore.requestStream',
16801736
requestTag,

dev/src/pool.ts

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,15 @@ export const CLIENT_TERMINATED_ERROR_MSG =
3535
* @internal
3636
*/
3737
export class ClientPool<T> {
38+
private grpcEnabled = false;
39+
3840
/**
3941
* Stores each active clients and how many operations it has outstanding.
4042
*/
41-
private activeClients = new Map<T, number>();
43+
private activeClients = new Map<
44+
T,
45+
{activeRequestCount: number; grpcEnabled: boolean}
46+
>();
4247

4348
/**
4449
* A set of clients that have seen RST_STREAM errors (see
@@ -72,7 +77,7 @@ export class ClientPool<T> {
7277
constructor(
7378
private readonly concurrentOperationLimit: number,
7479
private readonly maxIdleClients: number,
75-
private readonly clientFactory: () => T,
80+
private readonly clientFactory: (requiresGrpc: boolean) => T,
7681
private readonly clientDestructor: (client: T) => Promise<void> = () =>
7782
Promise.resolve()
7883
) {}
@@ -84,21 +89,22 @@ export class ClientPool<T> {
8489
* @private
8590
* @internal
8691
*/
87-
private acquire(requestTag: string): T {
92+
private acquire(requestTag: string, requiresGrpc: boolean): T {
8893
let selectedClient: T | null = null;
8994
let selectedClientRequestCount = -1;
9095

91-
for (const [client, requestCount] of this.activeClients) {
96+
for (const [client, metadata] of this.activeClients) {
9297
// Use the "most-full" client that can still accommodate the request
9398
// in order to maximize the number of idle clients as operations start to
9499
// complete.
95100
if (
96101
!this.failedClients.has(client) &&
97-
requestCount > selectedClientRequestCount &&
98-
requestCount < this.concurrentOperationLimit
102+
metadata.activeRequestCount > selectedClientRequestCount &&
103+
metadata.activeRequestCount < this.concurrentOperationLimit &&
104+
(!requiresGrpc || metadata.grpcEnabled)
99105
) {
100106
selectedClient = client;
101-
selectedClientRequestCount = requestCount;
107+
selectedClientRequestCount = metadata.activeRequestCount;
102108
}
103109
}
104110

@@ -111,15 +117,18 @@ export class ClientPool<T> {
111117
);
112118
} else {
113119
logger('ClientPool.acquire', requestTag, 'Creating a new client');
114-
selectedClient = this.clientFactory();
120+
selectedClient = this.clientFactory(requiresGrpc);
115121
selectedClientRequestCount = 0;
116122
assert(
117123
!this.activeClients.has(selectedClient),
118124
'The provided client factory returned an existing instance'
119125
);
120126
}
121127

122-
this.activeClients.set(selectedClient, selectedClientRequestCount + 1);
128+
this.activeClients.set(selectedClient, {
129+
grpcEnabled: requiresGrpc,
130+
activeRequestCount: selectedClientRequestCount + 1,
131+
});
123132

124133
return selectedClient!;
125134
}
@@ -131,9 +140,12 @@ export class ClientPool<T> {
131140
* @internal
132141
*/
133142
private async release(requestTag: string, client: T): Promise<void> {
134-
const requestCount = this.activeClients.get(client) || 0;
135-
assert(requestCount > 0, 'No active requests');
136-
this.activeClients.set(client, requestCount - 1);
143+
const metadata = this.activeClients.get(client);
144+
assert(metadata && metadata.activeRequestCount > 0, 'No active requests');
145+
this.activeClients.set(client, {
146+
grpcEnabled: metadata.grpcEnabled,
147+
activeRequestCount: metadata.activeRequestCount - 1,
148+
});
137149
if (this.terminated && this.opCount === 0) {
138150
this.terminateDeferred.resolve();
139151
}
@@ -153,22 +165,30 @@ export class ClientPool<T> {
153165
* @internal
154166
*/
155167
private shouldGarbageCollectClient(client: T): boolean {
156-
// Don't garbage collect clients that have active requests.
157-
if (this.activeClients.get(client) !== 0) {
168+
const clientMetadata = this.activeClients.get(client)!;
169+
170+
if (clientMetadata.activeRequestCount !== 0) {
171+
// Don't garbage collect clients that have active requests.
158172
return false;
159173
}
160174

175+
if (this.grpcEnabled !== clientMetadata.grpcEnabled) {
176+
// We are transitioning to GRPC. Garbage collect REST clients.
177+
return true;
178+
}
179+
161180
// Idle clients that have received RST_STREAM errors are always garbage
162181
// collected.
163182
if (this.failedClients.has(client)) {
164183
return true;
165184
}
166185

167186
// Otherwise, only garbage collect if we have too much idle capacity (e.g.
168-
// more than 100 idle capacity with default settings) .
187+
// more than 100 idle capacity with default settings).
169188
let idleCapacityCount = 0;
170-
for (const [, count] of this.activeClients) {
171-
idleCapacityCount += this.concurrentOperationLimit - count;
189+
for (const [, metadata] of this.activeClients) {
190+
idleCapacityCount +=
191+
this.concurrentOperationLimit - metadata.activeRequestCount;
172192
}
173193
return (
174194
idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
@@ -197,7 +217,9 @@ export class ClientPool<T> {
197217
// Visible for testing.
198218
get opCount(): number {
199219
let activeOperationCount = 0;
200-
this.activeClients.forEach(count => (activeOperationCount += count));
220+
this.activeClients.forEach(
221+
metadata => (activeOperationCount += metadata.activeRequestCount)
222+
);
201223
return activeOperationCount;
202224
}
203225

@@ -213,11 +235,15 @@ export class ClientPool<T> {
213235
* @private
214236
* @internal
215237
*/
216-
run<V>(requestTag: string, op: (client: T) => Promise<V>): Promise<V> {
238+
run<V>(
239+
requestTag: string,
240+
requiresGrpc: boolean,
241+
op: (client: T) => Promise<V>
242+
): Promise<V> {
217243
if (this.terminated) {
218244
return Promise.reject(new Error(CLIENT_TERMINATED_ERROR_MSG));
219245
}
220-
const client = this.acquire(requestTag);
246+
const client = this.acquire(requestTag, requiresGrpc);
221247

222248
return op(client)
223249
.catch(async (err: GoogleError) => {

0 commit comments

Comments
 (0)