Skip to content

Conversation

@nilsauf
Copy link
Contributor

@nilsauf nilsauf commented Nov 24, 2024

Hej,

this PR adds the class ObservableDisposable, which allows to observe when the disposable will be disposed.

This comes in handy when you want to end streams if the parent object is disposed.
I use it in serveral of my projects and want to share it, albeit the implementation might not be perfect. (I am also here to learn 😄)

Example:

private ObservableDisposable observableDisposable = new();

IObservbale<T1> oldObservable = ... ;
IObservbale<T2> newObservable = oldObservable
	.TakeUntil(this.observableDisposable)
	.Select(t1 => */ read data /*)
	.Publish()
	.RefCount(


public void Dispose() => this.observableDisposable.Dispose()

@nilsauf
Copy link
Contributor Author

nilsauf commented Nov 24, 2024

@dotnet-policy-service agree

Copy link
Collaborator

@idg10 idg10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this offering.

You said you were "here to learn." Brace yourself! 😊

I have one basic issue with this that I'd want to be addressed before we could merge this, which is that this currently looks more like code from an application consuming Rx than it does like Rx internals. I explain in the comments in this review why we normally implement observables in a particular way that looks different from how you'd normally do it as an Rx consumer.

I think there are also questions over exactly how this functionality should look in Rx's public API that need to be resolved. The current approach might be incompatible with addressing the preceding point.

I also have a broader question: how often have you needed this? Any time we add new surface area to Rx, that's more support overhead, more potential bugs, more surface area for security flaws. To be fair, this one doesn't look likely to be a big problem on any of those fronts, so I'm really just asking: how useful is this? I don't think it's something we've had a request for before. (That's not a "no thanks" by the way. But if you've needed this a lot, that's definitely useful to know.)

It's not obvious to me how I would use this. The cases where I typically want an IDisposable are where some method does something for the duration of some scope and then needs to clean up. These are inherently synchronous scenarios. (Even in async methods, there's still a basic "get something, use it, dispose it" sequence that is very different from the kinds of ongoing processes that Rx typically models.) So I don't yet get what this is really for.

It would be especially helpful if you could post a couple of code snippets illustrating the kinds of scenarios in which you would use this.

/// </summary>
public sealed class ObservableDisposable : ICancelable, IObservable<Unit>
{
private Subject<Unit> _disposedNotification = new();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although Subject<T> is a good way for consumers of Rx to get an IObservable<T> implementation, we tend not to use it internally for efficiency reasons.

If you look at pretty much any operator, you'll see that the IObservable<T> implementation they provide is usually based on one of the IProducer<T> implementations in Producer.cs

The reason for this is that when chaining together a sequence of operators (and most applications that use Rx rarely use just a single operator) we make a distinction between ones that are implemented by Rx, and IObservable<T> implementations where we don't know where they come from.

To ensure reliability, Rx is cautious when dealing with sources of unknown origin. It does not presume they will necessarily comply fully with the laws of Rx. We create wrappers around external observable sources to detect and handle anomalous behaviour, and we also do the same with external observers.

But with something like this:

// the details of this are nonsense - the point is we have a load of operators
// chained together
IObservable<double> r = src
    .Where(x => x.Name[0] == 'I')
    .OrderBy(x => x.Age)
    .Skip(1)
    .Select(x => x.Height)
    .Average();

the thing we don't want to do is have these safeguarding wrappers between every stage. We don't want this:

src -> guard -> Where -> guard -> OrderBy -> Skip -> guard -> Select -> guard -> Average -> guard

We want this:

src -> guard -> Where -> OrderBy -> Skip -> Select -> Average -> guard

We need those safeguards on the input side (assuming src here is something outside of Rx's control) and also on the output side (because who knows what exceptions might emerge from an application-supplied IObserver<T>? we need to ensure Rx's state remains coherent if the app does something odd). But we really don't need them between each and every operator, because we know that for every operator we've implemented, as long as its inputs are well behaved, its outputs will be well behaved. Similarly, as long as it is talking to a well-behaved IObserver<T>, the IObserver<T> that each of our operators presents to its predecessor will also be well behaved.

So the basic strategy is that Rx operators all recognize when they've been connected to another Rx operator, and they can bypass the relatively expensive safeguards that we need when interfacing with external code.

And although Subject<T> is an Rx library feature, it is designed very much to be used at the boundaries between Rx and application code, so we don't treat it as one of our recognized trusted operators.

So, taking that background into account, I think I would want to see this type implemented more like one of the other built-in Rx operators.

I say "I think" because one thing about the way we normally do things is that we mostly avoid making the implementing type of an IObservable<T> visible. Although when you use, say, Any, you're getting a type that derives from Producer<bool, ...>, that implementing type is hidden from you. We just expose an IObservable<T>. (And in fact Any doesn't always return the same type. Like a lot of our operators, we select between a number of different implementation strategies depending on inputs. And we want to retain the flexibility to change the set of strategies available, which means we can't have the concrete type be visible at the API. Also, our IProducer<T> base classes are all internal which prevents anything deriving from them from being public types.)

But the usage model here is quite simply:

new ObservableDisposable()

This is consistent with many of our other Disposables. But it is not consistent with most of our IObservable<T>s. You've created a conundrum here by coming up with a type that is both disposable and observable. We have different common practices for each category, so what is the correct choice for something that is in both categories?

I'm inclined to say this should work more like Disposable.Empty: that gives you an IDisposable whose concrete type is a private class. So with your observable disposable, we might have a factory method like Disposable.Observable(). For that to work, we'd need also to introduce an ICancelableObservable<T> interface that inherits both interfaces, and use ICancelableObservable<Unit> as the return type of that factory method.

One alternative would be just to accept that this ObservableDisposable type has suboptimal performance when used as the source for other Rx operators.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the conflict the class being an obserable AND a disposable.

  • If it will be a class or a Disposable.Observable() factory method, it will be be the first creation method of an observable outside of the Observable class (and QueryLanguage, if i see it right).
  • If it will be a factory method in the Observable class, it will be the first disposable outside of the Dispoable namespace.

Would it be an option to go both ways to create such object?

I also used Subject here to not have to worry about the list of observers. I had a go at implementing ObservableDisposable as Producer and while this is great, I now have a list of sinks to notify and worry about. But maybe there is a better solution?

Since this Observable would only send one OnNext and one OnComplete notification, I think the suboptiomal performance is acceptable.

/// <inheritdoc/>
public void Dispose()
{
if (Volatile.Read(ref _disposed) == false)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's your intention for this code to work in the face of concurrent calls to Dispose. The use of Volatile.Read does suggest that this is the goal, although maybe this is just intending to handle correctly strictly sequential calls to Dispose that happen on different threads.

If this is meant to handle concurrent calls, I think there's a bug here. If two threads call Dispose simultaneously, it's possible for each of them to complete this Volatile.Read before the other reaches the Volatile.Write. So both threads will end up calling OnNext and OnCompleted. We will have violated the IObserver<T> contract:

  • A call to OnCompleted followed by further calls (at least one more OnCompleted, and possibly a call to OnNext after a call to OnCompleted)
  • Potentiall starting a new calls into an IObserver<T> when another call was already in progress and had not returned

There are two ways to deal with this:

  1. declare that this is the caller's problem
  2. implement something more robust
  3. rely on the fact that Rx will treat the Subject<T> as a suspect external source, and it will actually block illegal call sequences

We can rule out 3. Rx helps out applications that make such mistakes, up to a point, but we certainly shouldn't be relying on these safety guards in Rx's own code. In any case, while it would detect and block the IObservable<T> calls that occur after the first OnCompleted I don't think it would help at all with concurrent calls.

Option 1 actually has its charms (and may be what you intended, in which case the XML doc comments should definitely clarify that instances of this class are not safe for concurrent use; someone looking at the source code might see Volatile and assume that it is designed for multithreaded use). It's basically our official position for any IObserver<T> we hand out: concurrent calls are not supported, and you get undefined behaviour if you make them. In that case, this method could be simplified: you'd remove the use of Volatile entirely and make no attempt to handle concurrent use.

However, although that's reasonable for IObservable<T> (because there are well defined rules restricting its use) arguably it isn't for IDisposable. It's not unreasonable to want an IDisposable that tolerates concurrent calls to Dispose.

So that would leave 2. But although this could probably be fixed by using some sort of interlocked exchange instead of simply a volatile read, but in practice, I think if this were all changed to fit the IProducer<T> pattern, and if you used one of the Sink<...> types normally used as part of such an implementation, we'd need to look at how concurrent calls to Dispose interact with the existing disposal handling in there. This already uses Interlocked.Exchange to prevent double disposal, so it's possible you could take advantage of that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Interlocked.Exchange seems to be the better solution.

Volatile.Write(ref _disposed, true);
_disposedNotification = null!;
}
GC.SuppressFinalize(this);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a sealed class without a finalizer, I don't see how instances of this type can ever end up on the finalization queue, so this won't do anything will it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I shouls discard this call then... 🤦

public sealed class ObservableDisposable : ICancelable, IObservable<Unit>
{
private Subject<Unit> _disposedNotification = new();
private bool _disposed;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, you don't really need this, because when the object is in a disposed state, the _disposedNotification field is null. Removing this field, and just relying on the null/not-null state of _disposedNotification would simplify things because it removes an intermediate state where Dispose has changed this flag but hasn't yet got around to nulling out the _disposedNotification field.

We use a similar technique in the Sink<...> base class that almost all of our operators use in their implementation: we use a single field to a) hold the downstream observer and b) know whether we've been disposed. (We use a sentinel observer rather than null to signify disposal, but the basic principle is the same.)

However, as noted elsewhere, perhaps this should be using Sink and Producer, at which point you might be able to defer entirely to its disposal handling.

.Subscribe(_ => { Assert.False(disp1); disp1 = true; });

sub.Dispose();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also dispose the observableDiposable?

You've checked that the act of unsubscribing doesn't cause our subscriber to be notified. But I think we also want to check that unsubscribing means we no longer get notified when the thing that normally causes notification happens.

{
if (Volatile.Read(ref _disposed) == false)
{
_disposedNotification.OnNext(Unit.Default);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My assumption was that this source would simply complete upon disposal. I wasn't expecting it to produce a value too.

Did you consider both options? I can see either design choice as being valid, and it's not obvious to me which is 'best'. If you thought of both, what made you choose this one.

In any case, the XML doc comments should say what the behaviour is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I use it mostly with the TakeUntil Operator, I want to produce a value. I added OnComplete, because I think it is good behavior to complete a created stream.

I also like to support both scenarios.


var observableDisposable = new ObservableDisposable();

var sub = observableDisposable
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should test that we also get the completion notification, as well as the OnNext.

For completeness, we could also verify that we don't get an OnError.

@danielcweber
Copy link
Collaborator

danielcweber commented Dec 6, 2024

@nilsauf Just in case the utility of this PR is found to be not broad enough for an addition to the API, you could rework this into a TakeUntil(CancellationToken ct)-operator that'll complete the upstream source whenever the CancellationToken signals its cancellation. In combination with the already existing CancellationDisposable, this would do what you need (I guess) and its scope might be considered a bit broader (also just a feeling).

@nilsauf
Copy link
Contributor Author

nilsauf commented Dec 8, 2024

@idg10 Thank you! As a consumer of this package for some years now, I appreciate this valuable insights. This is so interesting 😊

Well, this code looks like it is consuming Rx because it always was 😉 I will refactor it with your suggestions.

I mostly use it with EventPattern Observables (from external "old-fashioned" events) and in front of Publish().RefCount(). Since the EventPattern Observables will never send a OnComplete, I want to complete it myself.

I hope the following snippet will clarify my use case, I think it is very clean this way.

public class ExampleClass : IDisposable
{
    private readonly IObservable<T2> exampleStream;
    private readonly ObservableDisposable observableDisposable = new();

    public ExampleClass(IObservable<T1> eventStream)
    {
        exampleStream = eventStream
            .TakeUntil(this.observableDisposable)
	    .Select(t1 => */ read data /*)
	    .Publish()
	    .RefCount();
    }

    public IObservable<T2> Observe() => this.exampleStream;

    public void Dispose() => this.observableDisposable.Dispose()
}

I will address your other points in their own comment threads.

@danielcweber Your suggestion to my use case is also very nice. I would love to add this operator, if it is considered in favor of ObservableDisposable.

@idg10
Copy link
Collaborator

idg10 commented Dec 9, 2024

If this is the usage model, then I think @danielcweber's suggestion is an improvement over the ObservableDisposable concept.

The reason I prefer it is that your usage model is an adaptation of an existing IObservable<T>: you have an IObservable<T> going in and an IObservable<T> coming out. This sort of thing is normally modelled as an operator in Rx, so this looks like it might be the right way to handle this particular scenario.

I apologise for the ramble that follows, but if we're introducing something new, I think it's really important to consider all the options. So I wanted to think about a few other ways we could approach this, so we can be confident that a) there isn't already a better way to solve this problem and b) that we've got the best solution.

I briefly wondered if it should be called TakeUntilCancelled, but TakeUntil already offers a few overloads that accept different kinds of temporal condition, so adding one more seems reasonable. Currently, TakeUntil can use a DateTimeOffset, or a predicate that inspects each object emerging from the source, or some other IObservable<T>. So the idea of adding an overload that accepts a CancellationToken doesn't seem like a huge leap.

Two other options have occurred to me.

One is that we could model this as a Subject, e.g. we could introduce a CancellableSubject<T>. With the initial design I was slightly uneasy about a Disposable implementing IObservable<T>. A Subject seems like a better way to approach it, because your use case is basically forwarding values from some underlying source, but with some additional behaviour around how that is done. My main worry with this, though, is that since SubjectBase<T> already implements IDisposable, the usage model might not be obvious - people might expect to be able to cancel the subject with Dispose, but that actually causes all further calls to OnXxx to throw an ObjectDisposedException.

The second is that we could define AsObservable extension methods for CancellationToken. Just like we have AsObservable extensions for Task, we could define them for CancellationToken. In a way, this feels like a more fundamental mechanism. But I'm not sure exactly what the design would look like. To be useful for the TakeUntil scenario you need to be able to produce a single value upon cancellation (because TakeUntil ignores a source that completes without producing a value). So you might want to be able to write ct.AsObservable(Unit.Default). But perhaps it should be able to take any observable, so maybe you'd want to be able to write ct.AsObservable(Observable.Return(Unit.Default)). I don't think I like the idea of a zero-args ct.AsObservable() that just produces a single Unit, because it doesn't seem obvious to me that that's what it would do.

I have one nagging doubt about the whole concept for this though (regardless exactly how we would present it in the API).

Seeing your usage model, this reminds me of things Rx already does to ensure that we don't emit notifications after unsubscription. When Dispose is called on a subscription, we 'quench' all further notifications by putting the subscription into a state where it will not call any more methods on the subscriber's IObserver<T>. (We do this before tearing down everything inside the subscription, and it prevents spurious further notifications emerging while we shut things down.)

So I'm getting a feeling of "don't we already have this mechanism?" I feels like it might be one of those situations where there's a slightly different way of solving the problem that avoids needing this new mechanism entirely.

The thing you're looking to achieve here is a consumer-side thing: essentially you're saying "I no longer wish to hear from this source". But you're modelling it as an IObservable<T>. I tend to think about observables as being a potential stream of values, and a subscription as being one particular realisation of that potential. So it feels slightly weird to have an observable change its state just because a consumer is now done with it. That feels like it's a subscription concern, not an observable concern.

E.g., think about Observable.Timer. When you create one of those, you don't actually start a timer. You build a description of a timer, and it's only when you subscribe that an actual timer comes into existence. If you subscribe a bunch of times you get a bunch of timers. And each subscriber gets to say individually "I'm done now, thanks." If you want to share you can use Publish but the standard semantics are that each subscription has its own lifetime.

So having an observable that can be put into a state where all further new subscriptions instantly get unsubscribed makes me slightly uneasy. Now admittedly there's a precedent: this is precisely what AsyncSubject<T> does, which would be a point in favour of modelling this idea as a CancellableSubject I suppose. And actually, thinking about it, the TakeUntil overload that takes a DateTimeOffset already has this characteristic: once the specified time has passed, all subsequent subscriptions to that source will get no elements. So there is a precedent there.

So right now, I'm still leaning towards TakeUntil as the answer.

@nilsauf
Copy link
Contributor Author

nilsauf commented Dec 10, 2024

Yes, I also prefer the new TakeUntil operator now.

Actually CancellationDisposable is doing the same as my original proposal, just not as IObservable. And I think the new operator could be used in much more scenarios.

Thank you both for your ideas and insight into this library!

I will close this PR and open an issue about the new TakeUntil operator, to discuss details about it.

@idg10
Copy link
Collaborator

idg10 commented Sep 3, 2025

FYI, this is available to test on the Rx.NET nuget feed. If you add this NuGet.config to your solution folder:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <packageSources>
    <clear />
    <add key="nuget.org" value="https://api.nuget.org/v3/index.json" />
    <add key="Rx.NET" value="https://pkgs.dev.azure.com/dotnet/Rx.NET/_packaging/RxNet/nuget/v3/index.json" />
  </packageSources>
</configuration>

and then add a reference to this specific version (there are versions on that feed that appear to have a later version number, but which don't have this feature):

``xml



then the following works:

```cs
using System.Reactive.Linq;

CancellationTokenSource cs = new();

Observable
    .Interval(TimeSpan.FromSeconds(0.5))
    .TakeUntil(cs.Token)
    .Subscribe(Console.WriteLine);

Console.WriteLine("Subscribed. Press any key to unsubscribe.");

Console.ReadKey();
cs.Cancel();


Console.WriteLine("Unsubscribed. Press any key to exit.");
Console.ReadKey();

@idg10 idg10 added the [area] Rx label Sep 3, 2025
@idg10 idg10 added this to the Rx 6.1 milestone Sep 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants