Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit 2f08612

Browse files
authored
fix: Optimize Transaction PITR (#2002)
* Optimize Transaction PITR * Comments and types * Enforce read-only cannot write * Pretty
1 parent 8799032 commit 2f08612

6 files changed

Lines changed: 219 additions & 140 deletions

File tree

dev/src/document-reader.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {isPermanentRpcError} from './util';
2121
import {google} from '../protos/firestore_v1_proto_api';
2222
import {logger} from './logger';
2323
import {Firestore} from './index';
24+
import {Timestamp} from './timestamp';
2425
import {DocumentData} from '@google-cloud/firestore';
2526
import api = google.firestore.v1;
2627

@@ -36,6 +37,8 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
3637
fieldMask?: FieldPath[];
3738
/** An optional transaction ID to use for this read. */
3839
transactionId?: Uint8Array;
40+
/** An optional readTime to use for this read. */
41+
readTime?: Timestamp;
3942

4043
private outstandingDocuments = new Set<string>();
4144
private retrievedDocuments = new Map<string, DocumentSnapshot>();
@@ -99,9 +102,13 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
99102

100103
const request: api.IBatchGetDocumentsRequest = {
101104
database: this.firestore.formattedName,
102-
transaction: this.transactionId,
103105
documents: Array.from(this.outstandingDocuments),
104106
};
107+
if (this.transactionId) {
108+
request.transaction = this.transactionId;
109+
} else if (this.readTime) {
110+
request.readTime = this.readTime.toProto().timestampValue;
111+
}
105112

106113
if (this.fieldMask) {
107114
const fieldPaths = this.fieldMask.map(

dev/src/index.ts

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,10 +1189,6 @@ export class Firestore implements firestore.Firestore {
11891189

11901190
const tag = requestTag();
11911191

1192-
let maxAttempts = DEFAULT_MAX_TRANSACTION_ATTEMPTS;
1193-
let readOnly = false;
1194-
let readTime: Timestamp | undefined;
1195-
11961192
if (transactionOptions) {
11971193
validateObject('transactionOptions', transactionOptions);
11981194
validateBoolean(
@@ -1207,29 +1203,18 @@ export class Firestore implements firestore.Firestore {
12071203
transactionOptions.readTime,
12081204
{optional: true}
12091205
);
1210-
1211-
readOnly = true;
1212-
readTime = transactionOptions.readTime as Timestamp | undefined;
1213-
maxAttempts = 1;
12141206
} else {
12151207
validateInteger(
12161208
'transactionOptions.maxAttempts',
12171209
transactionOptions.maxAttempts,
12181210
{optional: true, minValue: 1}
12191211
);
1220-
1221-
maxAttempts =
1222-
transactionOptions.maxAttempts || DEFAULT_MAX_TRANSACTION_ATTEMPTS;
12231212
}
12241213
}
12251214

1226-
const transaction = new Transaction(this, tag);
1215+
const transaction = new Transaction(this, tag, transactionOptions);
12271216
return this.initializeIfNeeded(tag).then(() =>
1228-
transaction.runTransaction(updateFunction, {
1229-
maxAttempts,
1230-
readOnly,
1231-
readTime,
1232-
})
1217+
transaction.runTransaction(updateFunction)
12331218
);
12341219
}
12351220

dev/src/reference.ts

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2343,10 +2343,11 @@ export class Query<
23432343
*
23442344
* @private
23452345
* @internal
2346-
* @param {bytes=} transactionId A transaction ID.
2346+
* @param transactionIdOrReadTime A transaction ID or the read time at which
2347+
* to execute the query.
23472348
*/
23482349
_get(
2349-
transactionId?: Uint8Array
2350+
transactionIdOrReadTime?: Uint8Array | Timestamp
23502351
): Promise<QuerySnapshot<AppModelType, DbModelType>> {
23512352
const docs: Array<QueryDocumentSnapshot<AppModelType, DbModelType>> = [];
23522353

@@ -2356,7 +2357,7 @@ export class Query<
23562357
return new Promise((resolve, reject) => {
23572358
let readTime: Timestamp;
23582359

2359-
this._stream(transactionId)
2360+
this._stream(transactionIdOrReadTime)
23602361
.on('error', err => {
23612362
reject(wrapError(err, stack));
23622363
})
@@ -2616,12 +2617,15 @@ export class Query<
26162617
/**
26172618
* Internal streaming method that accepts an optional transaction ID.
26182619
*
2619-
* @param transactionId A transaction ID.
2620+
* @param transactionIdOrReadTime A transaction ID or the read time at which
2621+
* to execute the query.
26202622
* @private
26212623
* @internal
26222624
* @returns A stream of document results.
26232625
*/
2624-
_stream(transactionId?: Uint8Array): NodeJS.ReadableStream {
2626+
_stream(
2627+
transactionIdOrReadTime?: Uint8Array | Timestamp
2628+
): NodeJS.ReadableStream {
26252629
const tag = requestTag();
26262630
const startTime = Date.now();
26272631

@@ -2678,7 +2682,7 @@ export class Query<
26782682
// `toProto()` might throw an exception. We rely on the behavior of an
26792683
// async function to convert this exception into the rejected Promise we
26802684
// catch below.
2681-
let request = this.toProto(transactionId);
2685+
let request = this.toProto(transactionIdOrReadTime);
26822686

26832687
let streamActive: Deferred<boolean>;
26842688
do {
@@ -2695,7 +2699,10 @@ export class Query<
26952699

26962700
// If a non-transactional query failed, attempt to restart.
26972701
// Transactional queries are retried via the transaction runner.
2698-
if (!transactionId && !this._isPermanentRpcError(err, 'runQuery')) {
2702+
if (
2703+
!transactionIdOrReadTime &&
2704+
!this._isPermanentRpcError(err, 'runQuery')
2705+
) {
26992706
logger(
27002707
'Query._stream',
27012708
tag,
@@ -3338,15 +3345,15 @@ export class AggregateQuery<
33383345
* @param {bytes=} transactionId A transaction ID.
33393346
*/
33403347
_get(
3341-
transactionId?: Uint8Array
3348+
transactionIdOrReadTime?: Uint8Array | Timestamp
33423349
): Promise<
33433350
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
33443351
> {
33453352
// Capture the error stack to preserve stack tracing across async calls.
33463353
const stack = Error().stack!;
33473354

33483355
return new Promise((resolve, reject) => {
3349-
const stream = this._stream(transactionId);
3356+
const stream = this._stream(transactionIdOrReadTime);
33503357
stream.on('error', err => {
33513358
reject(wrapError(err, stack));
33523359
});
@@ -3368,7 +3375,7 @@ export class AggregateQuery<
33683375
* @param transactionId A transaction ID.
33693376
* @returns A stream of document results.
33703377
*/
3371-
_stream(transactionId?: Uint8Array): Readable {
3378+
_stream(transactionIdOrReadTime?: Uint8Array | Timestamp): Readable {
33723379
const tag = requestTag();
33733380
const firestore = this._query.firestore;
33743381

@@ -3391,7 +3398,7 @@ export class AggregateQuery<
33913398
// `toProto()` might throw an exception. We rely on the behavior of an
33923399
// async function to convert this exception into the rejected Promise we
33933400
// catch below.
3394-
const request = this.toProto(transactionId);
3401+
const request = this.toProto(transactionIdOrReadTime);
33953402

33963403
const backendStream = await firestore.requestStream(
33973404
'runAggregationQuery',
@@ -3463,7 +3470,9 @@ export class AggregateQuery<
34633470
* @internal
34643471
* @returns Serialized JSON for the query.
34653472
*/
3466-
toProto(transactionId?: Uint8Array): api.IRunAggregationQueryRequest {
3473+
toProto(
3474+
transactionIdOrReadTime?: Uint8Array | Timestamp
3475+
): api.IRunAggregationQueryRequest {
34673476
const queryProto = this._query.toProto();
34683477
const runQueryRequest: api.IRunAggregationQueryRequest = {
34693478
parent: queryProto.parent,
@@ -3484,8 +3493,10 @@ export class AggregateQuery<
34843493
},
34853494
};
34863495

3487-
if (transactionId instanceof Uint8Array) {
3488-
runQueryRequest.transaction = transactionId;
3496+
if (transactionIdOrReadTime instanceof Uint8Array) {
3497+
runQueryRequest.transaction = transactionIdOrReadTime;
3498+
} else if (transactionIdOrReadTime instanceof Timestamp) {
3499+
runQueryRequest.readTime = transactionIdOrReadTime;
34893500
}
34903501

34913502
return runQueryRequest;

0 commit comments

Comments
 (0)