Inner errors within concatMapSingle vanish if disposed #6587

@Erlkoenig90

Not sure if this is intentional or a bug. The following code:

package Test;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.Single;

public class Test {
	public static void main(String[] args) {
//		Observable.just(0,1)		// UndeliverableException - First concatMapSingle-block throws, interrupting second one
		Observable.just(2, 0)		// Just "MyError" output, exception in first block appears to vanish
				.concatMapSingle(workItem2 -> {
					return Single.just(workItem2).subscribeOn(Schedulers.computation()).map(workItem -> {
						try {
							Thread.sleep(1000);
							if (workItem == 1)
								throw new Exception("Something in first block failed");
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							System.out.println("InterruptedException in block 1");
							throw e;
						}
						return workItem;
					});
				}).concatMapSingle(workItem2 -> {
					return Single.just(workItem2).subscribeOn(Schedulers.computation()).map(workItem -> {
						try {
							Thread.sleep(1000);
							if (workItem == 2) {
								throw new Exception("Something in second block failed");
							}
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							System.out.println("InterruptedException in block 2");
							throw e;
						}
						return workItem;
					});
				}).blockingSubscribe(item -> System.out.println("Item finished " + item), err -> {
					System.out.println("MyError: " + err.toString());
					err.printStackTrace();
				});
	}
}

has two processing stages. If the first stage throws an exception, the second one is interrupted and its InterruptedException is reported as an UndeliverableException. In the opposite case, when the second stage throws, the first one is interrupted, but its InterruptedException is silently dropped: it is neither delivered to any error handler nor reported as an UndeliverableException.

This is caused by a race condition within ConcatMapSingleMainObserver. When the second block throws, dispose is called on the ConcatMapSingleMainObserver instance belonging to the first block before innerError is. Because errors is still empty at that point, addThrowable succeeds, but the drain loop never runs, because dispose has already set the AtomicInteger to 1. As a result, the exception stored in errors is never retrieved and forwarded.
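
To make the ordering problem easier to follow, here is a minimal single-threaded model of it. This is a sketch of the pattern as I understand it, not the RxJava source: the class DrainRaceSketch and its fields are made up for illustration, and a plain AtomicReference stands in for the errors container.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of a work-in-progress (wip) guarded drain loop.
// Hypothetical names throughout; this is NOT the RxJava implementation.
public class DrainRaceSketch {
    final AtomicInteger wip = new AtomicInteger();   // 0 means no drain is running
    final AtomicReference<Throwable> errors = new AtomicReference<>();
    volatile boolean cancelled;
    Throwable delivered;                             // what downstream would receive

    void dispose() {
        cancelled = true;
        wip.getAndIncrement(); // takes the drain "slot" and never gives it back
    }

    void innerError(Throwable t) {
        // Succeeds because no error has been stored yet.
        if (errors.compareAndSet(null, t)) {
            drain();
        }
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // wip is already 1 after dispose, so the loop body never runs
        }
        delivered = errors.getAndSet(null);
        wip.decrementAndGet();
    }

    public static void main(String[] args) {
        DrainRaceSketch observer = new DrainRaceSketch();
        observer.dispose();                                  // arrives first
        observer.innerError(new InterruptedException("sleep interrupted"));
        System.out.println("delivered = " + observer.delivered);
        System.out.println("stuck in errors = " + observer.errors.get());
    }
}
```

With dispose running first, the error is stored but nothing ever reads it back out, which matches the "vanishing" behaviour described above.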

This can be fixed (if it needs fixing at all) by having innerError, dispose and the drain loop check for cancelled and forward exceptions to RxJavaPlugins.onError where appropriate. I will make a PR...
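
Roughly, the idea could look like the following sketch. It reuses the same simplified model rather than the real ConcatMapSingleMainObserver; DrainFixSketch, report and the reported list are hypothetical, with the list standing in for a global hook such as RxJavaPlugins.onError. It only covers the single-threaded ordering shown here and does not claim to close every concurrent interleaving.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the proposed checks on a simplified model; NOT the actual patch.
public class DrainFixSketch {
    final AtomicInteger wip = new AtomicInteger();
    final AtomicReference<Throwable> errors = new AtomicReference<>();
    // Hypothetical stand-in for a global error hook like RxJavaPlugins.onError.
    final List<Throwable> reported = new ArrayList<>();
    volatile boolean cancelled;
    Throwable delivered;

    void report(Throwable t) {
        reported.add(t);
    }

    void dispose() {
        cancelled = true;
        if (wip.getAndIncrement() == 0) {
            // No drain is running: flush any error that was already stored.
            Throwable t = errors.getAndSet(null);
            if (t != null) {
                report(t);
            }
        }
    }

    void innerError(Throwable t) {
        // After cancellation nobody will drain, so report directly.
        if (cancelled || !errors.compareAndSet(null, t)) {
            report(t);
            return;
        }
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return;
        }
        Throwable t = errors.getAndSet(null);
        if (t != null) {
            if (cancelled) {
                report(t);      // downstream is gone, use the global hook
            } else {
                delivered = t;  // normal path: forward downstream
            }
        }
        wip.decrementAndGet();
    }

    public static void main(String[] args) {
        DrainFixSketch observer = new DrainFixSketch();
        observer.dispose();
        observer.innerError(new InterruptedException("sleep interrupted"));
        System.out.println("reported = " + observer.reported.size()
                + ", delivered = " + observer.delivered);
    }
}
```

In this model the late InterruptedException ends up in reported instead of disappearing, while an error that arrives before dispose is still delivered downstream as before.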
