feat: add timeout option and graceful shutdown to Subscription.close()#2068
feat: add timeout option and graceful shutdown to Subscription.close()#2068
Conversation
Implements a new `timeout` option (using `Duration`) for the `Subscription.close()` method. This provides more control over the shutdown process: - If `timeout` is zero, the subscription closes as quickly as possible without nacking buffered messages. - If `timeout` is positive, the subscription attempts to nack any buffered messages (in the lease manager) and waits up to the specified duration for pending acknowledgements and nacks to be sent to the server. - If no timeout is provided, the behavior remains as before (waits indefinitely for pending acks/modacks, no nacking). The core logic is implemented in `Subscriber.close()`. `PubSub.close()` documentation is updated to clarify its scope and recommend using `Subscription.close()` directly for this feature. Includes: - Unit tests for the new timeout behavior in `Subscriber.close()`. - A TypeScript sample (`samples/closeSubscriptionWithTimeout.ts`) demonstrating usage. - Updated JSDoc documentation for relevant methods.
| const remaining = this._inventory.clear(); | ||
|
|
||
| await this._waitForFlush(); | ||
| const options = this._options.closeOptions; |
There was a problem hiding this comment.
this._options is guaranteed to be non-undefined, so I don't think it's necessary here. The lines that use options do optional chaining, since closeOptions may be undefined.
| // The timeout can't realistically be longer than the longest time we're willing | ||
| // to lease messages. | ||
| let timeout = durationAtMost( | ||
| options?.timeout ?? this.maxExtensionTime, |
There was a problem hiding this comment.
is maxExtensionTime a const? How do we know it will be available? If the former, maybe worth uppercasing?
There was a problem hiding this comment.
It's a config option with a default. So it's set by the constructor:
this.maxExtensionTime = defaultOptions.subscription.maxExtensionTime;
And then if the user passed a value, it overwrites the default.
miguelvelezsa
left a comment
There was a problem hiding this comment.
I would highly recommend to add types in all the new code :)
| * | ||
| * @private | ||
| */ | ||
| dispatched(): void { |
There was a problem hiding this comment.
It's less of a command, more of an event. Basically there is something else doing dispatching, and this method is called to notify telemetry and such. So I don't think I'd call it dispatch, but I'm open to other ideas.
src/subscriber.ts
Outdated
| const err = e as [unknown, boolean]; | ||
| if (err[1] === false) { |
There was a problem hiding this comment.
for my own knowledge, how err[1] === true means was timeout? probably adding type for 'e' here will help :)
There was a problem hiding this comment.
Yeah, this has to do with implementing the time-limited Promise wait... it's super annoying to make graceful. I'll look again.
There was a problem hiding this comment.
Not sure why it didn't give me a separate box for comments on the one above, but check out Sofia's comment on why I did this like I did.
I agree with you on the second one. I'll add an interface.
Can you elaborate here? The only time I tend to omit them is when they're super obvious (like making a class member like Edit: it didn't tag you :) @miguelvelezsa |
|
I need to look at a few more review comments before merging anything. |
…imeout' into feat-close-timeout
|
Warning: This pull request is touching the following templated files:
|
| if ( | ||
| behavior === SubscriberCloseBehaviors.WaitForProcessing && | ||
| !this._inventory.isEmpty | ||
| ) { |
There was a problem hiding this comment.
Was just debugging the WaitForProcessing functionality locally and stumbled upon this check.
@feywind wouldn't this need to be the following? isEmpty seems to be undefined, instead this might have to be a method call?
| if ( | |
| behavior === SubscriberCloseBehaviors.WaitForProcessing && | |
| !this._inventory.isEmpty | |
| ) { | |
| if ( | |
| behavior === SubscriberCloseBehaviors.WaitForProcessing && | |
| !this._inventory.isEmpty() | |
| ) { |
Implements a new
timeoutandbehavioroption (usingDuration) for theSubscription.close()method. These options are on the SubscriberOptions/SubscriptionOptions passed in when opening a subscriber.This provides more control over the shutdown process:
(This is a re-open of #2037 to make it from the main repo.)