-
Notifications
You must be signed in to change notification settings - Fork 782
Adding ObservableDisposable #2180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@dotnet-policy-service agree |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this offering.
You said you were "here to learn." Brace yourself! 😊
I have one basic issue with this that I'd want to be addressed before we could merge this, which is that this currently looks more like code from an application consuming Rx than it does like Rx internals. I explain in the comments in this review why we normally implement observables in a particular way that looks different from how you'd normally do it as an Rx consumer.
I think there are also questions over exactly how this functionality should look in Rx's public API that need to be resolved. The current approach might be incompatible with addressing the preceding point.
I also have a broader question: how often have you needed this? Any time we add new surface area to Rx, that's more support overhead, more potential bugs, more surface area for security flaws. To be fair, this one doesn't look likely to be a big problem on any of those fronts, so I'm really just asking: how useful is this? I don't think it's something we've had a request for before. (That's not a "no thanks" by the way. But if you've needed this a lot, that's definitely useful to know.)
It's not obvious to me how I would use this. The cases where I typically want an IDisposable are where some method does something for the duration of some scope and then needs to clean up. These are inherently synchronous scenarios. (Even in async methods, there's still a basic "get something, use it, dispose it" sequence that is very different from the kinds of ongoing processes that Rx typically models.) So I don't yet get what this is really for.
It would be especially helpful if you could post a couple of code snippets illustrating the kinds of scenarios in which you would use this.
| /// </summary> | ||
| public sealed class ObservableDisposable : ICancelable, IObservable<Unit> | ||
| { | ||
| private Subject<Unit> _disposedNotification = new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although Subject<T> is a good way for consumers of Rx to get an IObservable<T> implementation, we tend not to use it internally for efficiency reasons.
If you look at pretty much any operator, you'll see that the IObservable<T> implementation they provide is usually based on one of the IProducer<T> implementations in Producer.cs
The reason for this is that when chaining together a sequence of operators (and most applications that use Rx rarely use just a single operator) we make a distinction between ones that are implemented by Rx, and IObservable<T> implementations where we don't know where they come from.
To ensure reliability, Rx is cautious when dealing with sources of unknown origin. It does not presume they will necessarily comply fully with the laws of Rx. We create wrappers around external observable sources to detect and handle anomalous behaviour, and we also do the same with external observers.
But with something like this:
// the details of this are nonsense - the point is we have a load of operators
// chained together
IObservable<double> r = src
.Where(x => x.Name[0] == 'I')
.OrderBy(x => x.Age)
.Skip(1)
.Select(x => x.Height)
.Average();the thing we don't want to do is have these safeguarding wrappers between every stage. We don't want this:
src -> guard -> Where -> guard -> OrderBy -> Skip -> guard -> Select -> guard -> Average -> guard
We want this:
src -> guard -> Where -> OrderBy -> Skip -> Select -> Average -> guard
We need those safeguards on the input side (assuming src here is something outside of Rx's control) and also on the output side (because who knows what exceptions might emerge from an application-supplied IObserver<T>? we need to ensure Rx's state remains coherent if the app does something odd). But we really don't need them between each and every operator, because we know that for every operator we've implemented, as long as its inputs are well behaved, its outputs will be well behaved. Similarly, as long as it is talking to a well-behaved IObserver<T>, the IObserver<T> that each of our operators presents to its predecessor will also be well behaved.
So the basic strategy is that Rx operators all recognize when they've been connected to another Rx operator, and they can bypass the relatively expensive safeguards that we need when interfacing with external code.
And although Subject<T> is an Rx library feature, it is designed very much to be used at the boundaries between Rx and application code, so we don't treat it as one of our recognized trusted operators.
So, taking that background into account, I think I would want to see this type implemented more like one of the other built-in Rx operators.
I say "I think" because one thing about the way we normally do things is that we mostly avoid making the implementing type of an IObservable<T> visible. Although when you use, say, Any, you're getting a type that derives from Producer<bool, ...>, that implementing type is hidden from you. We just expose an IObservable<T>. (And in fact Any doesn't always return the same type. Like a lot of our operators, we select between a number of different implementation strategies depending on inputs. And we want to retain the flexibility to change the set of strategies available, which means we can't have the concrete type be visible at the API. Also, our IProducer<T> base classes are all internal which prevents anything deriving from them from being public types.)
But the usage model here is quite simply:
new ObservableDisposable()This is consistent with many of our other Disposables. But it is not consistent with most of our IObservable<T>s. You've created a conundrum here by coming up with a type that is both disposable and observable. We have different common practices for each category, so what is the correct choice for something that is in both categories?
I'm inclined to say this should work more like Disposable.Empty: that gives you an IDisposable whose concrete type is a private class. So with your observable disposable, we might have a factory method like Disposable.Observable(). For that to work, we'd need also to introduce an ICancelableObservable<T> interface that inherits both interfaces, and use ICancelableObservable<Unit> as the return type of that factory method.
One alternative would be just to accept that this ObservableDisposable type has suboptimal performance when used as the source for other Rx operators.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand the conflict the class being an obserable AND a disposable.
- If it will be a class or a
Disposable.Observable()factory method, it will be be the first creation method of an observable outside of theObservableclass (and QueryLanguage, if i see it right). - If it will be a factory method in the
Observableclass, it will be the first disposable outside of the Dispoable namespace.
Would it be an option to go both ways to create such object?
I also used Subject here to not have to worry about the list of observers. I had a go at implementing ObservableDisposable as Producer and while this is great, I now have a list of sinks to notify and worry about. But maybe there is a better solution?
Since this Observable would only send one OnNext and one OnComplete notification, I think the suboptiomal performance is acceptable.
| /// <inheritdoc/> | ||
| public void Dispose() | ||
| { | ||
| if (Volatile.Read(ref _disposed) == false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if it's your intention for this code to work in the face of concurrent calls to Dispose. The use of Volatile.Read does suggest that this is the goal, although maybe this is just intending to handle correctly strictly sequential calls to Dispose that happen on different threads.
If this is meant to handle concurrent calls, I think there's a bug here. If two threads call Dispose simultaneously, it's possible for each of them to complete this Volatile.Read before the other reaches the Volatile.Write. So both threads will end up calling OnNext and OnCompleted. We will have violated the IObserver<T> contract:
- A call to
OnCompletedfollowed by further calls (at least one moreOnCompleted, and possibly a call toOnNextafter a call toOnCompleted) - Potentiall starting a new calls into an
IObserver<T>when another call was already in progress and had not returned
There are two ways to deal with this:
- declare that this is the caller's problem
- implement something more robust
- rely on the fact that Rx will treat the
Subject<T>as a suspect external source, and it will actually block illegal call sequences
We can rule out 3. Rx helps out applications that make such mistakes, up to a point, but we certainly shouldn't be relying on these safety guards in Rx's own code. In any case, while it would detect and block the IObservable<T> calls that occur after the first OnCompleted I don't think it would help at all with concurrent calls.
Option 1 actually has its charms (and may be what you intended, in which case the XML doc comments should definitely clarify that instances of this class are not safe for concurrent use; someone looking at the source code might see Volatile and assume that it is designed for multithreaded use). It's basically our official position for any IObserver<T> we hand out: concurrent calls are not supported, and you get undefined behaviour if you make them. In that case, this method could be simplified: you'd remove the use of Volatile entirely and make no attempt to handle concurrent use.
However, although that's reasonable for IObservable<T> (because there are well defined rules restricting its use) arguably it isn't for IDisposable. It's not unreasonable to want an IDisposable that tolerates concurrent calls to Dispose.
So that would leave 2. But although this could probably be fixed by using some sort of interlocked exchange instead of simply a volatile read, but in practice, I think if this were all changed to fit the IProducer<T> pattern, and if you used one of the Sink<...> types normally used as part of such an implementation, we'd need to look at how concurrent calls to Dispose interact with the existing disposal handling in there. This already uses Interlocked.Exchange to prevent double disposal, so it's possible you could take advantage of that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Interlocked.Exchange seems to be the better solution.
| Volatile.Write(ref _disposed, true); | ||
| _disposedNotification = null!; | ||
| } | ||
| GC.SuppressFinalize(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a sealed class without a finalizer, I don't see how instances of this type can ever end up on the finalization queue, so this won't do anything will it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well I shouls discard this call then... 🤦
| public sealed class ObservableDisposable : ICancelable, IObservable<Unit> | ||
| { | ||
| private Subject<Unit> _disposedNotification = new(); | ||
| private bool _disposed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell, you don't really need this, because when the object is in a disposed state, the _disposedNotification field is null. Removing this field, and just relying on the null/not-null state of _disposedNotification would simplify things because it removes an intermediate state where Dispose has changed this flag but hasn't yet got around to nulling out the _disposedNotification field.
We use a similar technique in the Sink<...> base class that almost all of our operators use in their implementation: we use a single field to a) hold the downstream observer and b) know whether we've been disposed. (We use a sentinel observer rather than null to signify disposal, but the basic principle is the same.)
However, as noted elsewhere, perhaps this should be using Sink and Producer, at which point you might be able to defer entirely to its disposal handling.
| .Subscribe(_ => { Assert.False(disp1); disp1 = true; }); | ||
|
|
||
| sub.Dispose(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this also dispose the observableDiposable?
You've checked that the act of unsubscribing doesn't cause our subscriber to be notified. But I think we also want to check that unsubscribing means we no longer get notified when the thing that normally causes notification happens.
| { | ||
| if (Volatile.Read(ref _disposed) == false) | ||
| { | ||
| _disposedNotification.OnNext(Unit.Default); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My assumption was that this source would simply complete upon disposal. I wasn't expecting it to produce a value too.
Did you consider both options? I can see either design choice as being valid, and it's not obvious to me which is 'best'. If you thought of both, what made you choose this one.
In any case, the XML doc comments should say what the behaviour is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since I use it mostly with the TakeUntil Operator, I want to produce a value. I added OnComplete, because I think it is good behavior to complete a created stream.
I also like to support both scenarios.
|
|
||
| var observableDisposable = new ObservableDisposable(); | ||
|
|
||
| var sub = observableDisposable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should test that we also get the completion notification, as well as the OnNext.
For completeness, we could also verify that we don't get an OnError.
|
@nilsauf Just in case the utility of this PR is found to be not broad enough for an addition to the API, you could rework this into a |
|
@idg10 Thank you! As a consumer of this package for some years now, I appreciate this valuable insights. This is so interesting 😊 Well, this code looks like it is consuming Rx because it always was 😉 I will refactor it with your suggestions. I mostly use it with EventPattern Observables (from external "old-fashioned" events) and in front of I hope the following snippet will clarify my use case, I think it is very clean this way. I will address your other points in their own comment threads. @danielcweber Your suggestion to my use case is also very nice. I would love to add this operator, if it is considered in favor of ObservableDisposable. |
|
If this is the usage model, then I think @danielcweber's suggestion is an improvement over the The reason I prefer it is that your usage model is an adaptation of an existing I apologise for the ramble that follows, but if we're introducing something new, I think it's really important to consider all the options. So I wanted to think about a few other ways we could approach this, so we can be confident that a) there isn't already a better way to solve this problem and b) that we've got the best solution. I briefly wondered if it should be called Two other options have occurred to me. One is that we could model this as a Subject, e.g. we could introduce a The second is that we could define I have one nagging doubt about the whole concept for this though (regardless exactly how we would present it in the API). Seeing your usage model, this reminds me of things Rx already does to ensure that we don't emit notifications after unsubscription. When So I'm getting a feeling of "don't we already have this mechanism?" I feels like it might be one of those situations where there's a slightly different way of solving the problem that avoids needing this new mechanism entirely. The thing you're looking to achieve here is a consumer-side thing: essentially you're saying "I no longer wish to hear from this source". But you're modelling it as an E.g., think about So having an observable that can be put into a state where all further new subscriptions instantly get unsubscribed makes me slightly uneasy. Now admittedly there's a precedent: this is precisely what So right now, I'm still leaning towards |
|
Yes, I also prefer the new Actually Thank you both for your ideas and insight into this library! I will close this PR and open an issue about the new |
|
FYI, this is available to test on the Rx.NET nuget feed. If you add this <?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<clear />
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" />
<add key="Rx.NET" value="https://pkgs.dev.azure.com/dotnet/Rx.NET/_packaging/RxNet/nuget/v3/index.json" />
</packageSources>
</configuration>and then add a reference to this specific version (there are versions on that feed that appear to have a later version number, but which don't have this feature): ``xml |
Hej,
this PR adds the class ObservableDisposable, which allows to observe when the disposable will be disposed.
This comes in handy when you want to end streams if the parent object is disposed.
I use it in serveral of my projects and want to share it, albeit the implementation might not be perfect. (I am also here to learn 😄)
Example: