@@ -35,6 +35,7 @@ import {DocumentSnapshot, DocumentSnapshotBuilder} from '../src/document';
3535import { QualifiedResourcePath } from '../src/path' ;
3636import {
3737 ApiOverride ,
38+ collect ,
3839 createInstance ,
3940 document ,
4041 InvalidApiUsage ,
@@ -49,8 +50,10 @@ import {
4950 writeResult ,
5051} from './util/helpers' ;
5152
53+ import { GoogleError } from 'google-gax' ;
5254import api = google . firestore . v1 ;
5355import protobuf = google . protobuf ;
56+ import { Deferred } from '../src/util' ;
5457
5558const PROJECT_ID = 'test-project' ;
5659const DATABASE_ROOT = `projects/${ PROJECT_ID } /databases/(default)` ;
@@ -2507,3 +2510,120 @@ describe('collectionGroup queries', () => {
25072510 } ) ;
25082511 } ) ;
25092512} ) ;
2513+
2514+ describe ( 'query resumption' , ( ) => {
2515+ let firestore : Firestore ;
2516+
2517+ beforeEach ( ( ) => {
2518+ setTimeoutHandler ( setImmediate ) ;
2519+ return createInstance ( ) . then ( firestoreInstance => {
2520+ firestore = firestoreInstance ;
2521+ } ) ;
2522+ } ) ;
2523+
2524+ afterEach ( async ( ) => {
2525+ await verifyInstance ( firestore ) ;
2526+ setTimeoutHandler ( setTimeout ) ;
2527+ } ) ;
2528+
2529+ // Prevent regression of
2530+ // https://github.com/googleapis/nodejs-firestore/issues/1790
2531+ it ( 'results should not be double produced on retryable error with back pressure' , async ( ) => {
2532+ // Generate the IDs of the documents that will match the query.
2533+ const documentIds = Array . from ( new Array ( 500 ) , ( _ , index ) => `doc${ index } ` ) ;
2534+
2535+ // Finds the index in `documentIds` of the document referred to in the
2536+ // "startAt" of the given request.
2537+ function getStartAtDocumentIndex (
2538+ request : api . IRunQueryRequest
2539+ ) : number | null {
2540+ const startAt = request . structuredQuery ?. startAt ;
2541+ const startAtValue = startAt ?. values ?. [ 0 ] ?. referenceValue ;
2542+ const startAtBefore = startAt ?. before ;
2543+ if ( typeof startAtValue !== 'string' ) {
2544+ return null ;
2545+ }
2546+ const docId = startAtValue . split ( '/' ) . pop ( ) ! ;
2547+ const docIdIndex = documentIds . indexOf ( docId ) ;
2548+ if ( docIdIndex < 0 ) {
2549+ return null ;
2550+ }
2551+ return startAtBefore ? docIdIndex : docIdIndex + 1 ;
2552+ }
2553+
2554+ const RETRYABLE_ERROR_DOMAIN = 'RETRYABLE_ERROR_DOMAIN' ;
2555+
2556+ // A mock replacement for Query._isPermanentRpcError which (a) resolves
2557+ // a promise once invoked and (b) treats a specific error "domain" as
2558+ // non-retryable.
2559+ function mockIsPermanentRpcError ( err : GoogleError ) : boolean {
2560+ mockIsPermanentRpcError . invoked . resolve ( true ) ;
2561+ return err ?. domain !== RETRYABLE_ERROR_DOMAIN ;
2562+ }
2563+ mockIsPermanentRpcError . invoked = new Deferred ( ) ;
2564+
2565+ // Return the first half of the documents, followed by a retryable error.
2566+ function * getRequest1Responses ( ) : Generator < api . IRunQueryResponse | Error > {
2567+ const runQueryResponses = documentIds
2568+ . slice ( 0 , documentIds . length / 2 )
2569+ . map ( documentId => result ( documentId ) ) ;
2570+ for ( const runQueryResponse of runQueryResponses ) {
2571+ yield runQueryResponse ;
2572+ }
2573+ const retryableError = new GoogleError ( 'simulated retryable error' ) ;
2574+ retryableError . domain = RETRYABLE_ERROR_DOMAIN ;
2575+ yield retryableError ;
2576+ }
2577+
2578+ // Return the remaining documents.
2579+ function * getRequest2Responses (
2580+ request : api . IRunQueryRequest
2581+ ) : Generator < api . IRunQueryResponse > {
2582+ const startAtDocumentIndex = getStartAtDocumentIndex ( request ) ;
2583+ if ( startAtDocumentIndex === null ) {
2584+ throw new Error ( 'request #2 should specify a valid startAt' ) ;
2585+ }
2586+ const runQueryResponses = documentIds
2587+ . slice ( startAtDocumentIndex )
2588+ . map ( documentId => result ( documentId ) ) ;
2589+ for ( const runQueryResponse of runQueryResponses ) {
2590+ yield runQueryResponse ;
2591+ }
2592+ }
2593+
2594+ // Set up the mocked responses from Watch.
2595+ let requestNum = 0 ;
2596+ const overrides : ApiOverride = {
2597+ runQuery : request => {
2598+ requestNum ++ ;
2599+ switch ( requestNum ) {
2600+ case 1 :
2601+ return stream ( ...getRequest1Responses ( ) ) ;
2602+ case 2 :
2603+ return stream ( ...getRequest2Responses ( request ! ) ) ;
2604+ default :
2605+ throw new Error ( `should never get here (requestNum=${ requestNum } )` ) ;
2606+ }
2607+ } ,
2608+ } ;
2609+
2610+ // Create an async iterator to get the result set but DO NOT iterate over
2611+ // it immediately. Instead, allow the responses to pile up and fill the
2612+ // buffers. Once isPermanentError() is invoked, indicating that the first
2613+ // request has failed and is about to be retried, collect the results from
2614+ // the async iterator into an array.
2615+ firestore = await createInstance ( overrides ) ;
2616+ const query = firestore . collection ( 'collectionId' ) ;
2617+ query . _isPermanentRpcError = mockIsPermanentRpcError ;
2618+ const iterator = query
2619+ . stream ( )
2620+ [ Symbol . asyncIterator ] ( ) as AsyncIterator < QueryDocumentSnapshot > ;
2621+ await mockIsPermanentRpcError . invoked . promise ;
2622+ const snapshots = await collect ( iterator ) ;
2623+
2624+ // Verify that the async iterator returned the correct documents and,
2625+ // especially, does not have duplicate results.
2626+ const actualDocumentIds = snapshots . map ( snapshot => snapshot . id ) ;
2627+ expect ( actualDocumentIds ) . to . eql ( documentIds ) ;
2628+ } ) ;
2629+ } ) ;
0 commit comments