Description
If we use the RefCount overload that takes a disconnectDelay argument, and a minObservers of 2 or more, it does not work with sources that complete immediately on subscription.
For example, this:
var o2 = Observable.Empty<int>().Publish().RefCount(2, TimeSpan.FromSeconds(10));
var s1 = o2.Subscribe();
var s2 = o2.Subscribe();
s1.Dispose();
s2.Dispose();
var s3 = o2.Subscribe();
will throw an InvalidOperationException from the final line, reporting that a Disposable has already been assigned.
Here's the sequence of events:
- Initial subscription does not connect because we've not reached the minObservers threshold.
- Second subscription hits the threshold, so RefCount calls Connect on the connectable observable returned by Publish, which immediately completes both subscriptions (before returning from Connect).
- Because both subscriptions have now completed, the RefCount's internal count drops to 0, so it schedules the work item that will eventually disconnect after the specified delay.
- The code calls Dispose to shut down the first two subscriptions, but this doesn't do anything because they have both already terminated as a result of the underlying source delivering an OnCompleted.
- The third subscription to RefCount happens before the disconnectDelay has elapsed. (Note that the RefCount is still connected to the connectable observable returned by Publish.)
- RefCount calls Subscribe on its source (the observable returned by Publish) and since that happens to be a Subject<int> that has already completed, that source immediately calls OnCompleted before returning from Subscribe, meaning that this newly-created subscription is already completed before that call to Subscribe returns.
- The RefCount's internal count is 1 at this point, but it then sets up the callback to run on completion, which executes immediately because this new subscription is already complete, and this causes the internal count to drop back to 0 a second time.
- RefCount tries to schedule a second work item to disconnect after the specified delay, but the one it set up earlier, the first time the count dropped to 0, remains in place. It's using a SingleAssignmentDisposable to hold that, which notices the attempt to assign a second disposable, and throws this exception.
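The sequence above can be sketched without Rx at all. The following is a rough model, not the Rx.NET source: RefCountModel, SubscriberAdded, and SubscriberCompleted are hypothetical names, and a plain object field stands in for the SingleAssignmentDisposable that holds the scheduled work item.

```csharp
using System;

// Hypothetical model of the bookkeeping described above; not the Rx.NET implementation.
sealed class RefCountModel
{
    private int _count;
    private object _pendingDisconnect; // stands in for the single-assignment slot

    public void SubscriberAdded() => _count++;

    public void SubscriberCompleted()
    {
        if (--_count != 0) return;

        // Count reached 0: schedule the delayed disconnect.
        // A single-assignment slot rejects a second assignment.
        if (_pendingDisconnect != null)
            throw new InvalidOperationException("Disposable has already been assigned.");
        _pendingDisconnect = new object(); // stands in for the scheduled work item
    }
}

static class Program
{
    static void Main()
    {
        var model = new RefCountModel();
        model.SubscriberAdded();       // s1: below the minObservers threshold
        model.SubscriberAdded();       // s2: threshold reached, Connect completes both
        model.SubscriberCompleted();
        model.SubscriberCompleted();   // count is 0: first delayed disconnect scheduled
        model.SubscriberAdded();       // s3 arrives before the delay has elapsed
        try
        {
            model.SubscriberCompleted(); // completes immediately, count is 0 again
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Second schedule failed: " + ex.Message);
        }
    }
}
```

In this model nothing ever clears the pending work item, so the second drop to 0 is what fails, matching the behaviour reported for the third Subscribe call.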
The basic problem seems to be that RefCount.Lazy (which is used when a disconnectDelay is specified) does not correctly handle the transition from the "0 subscribers, but still connected" state to the "at least 1 subscriber" state. It should cancel the work item that would perform the delayed disconnect, but that work item appears to remain in place, which would explain the exception.
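As an illustration of the transition handling described here, the sketch below cancels the pending disconnect when the count goes from 0 back to 1. It is a simplified, single-threaded model with hypothetical names (FixedRefCountModel, CancellableWork, ScheduledCount), not a patch for Rx.NET; a real fix would also have to cope with the work item firing concurrently.

```csharp
using System;

// Hypothetical stand-in for a scheduled work item that can be cancelled.
sealed class CancellableWork : IDisposable
{
    public bool IsCancelled { get; private set; }
    public void Dispose() => IsCancelled = true;
}

// Hypothetical model; not the Rx.NET implementation.
sealed class FixedRefCountModel
{
    private int _count;
    private CancellableWork _pendingDisconnect;

    public int ScheduledCount { get; private set; }

    public void SubscriberAdded()
    {
        // Transition from "0 subscribers, still connected" to "at least 1":
        // cancel the delayed disconnect and free the slot.
        if (_count++ == 0 && _pendingDisconnect != null)
        {
            _pendingDisconnect.Dispose();
            _pendingDisconnect = null;
        }
    }

    public void SubscriberCompleted()
    {
        if (--_count != 0) return;
        if (_pendingDisconnect != null)
            throw new InvalidOperationException("Disposable has already been assigned.");
        _pendingDisconnect = new CancellableWork();
        ScheduledCount++;
    }
}

static class Program
{
    static void Main()
    {
        var model = new FixedRefCountModel();
        model.SubscriberAdded();
        model.SubscriberAdded();
        model.SubscriberCompleted();
        model.SubscriberCompleted();   // first delayed disconnect scheduled
        model.SubscriberAdded();       // cancels the pending disconnect
        model.SubscriberCompleted();   // schedules a fresh one without throwing
        Console.WriteLine("Disconnects scheduled: " + model.ScheduledCount); // prints 2
    }
}
```

With the slot cleared on the 0-to-1 transition, the same sequence of events schedules two disconnects in turn instead of attempting a second assignment to an occupied slot.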