2.x: inconsistent error handling in onErrorResumeNext after completion #6571

@davideberlein

Tested using RxJava v2.2.10

Use Case:

We want to handle a known exception that can occur anywhere in a long observable stream using onErrorResumeNext.
The exception is thrown because of asynchronous behavior: a resource is closed, which triggers a takeUntil; however, because of asynchronous operations (observeOn) in the stream, an operator may still access the closed resource, causing the exception.

Problem

  1. The observable emits a value
  2. During the execution of an operator A (in this case map) the takeUntil is triggered
  3. The subscription completes
  4. Operator A throws an Exception
  5. What happens next depends on the operators in the chain

Depending on which operators sit between the operator that throws the exception and onErrorResumeNext, RxJava behaves differently in step 5, which prevents us from handling the error reliably:

  • doOnNext, map, concatMap and probably most others: onErrorResumeNext is called and handles the error.
  • switchMap: the exception is swallowed completely; onErrorResumeNext is not called, and no UndeliverableException is thrown or printed anywhere.
  • flatMap: an UndeliverableException is thrown.

For us, every case except the flatMap behavior is acceptable.
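The three outcomes can be pictured with a small plain-Java toy model. This is only a sketch of the observable behavior described above, not RxJava's implementation: `LateErrorDemo`, `Policy` and `Downstream` are hypothetical names invented for illustration, and the model makes no claim about why each operator behaves the way it does.

```java
import java.util.ArrayList;
import java.util.List;

public class LateErrorDemo {

    /** The three outcomes reported above for an error raised after disposal. */
    enum Policy { DELIVER_TO_HANDLER, SWALLOW, GLOBAL_HANDLER }

    /** Hypothetical stand-in for the downstream chain; not an RxJava type. */
    static final class Downstream {
        private final Policy policy;
        private final List<String> log;
        private boolean disposed;

        Downstream(Policy policy, List<String> log) {
            this.policy = policy;
            this.log = log;
        }

        /** Models takeUntil firing: the flow is finished from here on. */
        void dispose() {
            disposed = true;
            log.add("completed");
        }

        /** Receives an error; what happens after disposal depends on the policy. */
        void onError(Throwable t) {
            if (!disposed) {
                log.add("handled: " + t.getMessage());
                return;
            }
            switch (policy) {
                case DELIVER_TO_HANDLER:
                    // Observed with doOnNext, map, concatMap.
                    log.add("handled: " + t.getMessage());
                    break;
                case SWALLOW:
                    // Observed with switchMap: nothing is reported anywhere.
                    break;
                case GLOBAL_HANDLER:
                    // Observed with flatMap: the error surfaces as undeliverable.
                    log.add("undeliverable: " + t.getMessage());
                    break;
            }
        }
    }

    /** Replays steps 1 to 5 of the problem description and returns the event log. */
    static List<String> run(Policy policy) {
        List<String> log = new ArrayList<>();
        Downstream downstream = new Downstream(policy, log);
        log.add("value emitted");                 // step 1
        try {
            downstream.dispose();                 // steps 2 and 3
            throw new RuntimeException("Exception after unsubscribe"); // step 4
        } catch (RuntimeException e) {
            downstream.onError(e);                // step 5
        }
        return log;
    }

    public static void main(String[] args) {
        for (Policy p : Policy.values()) {
            System.out.println(p + " -> " + run(p));
        }
    }
}
```

Running `main` prints one event log per policy, which makes it easy to see that only the last step differs between the three cases.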

Example:

final Subject<String> takeUntil$ = BehaviorSubject.create();
Observable.just("Some value")
		.map(e -> {
			takeUntil$.onNext("complete");
			throw new RuntimeException("Exception after unsubscribe");
		})
		// Add any of these to test the different cases:
		//.doOnNext(e -> System.out.println("On Next"))
		//.map(v -> v)
		//.concatMap(o -> Observable.just(o))
		//.switchMap(o -> Observable.just(o))
		.flatMap(o -> Observable.just(o))
		.onErrorResumeNext(e -> {
			System.out.println(System.currentTimeMillis() + " onErrorResumeNext GOT ERROR: " + e);
			return Observable.empty();
		})
		.takeUntil(takeUntil$)
		.subscribe(
			v -> System.out.println("GOT NEXT" + v),
			v -> System.out.println("Subscribe GOT ERROR" + v),
			() -> System.out.println(System.currentTimeMillis() + " COMPLETED"));

Exception Printed with flatMap:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: Exception after unsubscribe
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onError(ObservableFlatMap.java:290)
    at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
    at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:61)
    at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
    at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableOnErrorNext.subscribeActual(ObservableOnErrorNext.java:38)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.internal.operators.observable.ObservableTakeUntil.subscribeActual(ObservableTakeUntil.java:41)
    at io.reactivex.Observable.subscribe(Observable.java:10981)
    at io.reactivex.Observable.subscribe(Observable.java:10967)
    at io.reactivex.Observable.subscribe(Observable.java:10927)
    at test.Main.main(Main.java:29)
Caused by: java.lang.RuntimeException: Exception after unsubscribe
    at test.Main.lambda$0(Main.java:17)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
    ... 14 more
