Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.

Commit 3718011

Browse files
authored
fix: disable retry-request, add exponential backoff in mutateRows and readRows (#1060)
* fix: add rpc level retries for mutate * remove debugging logs * add exponential backoff * simplify mutate row retry logic * fix broken tests * ignore checks for retry options * fix lint * comments * reset retry after a succee response * fix lint * fix system test * clean up * add rpc status in mutate rows, and remove http status * remove unnecessary check * remove decorate status * update * fix * correct retry count
1 parent 2b175ac commit 3718011

9 files changed

Lines changed: 264 additions & 254 deletions

File tree

src/decorateStatus.ts

Lines changed: 0 additions & 80 deletions
This file was deleted.

src/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import {
2929
CreateInstanceResponse,
3030
IInstance,
3131
} from './instance';
32-
import {shouldRetryRequest} from './decorateStatus';
3332
import {google} from '../protos/protos';
3433
import {ServiceError} from 'google-gax';
3534
import * as v2 from './v2';
@@ -842,7 +841,6 @@ export class Bigtable {
842841
currentRetryAttempt: 0,
843842
noResponseRetries: 0,
844843
objectMode: true,
845-
shouldRetryFn: shouldRetryRequest,
846844
},
847845
config.retryOpts
848846
);

src/table.ts

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import {promisifyAll} from '@google-cloud/promisify';
1616
import arrify = require('arrify');
1717
import {ServiceError} from 'google-gax';
18-
import {decorateStatus} from './decorateStatus';
18+
import {BackoffSettings} from 'google-gax/build/src/gax';
1919
import {PassThrough, Transform} from 'stream';
2020

2121
// eslint-disable-next-line @typescript-eslint/no-var-requires
@@ -46,9 +46,16 @@ import {Duplex} from 'stream';
4646
// See protos/google/rpc/code.proto
4747
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
4848
const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]);
49+
const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set([4, 14]);
4950
// (1=CANCELLED)
5051
const IGNORED_STATUS_CODES = new Set([1]);
5152

53+
const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = {
54+
initialRetryDelayMillis: 10,
55+
retryDelayMultiplier: 2,
56+
maxRetryDelayMillis: 60000,
57+
};
58+
5259
/**
5360
* @typedef {object} Policy
5461
* @property {number} [version] Specifies the format of the policy.
@@ -735,7 +742,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
735742
const rowsLimit = options.limit || 0;
736743
const hasLimit = rowsLimit !== 0;
737744
let rowsRead = 0;
738-
let numRequestsMade = 0;
745+
let numConsecutiveErrors = 0;
746+
let retryTimer: NodeJS.Timeout | null;
739747

740748
rowKeys = options.keys || [];
741749

@@ -788,13 +796,20 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
788796
if (activeRequestStream) {
789797
activeRequestStream.abort();
790798
}
799+
if (retryTimer) {
800+
clearTimeout(retryTimer);
801+
}
791802
return end();
792803
};
793804

794805
let chunkTransformer: ChunkTransformer;
795806
let rowStream: Duplex;
796807

797808
const makeNewRequest = () => {
809+
// Avoid cancelling an expired timer if user
810+
// cancelled the stream in the middle of a retry
811+
retryTimer = null;
812+
798813
const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
799814
// eslint-disable-next-line @typescript-eslint/no-explicit-any
800815
chunkTransformer = new ChunkTransformer({decode: options.decode} as any);
@@ -805,7 +820,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
805820
} as google.bigtable.v2.IReadRowsRequest;
806821

807822
const retryOpts = {
808-
currentRetryAttempt: numRequestsMade,
823+
currentRetryAttempt: numConsecutiveErrors,
824+
// Handling retries in this client. Specify the retry options to
825+
// make sure nothing is retried in retry-request.
826+
noResponseRetries: 0,
827+
shouldRetryFn: (_: any) => {
828+
return false;
829+
},
809830
};
810831

811832
if (lastRowKey) {
@@ -915,7 +936,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
915936
) {
916937
return next();
917938
}
918-
numRequestsMade = 0;
919939
rowsRead++;
920940
const row = this.row(rowData.key);
921941
row.data = rowData.data;
@@ -936,20 +956,32 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
936956
userStream.end();
937957
return;
938958
}
959+
numConsecutiveErrors++;
939960
if (
940-
numRequestsMade <= maxRetries &&
961+
numConsecutiveErrors <= maxRetries &&
941962
RETRYABLE_STATUS_CODES.has(error.code)
942963
) {
943-
makeNewRequest();
964+
const backOffSettings =
965+
options.gaxOptions?.retry?.backoffSettings ||
966+
DEFAULT_BACKOFF_SETTINGS;
967+
const nextRetryDelay = getNextDelay(
968+
numConsecutiveErrors,
969+
backOffSettings
970+
);
971+
retryTimer = setTimeout(makeNewRequest, nextRetryDelay);
944972
} else {
945973
userStream.emit('error', error);
946974
}
947975
})
976+
.on('data', _ => {
977+
// Reset error count after a successful read so the backoff
978+
// time won't keep increasing when as stream had multiple errors
979+
numConsecutiveErrors = 0;
980+
})
948981
.on('end', () => {
949982
activeRequestStream = null;
950983
});
951984
rowStream.pipe(userStream);
952-
numRequestsMade++;
953985
};
954986

955987
makeNewRequest();
@@ -1504,23 +1536,43 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15041536
);
15051537
const mutationErrorsByEntryIndex = new Map();
15061538

1507-
const onBatchResponse = (
1508-
err: ServiceError | PartialFailureError | null
1509-
) => {
1510-
// TODO: enable retries when the entire RPC fails
1511-
if (err) {
1512-
// The error happened before a request was even made, don't retry.
1539+
const isRetryable = (err: ServiceError | null) => {
1540+
// Don't retry if there are no more entries or retry attempts
1541+
if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) {
1542+
return false;
1543+
}
1544+
// If the error is empty but there are still outstanding mutations,
1545+
// it means that there are retryable errors in the mutate response
1546+
// even when the RPC succeeded
1547+
return !err || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code);
1548+
};
1549+
1550+
const onBatchResponse = (err: ServiceError | null) => {
1551+
// Return if the error happened before a request was made
1552+
if (numRequestsMade === 0) {
15131553
callback(err);
15141554
return;
15151555
}
1516-
if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) {
1517-
makeNextBatchRequest();
1556+
1557+
if (isRetryable(err)) {
1558+
const backOffSettings =
1559+
options.gaxOptions?.retry?.backoffSettings ||
1560+
DEFAULT_BACKOFF_SETTINGS;
1561+
const nextDelay = getNextDelay(numRequestsMade, backOffSettings);
1562+
setTimeout(makeNextBatchRequest, nextDelay);
15181563
return;
15191564
}
15201565

1566+
// If there's no more pending mutations, set the error
1567+
// to null
1568+
if (pendingEntryIndices.size === 0) {
1569+
err = null;
1570+
}
1571+
15211572
if (mutationErrorsByEntryIndex.size !== 0) {
15221573
const mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
1523-
err = new PartialFailureError(mutationErrors);
1574+
callback(new PartialFailureError(mutationErrors, err));
1575+
return;
15241576
}
15251577

15261578
callback(err);
@@ -1541,6 +1593,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15411593

15421594
const retryOpts = {
15431595
currentRetryAttempt: numRequestsMade,
1596+
// Handling retries in this client. Specify the retry options to
1597+
// make sure nothing is retried in retry-request.
1598+
noResponseRetries: 0,
1599+
shouldRetryFn: (_: any) => {
1600+
return false;
1601+
},
15441602
};
15451603

15461604
this.bigtable
@@ -1552,13 +1610,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15521610
retryOpts,
15531611
})
15541612
.on('error', (err: ServiceError) => {
1555-
// TODO: this check doesn't actually do anything, onBatchResponse
1556-
// currently doesn't retry RPC errors, only entry failures
1557-
if (numRequestsMade === 0) {
1558-
callback(err); // Likely a "projectId not detected" error.
1559-
return;
1560-
}
1561-
15621613
onBatchResponse(err);
15631614
})
15641615
.on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => {
@@ -1572,13 +1623,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
15721623
mutationErrorsByEntryIndex.delete(originalEntriesIndex);
15731624
return;
15741625
}
1575-
if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
1626+
if (!IDEMPOTENT_RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
15761627
pendingEntryIndices.delete(originalEntriesIndex);
15771628
}
1578-
const status = decorateStatus(entry.status);
1629+
const errorDetails = entry.status;
15791630
// eslint-disable-next-line @typescript-eslint/no-explicit-any
1580-
(status as any).entry = originalEntry;
1581-
mutationErrorsByEntryIndex.set(originalEntriesIndex, status);
1631+
(errorDetails as any).entry = originalEntry;
1632+
mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails);
15821633
});
15831634
})
15841635
.on('end', onBatchResponse);
@@ -1997,14 +2048,25 @@ promisifyAll(Table, {
19972048
exclude: ['family', 'row'],
19982049
});
19992050

2051+
function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) {
2052+
// 0 - 100 ms jitter
2053+
const jitter = Math.floor(Math.random() * 100);
2054+
const calculatedNextRetryDelay =
2055+
config.initialRetryDelayMillis *
2056+
Math.pow(config.retryDelayMultiplier, numConsecutiveErrors) +
2057+
jitter;
2058+
2059+
return Math.min(calculatedNextRetryDelay, config.maxRetryDelayMillis);
2060+
}
2061+
20002062
export interface GoogleInnerError {
20012063
reason?: string;
20022064
message?: string;
20032065
}
20042066

20052067
export class PartialFailureError extends Error {
20062068
errors?: GoogleInnerError[];
2007-
constructor(errors: GoogleInnerError[]) {
2069+
constructor(errors: GoogleInnerError[], rpcError?: ServiceError | null) {
20082070
super();
20092071
this.errors = errors;
20102072
this.name = 'PartialFailureError';
@@ -2017,5 +2079,8 @@ export class PartialFailureError extends Error {
20172079
messages.push('\n');
20182080
}
20192081
this.message = messages.join('\n');
2082+
if (rpcError) {
2083+
this.message += 'Request failed with: ' + rpcError.message;
2084+
}
20202085
}
20212086
}

system-test/data/mutate-rows-retry-test.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
],
7171
"responses": [
7272
{ "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] },
73-
{ "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] },
73+
{ "code": 200, "entry_codes": [ 4, 14, 14, 14, 0 ] },
7474
{ "code": 200, "entry_codes": [ 1, 4, 4, 0 ] },
7575
{ "code": 200, "entry_codes": [ 0, 4 ] },
7676
{ "code": 200, "entry_codes": [ 4 ] },

system-test/data/read-rows-retry-test.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676

7777
{
7878
"name": "resets the retry counter after a successful read",
79-
"max_retries": 3,
79+
"max_retries": 4,
8080
"request_options": [
8181
{ "rowKeys": [],
8282
"rowRanges": [{}]
@@ -211,7 +211,7 @@
211211

212212
{
213213
"name": "does the previous 5 things in one giant test case",
214-
"max_retries": 3,
214+
"max_retries": 4,
215215
"createReadStream_options": {
216216
"limit": 10,
217217
"ranges": [{

0 commit comments

Comments
 (0)