@@ -728,7 +728,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
728728 createReadStream ( opts ?: GetRowsOptions ) {
729729 const options = opts || { } ;
730730 const maxRetries = is . number ( this . maxRetries ) ? this . maxRetries ! : 3 ;
731- let activeRequestStream : AbortableDuplex ;
731+ let activeRequestStream : AbortableDuplex | null ;
732732 let rowKeys : string [ ] ;
733733 const ranges = options . ranges || [ ] ;
734734 let filter : { } | null ;
@@ -786,7 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
786786 userStream . end = ( ) => {
787787 rowStream ?. unpipe ( userStream ) ;
788788 if ( activeRequestStream ) {
789- // TODO: properly end the stream instead of abort
790789 activeRequestStream . abort ( ) ;
791790 }
792791 return end ( ) ;
@@ -927,23 +926,28 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
927926
928927 rowStream = pumpify . obj ( [ requestStream , chunkTransformer , toRowStream ] ) ;
929928
930- rowStream . on ( 'error' , ( error : ServiceError ) => {
931- if ( IGNORED_STATUS_CODES . has ( error . code ) ) {
932- // We ignore the `cancelled` "error", since we are the ones who cause
933- // it when the user calls `.abort()`.
934- userStream . end ( ) ;
935- return ;
936- }
937- rowStream . unpipe ( userStream ) ;
938- if (
939- numRequestsMade <= maxRetries &&
940- RETRYABLE_STATUS_CODES . has ( error . code )
941- ) {
942- makeNewRequest ( ) ;
943- } else {
944- userStream . emit ( 'error' , error ) ;
945- }
946- } ) ;
929+ rowStream
930+ . on ( 'error' , ( error : ServiceError ) => {
931+ rowStream . unpipe ( userStream ) ;
932+ activeRequestStream = null ;
933+ if ( IGNORED_STATUS_CODES . has ( error . code ) ) {
934+ // We ignore the `cancelled` "error", since we are the ones who cause
935+ // it when the user calls `.abort()`.
936+ userStream . end ( ) ;
937+ return ;
938+ }
939+ if (
940+ numRequestsMade <= maxRetries &&
941+ RETRYABLE_STATUS_CODES . has ( error . code )
942+ ) {
943+ makeNewRequest ( ) ;
944+ } else {
945+ userStream . emit ( 'error' , error ) ;
946+ }
947+ } )
948+ . on ( 'end' , ( ) => {
949+ activeRequestStream = null ;
950+ } ) ;
947951 rowStream . pipe ( userStream ) ;
948952 numRequestsMade ++ ;
949953 } ;
0 commit comments