-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Labels
Description
Tested using RxJava v2.2.10
Use Case:
We want to handle a known exception that can occur anywhere in a long observable stream using onErrorResumeNext.
The Exception is thrown due to async behavior: A resource is closed which triggers the a takeUntil however due to async operation (observeOn) in the stream, an operator may still access the closed resource causing the exception.
Problem
- The observable emits a value
- During the execution of an operator A (in this case
map) thetakeUntilis triggered - The subscription completes
- Operator A throws an Exception
- XXX
Depending on which operators are between the operator that throws the exception and the onErrorResumeNext RX behaves differently in step 5, not allowing us to properly handle the Error:
doOnNext,map,concatMapand probably most others:onErrorResumeNextis called and handles the errorswitchMap: the Exception is completely swallowed, neitheronErrorResumeNextis called nor anUndeliverableExceptionis thrown or printed anywhere.flatMap: AUndeliverableExceptionis thrown.
For us all cases except for the flatMap behavior is okay.
Example:
final Subject<String> takeUntil$ = BehaviorSubject.create();
Observable.just("Some value")
.map(e -> {
takeUntil$.onNext("complete");
throw new RuntimeException("Exception after unsubscribe");
})
// Add any of these to test the different cases:
//.doOnNext(e -> System.out.println("On Next"))
//.map(v -> v)
//.concatMap(o -> Observable.just(o))
//.switchMap(o -> Observable.just(o))
.flatMap(o -> Observable.just(o))
.onErrorResumeNext(e -> {
System.out.println(System.currentTimeMillis() + " onErrorResumeNext GOT ERROR: " + e);
return Observable.empty();
})
.takeUntil(takeUntil$)
.subscribe(v -> System.out.println("GOT NEXT" + v), v -> System.out.println("Subscribe GOT ERROR" + v), () -> System.out.println(System.currentTimeMillis() + " COMPLETED"));Exception Printed with flatMap:
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: Exception after unsubscribe
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onError(ObservableFlatMap.java:290)
at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:61)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10981)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10981)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10981)
at io.reactivex.internal.operators.observable.ObservableOnErrorNext.subscribeActual(ObservableOnErrorNext.java:38)
at io.reactivex.Observable.subscribe(Observable.java:10981)
at io.reactivex.internal.operators.observable.ObservableTakeUntil.subscribeActual(ObservableTakeUntil.java:41)
at io.reactivex.Observable.subscribe(Observable.java:10981)
at io.reactivex.Observable.subscribe(Observable.java:10967)
at io.reactivex.Observable.subscribe(Observable.java:10927)
at test.Main.main(Main.java:29)
Caused by: java.lang.RuntimeException: Exception after unsubscribe
at test.Main.lambda$0(Main.java:17)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
... 14 more```