Description
It looks like SinkOneMulticast keeps track of subscribers even after the corresponding subscriptions have been disposed.
Here is the repro:
// Create a SinkOne
Sinks.One<Boolean> signalSink = Sinks.one();

// First set of 3 subscriptions (A1, A2, A3)
Disposable disposableA1 = signalSink.asMono()
        .doOnSuccess(b -> {
            System.out.println("doOnSuccessA1");
        })
        .doFinally(signalType -> {
            System.out.println("doFinallyA1:" + signalType);
        })
        .subscribe();
Disposable disposableA2 = signalSink.asMono()
        .doOnSuccess(b -> {
            System.out.println("doOnSuccessA2");
        })
        .doFinally(signalType -> {
            System.out.println("doFinallyA2:" + signalType);
        })
        .subscribe();
Disposable disposableA3 = signalSink.asMono()
        .doOnSuccess(b -> {
            System.out.println("doOnSuccessA3");
        })
        .doFinally(signalType -> {
            System.out.println("doFinallyA3:" + signalType);
        })
        .subscribe();

// Composite Disposable for first set (A1, A2, A3)
Disposable.Composite disposablesA = Disposables.composite();
disposablesA.add(disposableA1);
disposablesA.add(disposableA2);
disposablesA.add(disposableA3);

// Second set of 3 subscriptions (B1, B2, B3)
Disposable disposableB1 = signalSink.asMono()
        .doOnSuccess(b -> {
            System.out.println("doOnSuccessB1");
        })
        .doFinally(signalType -> {
            System.out.println("doFinallyB1:" + signalType);
        })
        .subscribe();
Disposable disposableB2 = signalSink.asMono()
        .doOnSuccess(b -> {
            System.out.println("doOnSuccessB2");
        })
        .doFinally(signalType -> {
            System.out.println("doFinallyB2:" + signalType);
        })
        .subscribe();
Disposable disposableB3 = signalSink.asMono()
        .doOnSuccess(b -> {
            System.out.println("doOnSuccessB3");
        })
        .doFinally(signalType -> {
            System.out.println("doFinallyB3:" + signalType);
        })
        .subscribe();

// Composite Disposable for second set (B1, B2, B3)
Disposable.Composite disposablesB = Disposables.composite();
disposablesB.add(disposableB1);
disposablesB.add(disposableB2);
disposablesB.add(disposableB3);

As expected, SinkOneMulticast tracks 6 subscribers (3 from the first set and 3 from the second).
Now dispose of the first set (A1, A2, A3)
// Dispose only the first set
disposablesA.dispose();

doFinally is invoked for the first set, as expected:
doFinallyA1:cancel
doFinallyA2:cancel
doFinallyA3:cancel
However, SinkOneMulticast does not release those 3 subscribers. It keeps holding references to them until the sink emits:
signalSink.tryEmitValue(true);
Because SinkOneMulticast holds references to the disposed subscribers, they cannot be garbage collected until the sink emits. This leads to memory leaks in applications.
An application might have different components listening (subscribed) to the SinkOne. Some of those components may lose interest in the signal at a later point and dispose of their subscriptions (for example, in the component's close() method). When a large number of such components are created and closed, the subscribers retained by the SinkOne accumulate and leak memory.
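To make the expected behavior concrete, here is a minimal plain-Java sketch of a one-shot multicast signal that drops a subscriber as soon as its subscription is disposed. It is only an illustration under stated assumptions: `OneShotSignal`, `subscriberCount()`, and the `Runnable` dispose handle are hypothetical names invented for this sketch, and it is not how Reactor's SinkOneMulticast is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class DisposeReleasesSubscriber {

    /** Hypothetical one-shot multicast signal; a stand-in, not Reactor's SinkOneMulticast. */
    static final class OneShotSignal<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        /** Registers a listener; running the returned handle disposes the subscription. */
        Runnable subscribe(Consumer<T> listener) {
            subscribers.add(listener);
            // Disposal removes the listener immediately instead of waiting for emit().
            return () -> subscribers.remove(listener);
        }

        /** Delivers the value to the remaining listeners, then drops all references. */
        void emit(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
            subscribers.clear();
        }

        /** Number of listeners the signal currently holds a reference to. */
        int subscriberCount() {
            return subscribers.size();
        }
    }

    public static void main(String[] args) {
        OneShotSignal<Boolean> signal = new OneShotSignal<>();
        List<Runnable> disposablesA = new ArrayList<>();

        // Three subscribers in set A (kept for disposal) and three in set B.
        for (int i = 1; i <= 3; i++) {
            int id = i;
            disposablesA.add(signal.subscribe(b -> System.out.println("onSuccessA" + id)));
            signal.subscribe(b -> System.out.println("onSuccessB" + id));
        }
        System.out.println("subscribers before dispose: " + signal.subscriberCount());

        // Dispose only the first set; its listeners should no longer be referenced.
        disposablesA.forEach(Runnable::run);
        System.out.println("subscribers after dispose: " + signal.subscriberCount());

        // Only the B listeners are still registered when the value is emitted.
        signal.emit(true);
    }
}
```

In this sketch the count drops from 6 to 3 right after the first set is disposed, which is the behavior this issue expects from the SinkOne: a disposed subscriber should become unreachable without waiting for the emit.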


