Using Observable#mergeWith(CompletableSource): when the upstream signals an error, the CompletableSource is not disposed #6597

@chxchen

When using Observable#mergeWith(CompletableSource), if the upstream signals an error, the CompletableSource is not disposed.

Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
        // The upstream signals an error after 1 second.
        Schedulers.io().scheduleDirect(
            () -> emitter.tryOnError(new Throwable("occurs error")),
            1, TimeUnit.SECONDS);
    }
})
.mergeWith(
    // This Completable never completes; it only prints when it is disposed.
    Completable.create(emmit ->
        emmit.setCancellable(() -> System.out.println("mergeWith dispose"))
    )
    .doOnSubscribe(dis -> System.out.println("mergeWith doOnSubscribe"))
)
.subscribe(ob -> {}, System.out::println);

The result is:

mergeWith doOnSubscribe
java.lang.Throwable: occurs error

But if I add 'toObservable()' to the Completable:

Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
        // The upstream signals an error after 1 second.
        Schedulers.io().scheduleDirect(
            () -> emitter.tryOnError(new Throwable("occurs error")),
            1, TimeUnit.SECONDS);
    }
})
.mergeWith(
    Completable.create(emmit ->
        emmit.setCancellable(() -> System.out.println("mergeWith dispose"))
    )
    .doOnSubscribe(dis -> System.out.println("mergeWith doOnSubscribe"))
    // The only difference from the first snippet: merge it as an Observable.
    .toObservable()
)
.subscribe(ob -> {}, System.out::println);

The result is:

mergeWith doOnSubscribe
mergeWith dispose
java.lang.Throwable: occurs error

In this case the 'dispose' callback is invoked.
In the first case it is not, so whatever the Completable holds is never released, which can cause a memory leak.
Is that a bug?
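
To make the difference between the two outputs concrete, here is a toy model in plain Java with no RxJava dependency. It is only a sketch of the behavior I expected, not RxJava's actual implementation: `Handle`, `MergeCoordinator`, and the `disposeOthersOnError` flag are hypothetical names invented for this illustration. The coordinator keeps the handles of the merged sources and, when one source fails, either disposes the others (as the 'toObservable()' variant appears to do) or leaves them alone (as the first snippet appears to do).

```java
import java.util.ArrayList;
import java.util.List;

public class MergeDisposeSketch {

    /** Hypothetical stand-in for a disposable resource held by a source. */
    interface Handle {
        void dispose();
    }

    /** Toy coordinator that tracks the handles of all merged sources. */
    static final class MergeCoordinator {
        private final List<Handle> handles = new ArrayList<>();
        private final boolean disposeOthersOnError;

        MergeCoordinator(boolean disposeOthersOnError) {
            this.disposeOthersOnError = disposeOthersOnError;
        }

        void register(Handle handle) {
            handles.add(handle);
        }

        /** Called when one source fails; optionally disposes the other sources first. */
        void onError(Throwable error, List<String> log) {
            if (disposeOthersOnError) {
                for (Handle handle : handles) {
                    handle.dispose();
                }
            }
            handles.clear();
            log.add(error.toString());
        }
    }

    /** Replays the scenario from this issue and returns the lines that would be printed. */
    static List<String> run(boolean disposeOthersOnError) {
        List<String> log = new ArrayList<>();
        MergeCoordinator coordinator = new MergeCoordinator(disposeOthersOnError);

        // The merged source is subscribed and registers its cleanup callback.
        log.add("mergeWith doOnSubscribe");
        coordinator.register(() -> log.add("mergeWith dispose"));

        // The main source then fails.
        coordinator.onError(new Throwable("occurs error"), log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // cleanup callback is skipped
        System.out.println(run(true));  // cleanup callback runs before the error is delivered
    }
}
```

With `run(false)` the cleanup callback never runs, so anything it was meant to release stays referenced, which is the leak I am worried about in the first snippet.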
