1515import { promisifyAll } from '@google-cloud/promisify' ;
1616import arrify = require( 'arrify' ) ;
1717import { ServiceError } from 'google-gax' ;
18- import { decorateStatus } from './decorateStatus ' ;
18+ import { BackoffSettings } from 'google-gax/build/src/gax ' ;
1919import { PassThrough , Transform } from 'stream' ;
2020
2121// eslint-disable-next-line @typescript-eslint/no-var-requires
@@ -46,9 +46,16 @@ import {Duplex} from 'stream';
4646// See protos/google/rpc/code.proto
4747// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
4848const RETRYABLE_STATUS_CODES = new Set ( [ 4 , 10 , 14 ] ) ;
49+ const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set ( [ 4 , 14 ] ) ;
4950// (1=CANCELLED)
5051const IGNORED_STATUS_CODES = new Set ( [ 1 ] ) ;
5152
53+ const DEFAULT_BACKOFF_SETTINGS : BackoffSettings = {
54+ initialRetryDelayMillis : 10 ,
55+ retryDelayMultiplier : 2 ,
56+ maxRetryDelayMillis : 60000 ,
57+ } ;
58+
5259/**
5360 * @typedef {object } Policy
5461 * @property {number } [version] Specifies the format of the policy.
@@ -735,7 +742,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
735742 const rowsLimit = options . limit || 0 ;
736743 const hasLimit = rowsLimit !== 0 ;
737744 let rowsRead = 0 ;
738- let numRequestsMade = 0 ;
745+ let numConsecutiveErrors = 0 ;
746+ let retryTimer : NodeJS . Timeout | null ;
739747
740748 rowKeys = options . keys || [ ] ;
741749
@@ -788,13 +796,20 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
788796 if ( activeRequestStream ) {
789797 activeRequestStream . abort ( ) ;
790798 }
799+ if ( retryTimer ) {
800+ clearTimeout ( retryTimer ) ;
801+ }
791802 return end ( ) ;
792803 } ;
793804
794805 let chunkTransformer : ChunkTransformer ;
795806 let rowStream : Duplex ;
796807
797808 const makeNewRequest = ( ) => {
809+ // Avoid cancelling an expired timer if user
810+ // cancelled the stream in the middle of a retry
811+ retryTimer = null ;
812+
798813 const lastRowKey = chunkTransformer ? chunkTransformer . lastRowKey : '' ;
799814 // eslint-disable-next-line @typescript-eslint/no-explicit-any
800815 chunkTransformer = new ChunkTransformer ( { decode : options . decode } as any ) ;
@@ -805,7 +820,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
805820 } as google . bigtable . v2 . IReadRowsRequest ;
806821
807822 const retryOpts = {
808- currentRetryAttempt : numRequestsMade ,
823+ currentRetryAttempt : numConsecutiveErrors ,
824+ // Handling retries in this client. Specify the retry options to
825+ // make sure nothing is retried in retry-request.
826+ noResponseRetries : 0 ,
827+ shouldRetryFn : ( _ : any ) => {
828+ return false ;
829+ } ,
809830 } ;
810831
811832 if ( lastRowKey ) {
@@ -915,7 +936,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
915936 ) {
916937 return next ( ) ;
917938 }
918- numRequestsMade = 0 ;
919939 rowsRead ++ ;
920940 const row = this . row ( rowData . key ) ;
921941 row . data = rowData . data ;
@@ -936,20 +956,32 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
936956 userStream . end ( ) ;
937957 return ;
938958 }
959+ numConsecutiveErrors ++ ;
939960 if (
940- numRequestsMade <= maxRetries &&
961+ numConsecutiveErrors <= maxRetries &&
941962 RETRYABLE_STATUS_CODES . has ( error . code )
942963 ) {
943- makeNewRequest ( ) ;
964+ const backOffSettings =
965+ options . gaxOptions ?. retry ?. backoffSettings ||
966+ DEFAULT_BACKOFF_SETTINGS ;
967+ const nextRetryDelay = getNextDelay (
968+ numConsecutiveErrors ,
969+ backOffSettings
970+ ) ;
971+ retryTimer = setTimeout ( makeNewRequest , nextRetryDelay ) ;
944972 } else {
945973 userStream . emit ( 'error' , error ) ;
946974 }
947975 } )
976+ . on ( 'data' , _ => {
977+ // Reset error count after a successful read so the backoff
978+ // time won't keep increasing when as stream had multiple errors
979+ numConsecutiveErrors = 0 ;
980+ } )
948981 . on ( 'end' , ( ) => {
949982 activeRequestStream = null ;
950983 } ) ;
951984 rowStream . pipe ( userStream ) ;
952- numRequestsMade ++ ;
953985 } ;
954986
955987 makeNewRequest ( ) ;
@@ -1504,23 +1536,43 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15041536 ) ;
15051537 const mutationErrorsByEntryIndex = new Map ( ) ;
15061538
1507- const onBatchResponse = (
1508- err : ServiceError | PartialFailureError | null
1509- ) => {
1510- // TODO: enable retries when the entire RPC fails
1511- if ( err ) {
1512- // The error happened before a request was even made, don't retry.
1539+ const isRetryable = ( err : ServiceError | null ) => {
1540+ // Don't retry if there are no more entries or retry attempts
1541+ if ( pendingEntryIndices . size === 0 || numRequestsMade >= maxRetries + 1 ) {
1542+ return false ;
1543+ }
1544+ // If the error is empty but there are still outstanding mutations,
1545+ // it means that there are retryable errors in the mutate response
1546+ // even when the RPC succeeded
1547+ return ! err || IDEMPOTENT_RETRYABLE_STATUS_CODES . has ( err . code ) ;
1548+ } ;
1549+
1550+ const onBatchResponse = ( err : ServiceError | null ) => {
1551+ // Return if the error happened before a request was made
1552+ if ( numRequestsMade === 0 ) {
15131553 callback ( err ) ;
15141554 return ;
15151555 }
1516- if ( pendingEntryIndices . size !== 0 && numRequestsMade <= maxRetries ) {
1517- makeNextBatchRequest ( ) ;
1556+
1557+ if ( isRetryable ( err ) ) {
1558+ const backOffSettings =
1559+ options . gaxOptions ?. retry ?. backoffSettings ||
1560+ DEFAULT_BACKOFF_SETTINGS ;
1561+ const nextDelay = getNextDelay ( numRequestsMade , backOffSettings ) ;
1562+ setTimeout ( makeNextBatchRequest , nextDelay ) ;
15181563 return ;
15191564 }
15201565
1566+ // If there's no more pending mutations, set the error
1567+ // to null
1568+ if ( pendingEntryIndices . size === 0 ) {
1569+ err = null ;
1570+ }
1571+
15211572 if ( mutationErrorsByEntryIndex . size !== 0 ) {
15221573 const mutationErrors = Array . from ( mutationErrorsByEntryIndex . values ( ) ) ;
1523- err = new PartialFailureError ( mutationErrors ) ;
1574+ callback ( new PartialFailureError ( mutationErrors , err ) ) ;
1575+ return ;
15241576 }
15251577
15261578 callback ( err ) ;
@@ -1541,6 +1593,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15411593
15421594 const retryOpts = {
15431595 currentRetryAttempt : numRequestsMade ,
1596+ // Handling retries in this client. Specify the retry options to
1597+ // make sure nothing is retried in retry-request.
1598+ noResponseRetries : 0 ,
1599+ shouldRetryFn : ( _ : any ) => {
1600+ return false ;
1601+ } ,
15441602 } ;
15451603
15461604 this . bigtable
@@ -1552,13 +1610,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15521610 retryOpts,
15531611 } )
15541612 . on ( 'error' , ( err : ServiceError ) => {
1555- // TODO: this check doesn't actually do anything, onBatchResponse
1556- // currently doesn't retry RPC errors, only entry failures
1557- if ( numRequestsMade === 0 ) {
1558- callback ( err ) ; // Likely a "projectId not detected" error.
1559- return ;
1560- }
1561-
15621613 onBatchResponse ( err ) ;
15631614 } )
15641615 . on ( 'data' , ( obj : google . bigtable . v2 . IMutateRowsResponse ) => {
@@ -1572,13 +1623,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15721623 mutationErrorsByEntryIndex . delete ( originalEntriesIndex ) ;
15731624 return ;
15741625 }
1575- if ( ! RETRYABLE_STATUS_CODES . has ( entry . status ! . code ! ) ) {
1626+ if ( ! IDEMPOTENT_RETRYABLE_STATUS_CODES . has ( entry . status ! . code ! ) ) {
15761627 pendingEntryIndices . delete ( originalEntriesIndex ) ;
15771628 }
1578- const status = decorateStatus ( entry . status ) ;
1629+ const errorDetails = entry . status ;
15791630 // eslint-disable-next-line @typescript-eslint/no-explicit-any
1580- ( status as any ) . entry = originalEntry ;
1581- mutationErrorsByEntryIndex . set ( originalEntriesIndex , status ) ;
1631+ ( errorDetails as any ) . entry = originalEntry ;
1632+ mutationErrorsByEntryIndex . set ( originalEntriesIndex , errorDetails ) ;
15821633 } ) ;
15831634 } )
15841635 . on ( 'end' , onBatchResponse ) ;
@@ -1997,14 +2048,25 @@ promisifyAll(Table, {
19972048 exclude : [ 'family' , 'row' ] ,
19982049} ) ;
19992050
2051+ function getNextDelay ( numConsecutiveErrors : number , config : BackoffSettings ) {
2052+ // 0 - 100 ms jitter
2053+ const jitter = Math . floor ( Math . random ( ) * 100 ) ;
2054+ const calculatedNextRetryDelay =
2055+ config . initialRetryDelayMillis *
2056+ Math . pow ( config . retryDelayMultiplier , numConsecutiveErrors ) +
2057+ jitter ;
2058+
2059+ return Math . min ( calculatedNextRetryDelay , config . maxRetryDelayMillis ) ;
2060+ }
2061+
20002062export interface GoogleInnerError {
20012063 reason ?: string ;
20022064 message ?: string ;
20032065}
20042066
20052067export class PartialFailureError extends Error {
20062068 errors ?: GoogleInnerError [ ] ;
2007- constructor ( errors : GoogleInnerError [ ] ) {
2069+ constructor ( errors : GoogleInnerError [ ] , rpcError ?: ServiceError | null ) {
20082070 super ( ) ;
20092071 this . errors = errors ;
20102072 this . name = 'PartialFailureError' ;
@@ -2017,5 +2079,8 @@ export class PartialFailureError extends Error {
20172079 messages . push ( '\n' ) ;
20182080 }
20192081 this . message = messages . join ( '\n' ) ;
2082+ if ( rpcError ) {
2083+ this . message += 'Request failed with: ' + rpcError . message ;
2084+ }
20202085 }
20212086}
0 commit comments