@@ -729,13 +729,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
729729 const options = opts || { } ;
730730 const maxRetries = is . number ( this . maxRetries ) ? this . maxRetries ! : 3 ;
731731 let activeRequestStream : AbortableDuplex ;
732- let rowKeys : string [ ] | null ;
732+ let rowKeys : string [ ] ;
733733 const ranges = options . ranges || [ ] ;
734734 let filter : { } | null ;
735- let rowsLimit : number ;
735+ const rowsLimit = options . limit || 0 ;
736+ const hasLimit = rowsLimit !== 0 ;
736737 let rowsRead = 0 ;
737738 let numRequestsMade = 0 ;
738739
740+ rowKeys = options . keys || [ ] ;
741+
739742 if ( options . start || options . end ) {
740743 if ( options . ranges || options . prefix || options . prefixes ) {
741744 throw new Error (
@@ -748,10 +751,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
748751 } ) ;
749752 }
750753
751- if ( options . keys ) {
752- rowKeys = options . keys ;
753- }
754-
755754 if ( options . prefix ) {
756755 if ( options . ranges || options . start || options . end || options . prefixes ) {
757756 throw new Error (
@@ -772,19 +771,22 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
772771 } ) ;
773772 }
774773
775- if ( options . filter ) {
776- filter = Filter . parse ( options . filter ) ;
774+ // If rowKeys and ranges are both empty, the request is a full table scan.
775+ // Add an empty range to simplify the resumption logic.
776+ if ( rowKeys . length === 0 && ranges . length === 0 ) {
777+ ranges . push ( { } ) ;
777778 }
778779
779- if ( options . limit ) {
780- rowsLimit = options . limit ;
780+ if ( options . filter ) {
781+ filter = Filter . parse ( options . filter ) ;
781782 }
782783
783784 const userStream = new PassThrough ( { objectMode : true } ) ;
784785 const end = userStream . end . bind ( userStream ) ;
785786 userStream . end = ( ) => {
786787 rowStream ?. unpipe ( userStream ) ;
787788 if ( activeRequestStream ) {
789+ // TODO: properly end the stream instead of abort
788790 activeRequestStream . abort ( ) ;
789791 }
790792 return end ( ) ;
@@ -808,90 +810,90 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
808810 } ;
809811
810812 if ( lastRowKey ) {
813+ // TODO: lhs and rhs type shouldn't be string, it could be
814+ // string, number, Uint8Array, boolean. Fix the type
815+ // and clean up the casting.
811816 const lessThan = ( lhs : string , rhs : string ) => {
812817 const lhsBytes = Mutation . convertToBytes ( lhs ) ;
813818 const rhsBytes = Mutation . convertToBytes ( rhs ) ;
814819 return ( lhsBytes as Buffer ) . compare ( rhsBytes as Uint8Array ) === - 1 ;
815820 } ;
816821 const greaterThan = ( lhs : string , rhs : string ) => lessThan ( rhs , lhs ) ;
817- const greaterThanOrEqualTo = ( lhs : string , rhs : string ) =>
818- ! lessThan ( rhs , lhs ) ;
819-
820- if ( ranges . length === 0 ) {
821- ranges . push ( {
822- start : {
823- value : lastRowKey ,
824- inclusive : false ,
825- } ,
826- } ) ;
827- } else {
828- // Readjust and/or remove ranges based on previous valid row reads.
829-
830- // Iterate backward since items may need to be removed.
831- for ( let index = ranges . length - 1 ; index >= 0 ; index -- ) {
832- const range = ranges [ index ] ;
833- const startValue = is . object ( range . start )
834- ? ( range . start as BoundData ) . value
835- : range . start ;
836- const endValue = is . object ( range . end )
837- ? ( range . end as BoundData ) . value
838- : range . end ;
839- const isWithinStart =
840- ! startValue ||
841- greaterThanOrEqualTo ( startValue as string , lastRowKey as string ) ;
842- const isWithinEnd =
843- ! endValue || lessThan ( lastRowKey as string , endValue as string ) ;
844- if ( isWithinStart ) {
845- if ( isWithinEnd ) {
846- // The lastRowKey is within this range, adjust the start
847- // value.
848- range . start = {
849- value : lastRowKey ,
850- inclusive : false ,
851- } ;
852- } else {
853- // The lastRowKey is past this range, remove this range.
854- ranges . splice ( index , 1 ) ;
855- }
822+ const lessThanOrEqualTo = ( lhs : string , rhs : string ) =>
823+ ! greaterThan ( lhs , rhs ) ;
824+
825+ // Readjust and/or remove ranges based on previous valid row reads.
826+ // Iterate backward since items may need to be removed.
827+ for ( let index = ranges . length - 1 ; index >= 0 ; index -- ) {
828+ const range = ranges [ index ] ;
829+ const startValue = is . object ( range . start )
830+ ? ( range . start as BoundData ) . value
831+ : range . start ;
832+ const endValue = is . object ( range . end )
833+ ? ( range . end as BoundData ) . value
834+ : range . end ;
835+ const startKeyIsRead =
836+ ! startValue ||
837+ lessThanOrEqualTo ( startValue as string , lastRowKey as string ) ;
838+ const endKeyIsNotRead =
839+ ! endValue ||
840+ ( endValue as Buffer ) . length === 0 ||
841+ lessThan ( lastRowKey as string , endValue as string ) ;
842+ if ( startKeyIsRead ) {
843+ if ( endKeyIsNotRead ) {
844+ // EndKey is not read, reset the range to start from lastRowKey open
845+ range . start = {
846+ value : lastRowKey ,
847+ inclusive : false ,
848+ } ;
849+ } else {
850+ // EndKey is read, remove this range
851+ ranges . splice ( index , 1 ) ;
856852 }
857853 }
858854 }
859855
860856 // Remove rowKeys already read.
861- if ( rowKeys ) {
862- rowKeys = rowKeys . filter ( rowKey =>
863- greaterThan ( rowKey , lastRowKey as string )
864- ) ;
865- if ( rowKeys . length === 0 ) {
866- rowKeys = null ;
867- }
868- }
869- }
870- if ( rowKeys || ranges . length ) {
871- reqOpts . rows = { } ;
857+ rowKeys = rowKeys . filter ( rowKey =>
858+ greaterThan ( rowKey , lastRowKey as string )
859+ ) ;
872860
873- if ( rowKeys ) {
874- reqOpts . rows . rowKeys = rowKeys . map (
875- Mutation . convertToBytes
876- ) as { } as Uint8Array [ ] ;
861+ // If there was a row limit in the original request and
862+ // we've already read all the rows, end the stream and
863+ // do not retry.
864+ if ( hasLimit && rowsLimit === rowsRead ) {
865+ userStream . end ( ) ;
866+ return ;
877867 }
878-
879- if ( ranges . length ) {
880- reqOpts . rows . rowRanges = ranges . map ( range =>
881- Filter . createRange (
882- range . start as BoundData ,
883- range . end as BoundData ,
884- 'Key'
885- )
886- ) ;
868+ // If all the row keys and ranges are read, end the stream
869+ // and do not retry.
870+ if ( rowKeys . length === 0 && ranges . length === 0 ) {
871+ userStream . end ( ) ;
872+ return ;
887873 }
888874 }
889875
876+ // Create the new reqOpts
877+ reqOpts . rows = { } ;
878+
879+ // TODO: preprocess all the keys and ranges to Bytes
880+ reqOpts . rows . rowKeys = rowKeys . map (
881+ Mutation . convertToBytes
882+ ) as { } as Uint8Array [ ] ;
883+
884+ reqOpts . rows . rowRanges = ranges . map ( range =>
885+ Filter . createRange (
886+ range . start as BoundData ,
887+ range . end as BoundData ,
888+ 'Key'
889+ )
890+ ) ;
891+
890892 if ( filter ) {
891893 reqOpts . filter = filter ;
892894 }
893895
894- if ( rowsLimit ) {
896+ if ( hasLimit ) {
895897 reqOpts . rowsLimit = rowsLimit - rowsRead ;
896898 }
897899
0 commit comments