@@ -127,7 +127,6 @@ export interface MessageStreamOptions {
127127 * @param {MessageStreamOptions } [options] The message stream options.
128128 */
129129export class MessageStream extends PassThrough {
130- destroyed : boolean ;
131130 private _keepAliveHandle : NodeJS . Timer ;
132131 private _fillHandle ?: NodeJS . Timer ;
133132 private _options : MessageStreamOptions ;
@@ -139,7 +138,6 @@ export class MessageStream extends PassThrough {
139138
140139 super ( { objectMode : true , highWaterMark : options . highWaterMark } ) ;
141140
142- this . destroyed = false ;
143141 this . _options = options ;
144142 this . _retrier = new PullRetry ( ) ;
145143 this . _streams = new Map ( ) ;
@@ -156,14 +154,24 @@ export class MessageStream extends PassThrough {
156154 /**
157155 * Destroys the stream and any underlying streams.
158156 *
159- * @param {error? } err An error to emit, if any.
157+ * @param {error? } error An error to emit, if any.
160158 * @private
161159 */
162- destroy ( err ?: Error ) : void {
160+ destroy ( error ?: Error | null ) : void {
161+ // We can't assume Node has taken care of this in <14.
163162 if ( this . destroyed ) {
164163 return ;
165164 }
166-
165+ super . destroy ( error ? error : undefined ) ;
166+ }
167+ /**
168+ * Destroys the stream and any underlying streams.
169+ *
170+ * @param {error? } error An error to emit, if any.
171+ * @param {Function } callback Callback for completion of any destruction.
172+ * @private
173+ */
174+ _destroy ( error : Error | null , callback : ( error : Error | null ) => void ) : void {
167175 this . destroyed = true ;
168176 clearInterval ( this . _keepAliveHandle ) ;
169177
@@ -172,16 +180,7 @@ export class MessageStream extends PassThrough {
172180 stream . cancel ( ) ;
173181 }
174182
175- if ( typeof super . destroy === 'function' ) {
176- return super . destroy ( err ) ;
177- }
178-
179- process . nextTick ( ( ) => {
180- if ( err ) {
181- this . emit ( 'error' , err ) ;
182- }
183- this . emit ( 'close' ) ;
184- } ) ;
183+ callback ( error ) ;
185184 }
186185 /**
187186 * Adds a StreamingPull stream to the combined stream.
@@ -226,8 +225,12 @@ export class MessageStream extends PassThrough {
226225 const request : StreamingPullRequest = {
227226 subscription : this . _subscriber . name ,
228227 streamAckDeadlineSeconds : this . _subscriber . ackDeadline ,
229- maxOutstandingMessages : this . _subscriber . maxMessages ,
230- maxOutstandingBytes : this . _subscriber . maxBytes ,
228+ maxOutstandingMessages : this . _subscriber . useLegacyFlowControl
229+ ? 0
230+ : this . _subscriber . maxMessages ,
231+ maxOutstandingBytes : this . _subscriber . useLegacyFlowControl
232+ ? 0
233+ : this . _subscriber . maxBytes ,
231234 } ;
232235
233236 delete this . _fillHandle ;
0 commit comments