Closed
When using Observable#mergeWith(CompletableSource), if the upstream signals an error, the CompletableSource is not disposed.
Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
            // Signal an error from the upstream after one second.
            Schedulers.io().scheduleDirect(
                    () -> emitter.tryOnError(new Throwable("occurs error")),
                    1, TimeUnit.SECONDS);
        }
    })
    .mergeWith(
        // The cancellable should run when the merged sequence is disposed.
        Completable.create(emitter ->
                emitter.setCancellable(() -> System.out.println("mergeWith dispose"))
        )
        .doOnSubscribe(dis -> System.out.println("mergeWith doOnSubscribe"))
    )
    .subscribe(ob -> {}, System.out::println);

The result is:
mergeWith doOnSubscribe
java.lang.Throwable: occurs error
But if I add 'toObservable()' to the Completable:
Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
            // Signal an error from the upstream after one second.
            Schedulers.io().scheduleDirect(
                    () -> emitter.tryOnError(new Throwable("occurs error")),
                    1, TimeUnit.SECONDS);
        }
    })
    .mergeWith(
        Completable.create(emitter ->
                emitter.setCancellable(() -> System.out.println("mergeWith dispose"))
        )
        .doOnSubscribe(dis -> System.out.println("mergeWith doOnSubscribe"))
        // Only difference from the first snippet: merge as an Observable.
        .toObservable()
    )
    .subscribe(ob -> {}, System.out::println);

The result is:
mergeWith doOnSubscribe
mergeWith dispose
java.lang.Throwable: occurs error
In this case the cancellable is invoked and "mergeWith dispose" is printed.
In the first case the Completable is never disposed, which can cause a memory leak.
Is that a bug?
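
For reference, the behavior I would expect in the first case can be sketched without RxJava at all. This is only a minimal model under my own assumptions, not RxJava's actual implementation: the names `MergeModel`, `Resource`, `subscribeOther`, and `mainError` are hypothetical and exist only for illustration. It shows the contract I am assuming, namely that when the main source fails, the merged-in source's cancellable runs before the error reaches the downstream, matching the output of the 'toObservable()' variant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class MergeModel {

    /** Hypothetical stand-in for the disposable handle of the merged-in source. */
    static final class Resource {
        private final AtomicReference<Runnable> cancellable = new AtomicReference<>();

        void setCancellable(Runnable r) {
            cancellable.set(r);
        }

        /** Runs the cancellable at most once. */
        void dispose() {
            Runnable r = cancellable.getAndSet(null);
            if (r != null) {
                r.run();
            }
        }
    }

    // Records events in the order they happen, in place of System.out.
    final List<String> log = new ArrayList<>();
    final Resource other = new Resource();

    /** Models subscribing to the merged-in Completable. */
    void subscribeOther() {
        log.add("mergeWith doOnSubscribe");
        other.setCancellable(() -> log.add("mergeWith dispose"));
    }

    /** Models the main source failing: dispose the other side, then report the error. */
    void mainError(Throwable t) {
        other.dispose();
        log.add(t.toString());
    }

    public static void main(String[] args) {
        MergeModel m = new MergeModel();
        m.subscribeOther();
        m.mainError(new Throwable("occurs error"));
        m.log.forEach(System.out::println);
    }
}
```

In this model the log contains "mergeWith dispose" between the subscribe line and the error, which is the ordering the second snippet prints and the first one does not.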