Description
Not sure if this is intentional or a bug. The following code has two processing stages:

```java
package Test;

import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

public class Test {
    public static void main(String[] args) {
        // Observable.just(0, 1) // UndeliverableException - First concatMapSingle-block throws, interrupting second one
        Observable.just(2, 0) // Just "MyError" output, exception in first block appears to vanish
            // Block 1: fails for work item 1
            .concatMapSingle(workItem2 -> {
                return Single.just(workItem2).subscribeOn(Schedulers.computation()).map(workItem -> {
                    try {
                        Thread.sleep(1000);
                        if (workItem == 1)
                            throw new Exception("Something in first block failed");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException in block 1");
                        throw e;
                    }
                    return workItem;
                });
            })
            // Block 2: fails for work item 2
            .concatMapSingle(workItem2 -> {
                return Single.just(workItem2).subscribeOn(Schedulers.computation()).map(workItem -> {
                    try {
                        Thread.sleep(1000);
                        if (workItem == 2) {
                            throw new Exception("Something in second block failed");
                        }
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException in block 2");
                        throw e;
                    }
                    return workItem;
                });
            })
            .blockingSubscribe(item -> System.out.println("Item finished " + item), err -> {
                System.out.println("MyError: " + err.toString());
                err.printStackTrace();
            });
    }
}
```

If the first block throws an exception, the second one gets interrupted and throws an `UndeliverableException`. In the opposite case, if the second block throws an exception, the first one gets interrupted, but its `InterruptedException` is completely ignored: it is neither delivered to any error handler nor thrown as an `UndeliverableException`.
This is caused by a race condition within `ConcatMapSingleMainObserver`. When the second block throws, `dispose` is called before `innerError` on the `ConcatMapSingleMainObserver` instance belonging to the first block. Because `errors` is empty at this point, `addThrowable` succeeds, but the drain loop never runs, because `dispose` has already set the AtomicInteger to 1. Therefore, the exception stored in `errors` is never retrieved and forwarded.
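The ordering problem can be reproduced in isolation with a small model. The sketch below is not the RxJava source: `ModelObserver`, its fields, and the `delivered` list are hypothetical stand-ins that use only the JDK, and they mimic just the parts described above (a counter that guards the drain loop, an `errors` slot, and a `dispose` that bumps the counter).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class LostErrorModel {

    /** Hypothetical, simplified stand-in for the observer; the counter guards the drain loop. */
    static final class ModelObserver extends AtomicInteger {
        final AtomicReference<Throwable> errors = new AtomicReference<>();
        final List<String> delivered = new ArrayList<>();
        volatile boolean cancelled;

        void dispose() {
            cancelled = true;
            // Bumps the counter from 0 to 1 and never decrements it again.
            getAndIncrement();
        }

        void innerError(Throwable t) {
            // Models addThrowable: succeeds because the slot is still empty.
            if (errors.compareAndSet(null, t)) {
                drain();
            }
        }

        void drain() {
            if (getAndIncrement() != 0) {
                return; // counter already non-zero: the loop body is skipped
            }
            int missed = 1;
            for (;;) {
                Throwable t = errors.getAndSet(null);
                if (t != null) {
                    delivered.add("downstream: " + t.getMessage());
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        ModelObserver normal = new ModelObserver();
        normal.innerError(new InterruptedException("interrupted"));
        System.out.println("without dispose: " + normal.delivered);

        ModelObserver raced = new ModelObserver();
        raced.dispose(); // dispose wins the race
        raced.innerError(new InterruptedException("interrupted"));
        System.out.println("dispose first:   " + raced.delivered
                + ", still stored: " + raced.errors.get());
    }
}
```

In the second case the exception stays parked in `errors` and nothing ever reads it, which matches the silent loss observed in the repro.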
This can be fixed (if it needs fixing at all) by having `innerError`, `dispose`, and the drain loop check for `cancelled` and forward exceptions to `RxJavaPlugins.onError` where appropriate. I will make a PR...
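One possible shape of such a fix, again as a JDK-only model and not the actual patch: `FixedObserver`, `forwardPending`, and the `undeliverable` list are hypothetical names, with `undeliverable` standing in for whatever `RxJavaPlugins.onError` would receive. The idea is that every path that finds the observer cancelled drains `errors` atomically and hands the result to the global handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class FixedErrorModel {

    /** Hypothetical model of an observer that never drops a stored error. */
    static final class FixedObserver extends AtomicInteger {
        final AtomicReference<Throwable> errors = new AtomicReference<>();
        final List<String> downstream = new ArrayList<>();
        // Assumption: stands in for the global error hook (RxJavaPlugins.onError).
        final List<String> undeliverable = new ArrayList<>();
        volatile boolean cancelled;

        void dispose() {
            cancelled = true;
            if (getAndIncrement() == 0) {
                forwardPending(); // nobody is draining, so clean up here
            }
        }

        void innerError(Throwable t) {
            if (errors.compareAndSet(null, t)) {
                drain();
            } else {
                undeliverable.add(t.getMessage()); // slot taken: report globally
            }
        }

        void drain() {
            if (getAndIncrement() != 0) {
                if (cancelled) {
                    forwardPending(); // the loop will not run for us
                }
                return;
            }
            int missed = 1;
            for (;;) {
                if (cancelled) {
                    forwardPending();
                } else {
                    Throwable t = errors.getAndSet(null);
                    if (t != null) {
                        downstream.add(t.getMessage());
                    }
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        /** getAndSet(null) ensures each stored error is forwarded at most once. */
        private void forwardPending() {
            Throwable t = errors.getAndSet(null);
            if (t != null) {
                undeliverable.add(t.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        FixedObserver raced = new FixedObserver();
        raced.dispose();
        raced.innerError(new InterruptedException("interrupted"));
        System.out.println("downstream: " + raced.downstream
                + ", undeliverable: " + raced.undeliverable);
    }
}
```

With `dispose` called first, the error now ends up in `undeliverable` instead of disappearing. Whether a late `InterruptedException` after cancellation should be reported at all is the open question raised above; this sketch only shows that reporting it consistently is feasible.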