Skip to content
This repository was archived by the owner on Mar 17, 2026. It is now read-only.

Commit 0ba3b21

Browse files
authored
Merge branch 'master' into autosynth-synthtool
2 parents f8827e7 + a9c7e0b commit 0ba3b21

3 files changed

Lines changed: 29 additions & 55 deletions

File tree

src/message-stream.ts

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ export interface MessageStreamOptions {
127127
* @param {MessageStreamOptions} [options] The message stream options.
128128
*/
129129
export class MessageStream extends PassThrough {
130-
destroyed: boolean;
131130
private _keepAliveHandle: NodeJS.Timer;
132131
private _fillHandle?: NodeJS.Timer;
133132
private _options: MessageStreamOptions;
@@ -139,7 +138,6 @@ export class MessageStream extends PassThrough {
139138

140139
super({objectMode: true, highWaterMark: options.highWaterMark});
141140

142-
this.destroyed = false;
143141
this._options = options;
144142
this._retrier = new PullRetry();
145143
this._streams = new Map();
@@ -156,14 +154,24 @@ export class MessageStream extends PassThrough {
156154
/**
157155
* Destroys the stream and any underlying streams.
158156
*
159-
* @param {error?} err An error to emit, if any.
157+
* @param {error?} error An error to emit, if any.
160158
* @private
161159
*/
162-
destroy(err?: Error): void {
160+
destroy(error?: Error | null): void {
161+
// We can't assume Node has taken care of this in <14.
163162
if (this.destroyed) {
164163
return;
165164
}
166-
165+
super.destroy(error ? error : undefined);
166+
}
167+
/**
168+
* Destroys the stream and any underlying streams.
169+
*
170+
* @param {error?} error An error to emit, if any.
171+
* @param {Function} callback Callback for completion of any destruction.
172+
* @private
173+
*/
174+
_destroy(error: Error | null, callback: (error: Error | null) => void): void {
167175
this.destroyed = true;
168176
clearInterval(this._keepAliveHandle);
169177

@@ -172,16 +180,7 @@ export class MessageStream extends PassThrough {
172180
stream.cancel();
173181
}
174182

175-
if (typeof super.destroy === 'function') {
176-
return super.destroy(err);
177-
}
178-
179-
process.nextTick(() => {
180-
if (err) {
181-
this.emit('error', err);
182-
}
183-
this.emit('close');
184-
});
183+
callback(error);
185184
}
186185
/**
187186
* Adds a StreamingPull stream to the combined stream.
@@ -226,8 +225,12 @@ export class MessageStream extends PassThrough {
226225
const request: StreamingPullRequest = {
227226
subscription: this._subscriber.name,
228227
streamAckDeadlineSeconds: this._subscriber.ackDeadline,
229-
maxOutstandingMessages: this._subscriber.maxMessages,
230-
maxOutstandingBytes: this._subscriber.maxBytes,
228+
maxOutstandingMessages: this._subscriber.useLegacyFlowControl
229+
? 0
230+
: this._subscriber.maxMessages,
231+
maxOutstandingBytes: this._subscriber.useLegacyFlowControl
232+
? 0
233+
: this._subscriber.maxBytes,
231234
};
232235

233236
delete this._fillHandle;

src/subscriber.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ export interface SubscriberOptions {
203203
ackDeadline?: number;
204204
batching?: BatchOptions;
205205
flowControl?: FlowControlOptions;
206+
useLegacyFlowControl?: boolean;
206207
streamingOptions?: MessageStreamOptions;
207208
enableOpenTelemetryTracing?: boolean;
208209
}
@@ -214,6 +215,9 @@ export interface SubscriberOptions {
214215
* 99th percentile time it takes to acknowledge a message.
215216
* @property {BatchOptions} [batching] Request batching options.
216217
* @property {FlowControlOptions} [flowControl] Flow control options.
218+
* @property {boolean} [useLegacyFlowControl] Disables enforcing flow control
219+
* settings at the Cloud PubSub server and uses the less accurate method
220+
* of only enforcing flow control at the client side.
217221
* @property {MessageStreamOptions} [streamingOptions] Streaming options.
218222
*/
219223
/**
@@ -229,6 +233,7 @@ export class Subscriber extends EventEmitter {
229233
ackDeadline: number;
230234
maxMessages: number;
231235
maxBytes: number;
236+
useLegacyFlowControl: boolean;
232237
isOpen: boolean;
233238
private _acks!: AckQueue;
234239
private _histogram: Histogram;
@@ -247,6 +252,7 @@ export class Subscriber extends EventEmitter {
247252
this.ackDeadline = 10;
248253
this.maxMessages = defaultOptions.subscription.maxOutstandingMessages;
249254
this.maxBytes = defaultOptions.subscription.maxOutstandingBytes;
255+
this.useLegacyFlowControl = false;
250256
this.isOpen = false;
251257
this._isUserSetDeadline = false;
252258
this._histogram = new Histogram({min: 10, max: 600});
@@ -402,6 +408,7 @@ export class Subscriber extends EventEmitter {
402408
this._isUserSetDeadline = true;
403409
}
404410

411+
this.useLegacyFlowControl = options.useLegacyFlowControl || false;
405412
if (options.flowControl) {
406413
this.maxMessages =
407414
options.flowControl!.maxMessages ||

test/message-stream.ts

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import * as assert from 'assert';
18-
import {describe, it, before, beforeEach, afterEach, after} from 'mocha';
18+
import {describe, it, before, beforeEach, afterEach} from 'mocha';
1919
import {grpc} from 'google-gax';
2020
import * as proxyquire from 'proxyquire';
2121
import * as sinon from 'sinon';
@@ -302,13 +302,7 @@ describe('MessageStream', () => {
302302

303303
describe('destroy', () => {
304304
it('should noop if already destroyed', done => {
305-
sandbox
306-
.stub(FakePassThrough.prototype, 'destroy')
307-
.callsFake(function (this: Duplex) {
308-
if (this === messageStream) {
309-
done();
310-
}
311-
});
305+
messageStream.on('close', done);
312306

313307
messageStream.destroy();
314308
messageStream.destroy();
@@ -350,36 +344,6 @@ describe('MessageStream', () => {
350344
assert.strictEqual(stub.callCount, 1);
351345
});
352346
});
353-
354-
describe('without native destroy', () => {
355-
let destroy: (err?: Error) => void;
356-
357-
before(() => {
358-
destroy = FakePassThrough.prototype.destroy;
359-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
360-
FakePassThrough.prototype.destroy = false as any;
361-
});
362-
363-
after(() => {
364-
FakePassThrough.prototype.destroy = destroy;
365-
});
366-
367-
it('should emit close', done => {
368-
messageStream.on('close', done);
369-
messageStream.destroy();
370-
});
371-
372-
it('should emit an error if present', done => {
373-
const fakeError = new Error('err');
374-
375-
messageStream.on('error', err => {
376-
assert.strictEqual(err, fakeError);
377-
done();
378-
});
379-
380-
messageStream.destroy(fakeError);
381-
});
382-
});
383347
});
384348

385349
describe('pull stream lifecycle', () => {

0 commit comments

Comments
 (0)