Skip to content
This repository was archived by the owner on Mar 17, 2026. It is now read-only.

Commit a9c7e0b

Browse files
feat: Enable server side flow control by default with the option to turn it off (#1147)
This change enables sending flow control settings automatically to the server. If flowControl.maxMessages > 0 or flowControl.maxBytes > 0, flow control will be enforced at the server side (in addition to the client side). This behavior is enabled by default and SubscriberOptions.useLegacyFlowControl can be used for users who would like to opt-out of this feature in case they encounter issues with server side flow control.
1 parent e421749 commit a9c7e0b

2 files changed

Lines changed: 13 additions & 2 deletions

File tree

src/message-stream.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,12 @@ export class MessageStream extends PassThrough {
225225
const request: StreamingPullRequest = {
226226
subscription: this._subscriber.name,
227227
streamAckDeadlineSeconds: this._subscriber.ackDeadline,
228-
maxOutstandingMessages: this._subscriber.maxMessages,
229-
maxOutstandingBytes: this._subscriber.maxBytes,
228+
maxOutstandingMessages: this._subscriber.useLegacyFlowControl
229+
? 0
230+
: this._subscriber.maxMessages,
231+
maxOutstandingBytes: this._subscriber.useLegacyFlowControl
232+
? 0
233+
: this._subscriber.maxBytes,
230234
};
231235

232236
delete this._fillHandle;

src/subscriber.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ export interface SubscriberOptions {
203203
ackDeadline?: number;
204204
batching?: BatchOptions;
205205
flowControl?: FlowControlOptions;
206+
useLegacyFlowControl?: boolean;
206207
streamingOptions?: MessageStreamOptions;
207208
enableOpenTelemetryTracing?: boolean;
208209
}
@@ -214,6 +215,9 @@ export interface SubscriberOptions {
214215
* 99th percentile time it takes to acknowledge a message.
215216
* @property {BatchOptions} [batching] Request batching options.
216217
* @property {FlowControlOptions} [flowControl] Flow control options.
218+
* @property {boolean} [useLegacyFlowControl] Disables enforcing flow control
219+
* settings at the Cloud PubSub server and uses the less accurate method
220+
* of only enforcing flow control at the client side.
217221
* @property {MessageStreamOptions} [streamingOptions] Streaming options.
218222
*/
219223
/**
@@ -229,6 +233,7 @@ export class Subscriber extends EventEmitter {
229233
ackDeadline: number;
230234
maxMessages: number;
231235
maxBytes: number;
236+
useLegacyFlowControl: boolean;
232237
isOpen: boolean;
233238
private _acks!: AckQueue;
234239
private _histogram: Histogram;
@@ -247,6 +252,7 @@ export class Subscriber extends EventEmitter {
247252
this.ackDeadline = 10;
248253
this.maxMessages = defaultOptions.subscription.maxOutstandingMessages;
249254
this.maxBytes = defaultOptions.subscription.maxOutstandingBytes;
255+
this.useLegacyFlowControl = false;
250256
this.isOpen = false;
251257
this._isUserSetDeadline = false;
252258
this._histogram = new Histogram({min: 10, max: 600});
@@ -402,6 +408,7 @@ export class Subscriber extends EventEmitter {
402408
this._isUserSetDeadline = true;
403409
}
404410

411+
this.useLegacyFlowControl = options.useLegacyFlowControl || false;
405412
if (options.flowControl) {
406413
this.maxMessages =
407414
options.flowControl!.maxMessages ||

0 commit comments

Comments
 (0)