Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.

Commit 33754a2

Browse files
authored
fix: do not cancel stream after server returned ok or cancelled status (#1029)
* fix: do not cancel stream after server returned ok or cancelled status * remove comment * lint * lint
1 parent 2576d14 commit 33754a2

1 file changed

Lines changed: 23 additions & 19 deletions

File tree

src/table.ts

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
728728
createReadStream(opts?: GetRowsOptions) {
729729
const options = opts || {};
730730
const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3;
731-
let activeRequestStream: AbortableDuplex;
731+
let activeRequestStream: AbortableDuplex | null;
732732
let rowKeys: string[];
733733
const ranges = options.ranges || [];
734734
let filter: {} | null;
@@ -786,7 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
786786
userStream.end = () => {
787787
rowStream?.unpipe(userStream);
788788
if (activeRequestStream) {
789-
// TODO: properly end the stream instead of abort
790789
activeRequestStream.abort();
791790
}
792791
return end();
@@ -927,23 +926,28 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
927926

928927
rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]);
929928

930-
rowStream.on('error', (error: ServiceError) => {
931-
if (IGNORED_STATUS_CODES.has(error.code)) {
932-
// We ignore the `cancelled` "error", since we are the ones who cause
933-
// it when the user calls `.abort()`.
934-
userStream.end();
935-
return;
936-
}
937-
rowStream.unpipe(userStream);
938-
if (
939-
numRequestsMade <= maxRetries &&
940-
RETRYABLE_STATUS_CODES.has(error.code)
941-
) {
942-
makeNewRequest();
943-
} else {
944-
userStream.emit('error', error);
945-
}
946-
});
929+
rowStream
930+
.on('error', (error: ServiceError) => {
931+
rowStream.unpipe(userStream);
932+
activeRequestStream = null;
933+
if (IGNORED_STATUS_CODES.has(error.code)) {
934+
// We ignore the `cancelled` "error", since we are the ones who cause
935+
// it when the user calls `.abort()`.
936+
userStream.end();
937+
return;
938+
}
939+
if (
940+
numRequestsMade <= maxRetries &&
941+
RETRYABLE_STATUS_CODES.has(error.code)
942+
) {
943+
makeNewRequest();
944+
} else {
945+
userStream.emit('error', error);
946+
}
947+
})
948+
.on('end', () => {
949+
activeRequestStream = null;
950+
});
947951
rowStream.pipe(userStream);
948952
numRequestsMade++;
949953
};

0 commit comments

Comments
 (0)