@@ -44,6 +44,7 @@ import {defaultConverter} from './types';
4444import {
4545 autoId ,
4646 Deferred ,
47+ getTotalTimeout ,
4748 isPermanentRpcError ,
4849 mapToArray ,
4950 requestTag ,
@@ -2569,6 +2570,15 @@ export class Query<
25692570 return isPermanentRpcError ( err , methodName ) ;
25702571 }
25712572
2573+ _hasRetryTimedOut ( methodName : string , startTime : number ) : boolean {
2574+ const totalTimeout = getTotalTimeout ( methodName ) ;
2575+ if ( totalTimeout === 0 ) {
2576+ return false ;
2577+ }
2578+
2579+ return Date . now ( ) - startTime >= totalTimeout ;
2580+ }
2581+
25722582 /**
25732583 * Internal streaming method that accepts an optional transaction ID.
25742584 *
@@ -2579,6 +2589,7 @@ export class Query<
25792589 */
25802590 _stream ( transactionId ?: Uint8Array ) : NodeJS . ReadableStream {
25812591 const tag = requestTag ( ) ;
2592+ const startTime = Date . now ( ) ;
25822593
25832594 let lastReceivedDocument : QueryDocumentSnapshot <
25842595 AppModelType ,
@@ -2638,8 +2649,9 @@ export class Query<
26382649 let streamActive : Deferred < boolean > ;
26392650 do {
26402651 streamActive = new Deferred < boolean > ( ) ;
2652+ const methodName = 'runQuery' ;
26412653 backendStream = await this . _firestore . requestStream (
2642- 'runQuery' ,
2654+ methodName ,
26432655 /* bidirectional= */ false ,
26442656 request ,
26452657 tag
@@ -2656,12 +2668,28 @@ export class Query<
26562668 'Query failed with retryable stream error:' ,
26572669 err
26582670 ) ;
2659- // Enqueue a "no-op" write into the stream and resume the query
2660- // once it is processed. This allows any enqueued results to be
2661- // consumed before resuming the query so that the query resumption
2662- // can start at the correct document.
2671+
2672+ // Enqueue a "no-op" write into the stream and wait for it to be
2673+ // read by the downstream consumer. This ensures that all enqueued
2674+ // results in the stream are consumed, which will give us an accurate
2675+ // value for `lastReceivedDocument`.
26632676 stream . write ( NOOP_MESSAGE , ( ) => {
2664- if ( lastReceivedDocument ) {
2677+ if ( this . _hasRetryTimedOut ( methodName , startTime ) ) {
2678+ logger (
2679+ 'Query._stream' ,
2680+ tag ,
2681+ 'Query failed with retryable stream error but the total retry timeout has exceeded.'
2682+ ) ;
2683+ stream . destroy ( err ) ;
2684+ streamActive . resolve ( /* active= */ false ) ;
2685+ } else if ( lastReceivedDocument ) {
2686+ logger (
2687+ 'Query._stream' ,
2688+ tag ,
2689+ 'Query failed with retryable stream error and progress was made receiving ' +
2690+ 'documents, so the stream is being retried.'
2691+ ) ;
2692+
26652693 // Restart the query but use the last document we received as
26662694 // the query cursor. Note that we do not use backoff here. The
26672695 // call to `requestStream()` will backoff should the restart
@@ -2673,8 +2701,21 @@ export class Query<
26732701 } else {
26742702 request = this . startAfter ( lastReceivedDocument ) . toProto ( ) ;
26752703 }
2704+
2705+ // Set lastReceivedDocument to null before each retry attempt to ensure the retry makes progress
2706+ lastReceivedDocument = null ;
2707+
2708+ streamActive . resolve ( /* active= */ true ) ;
2709+ } else {
2710+ logger (
2711+ 'Query._stream' ,
2712+ tag ,
2713+ 'Query failed with retryable stream error however no progress was made receiving ' +
2714+ 'documents, so the stream is being closed.'
2715+ ) ;
2716+ stream . destroy ( err ) ;
2717+ streamActive . resolve ( /* active= */ false ) ;
26762718 }
2677- streamActive . resolve ( /* active= */ true ) ;
26782719 } ) ;
26792720 } else {
26802721 logger (
@@ -3320,48 +3361,33 @@ export class AggregateQuery<
33203361 // catch below.
33213362 const request = this . toProto ( transactionId ) ;
33223363
3323- let streamActive : Deferred < boolean > ;
3324- do {
3325- streamActive = new Deferred < boolean > ( ) ;
3326- const backendStream = await firestore . requestStream (
3327- 'runAggregationQuery' ,
3328- /* bidirectional= */ false ,
3329- request ,
3330- tag
3331- ) ;
3332- stream . on ( 'close' , ( ) => {
3333- backendStream . resume ( ) ;
3334- backendStream . end ( ) ;
3335- } ) ;
3336- backendStream . on ( 'error' , err => {
3337- backendStream . unpipe ( stream ) ;
3338- // If a non-transactional query failed, attempt to restart.
3339- // Transactional queries are retried via the transaction runner.
3340- if (
3341- ! transactionId &&
3342- ! isPermanentRpcError ( err , 'runAggregationQuery' )
3343- ) {
3344- logger (
3345- 'AggregateQuery._stream' ,
3346- tag ,
3347- 'AggregateQuery failed with retryable stream error:' ,
3348- err
3349- ) ;
3350- streamActive . resolve ( /* active= */ true ) ;
3351- } else {
3352- logger (
3353- 'AggregateQuery._stream' ,
3354- tag ,
3355- 'AggregateQuery failed with stream error:' ,
3356- err
3357- ) ;
3358- stream . destroy ( err ) ;
3359- streamActive . resolve ( /* active= */ false ) ;
3360- }
3361- } ) ;
3364+ const backendStream = await firestore . requestStream (
3365+ 'runAggregationQuery' ,
3366+ /* bidirectional= */ false ,
3367+ request ,
3368+ tag
3369+ ) ;
3370+ stream . on ( 'close' , ( ) => {
33623371 backendStream . resume ( ) ;
3363- backendStream . pipe ( stream ) ;
3364- } while ( await streamActive . promise ) ;
3372+ backendStream . end ( ) ;
3373+ } ) ;
3374+ backendStream . on ( 'error' , err => {
3375+ // TODO(group-by) When group-by queries are supported for aggregates
3376+ // consider implementing retries if the stream is making progress
3377+ // receiving results for groups. See the use of lastReceivedDocument
3378+ // in the retry strategy for runQuery.
3379+
3380+ backendStream . unpipe ( stream ) ;
3381+ logger (
3382+ 'AggregateQuery._stream' ,
3383+ tag ,
3384+ 'AggregateQuery failed with stream error:' ,
3385+ err
3386+ ) ;
3387+ stream . destroy ( err ) ;
3388+ } ) ;
3389+ backendStream . resume ( ) ;
3390+ backendStream . pipe ( stream ) ;
33653391 } )
33663392 . catch ( e => stream . destroy ( e ) ) ;
33673393
0 commit comments