Possible concurrency issue with .refCount()? (2.2.0) #6185

@BernhardReu

First, I want to thank you for this wonderful library which I gladly use in many professional applications!

While writing a component for one of our company applications, I wrote a multithreaded stress test, because the component itself introduces shared state. Sometimes the test failed. Of course, I first suspected my own component. After further investigation I could rule it out completely and still reproduce the problem.
I think there could be a problem with the refCount() operator.

Fewer words, more code:

The problem is reproducible with the following unit test (well, at least as reproducible as a multithreaded test can be...)

@Test
public void replayRefCountShallBeThreadSafe() {
    for (int i = 0; i < 10000; i++) {
        Observable<Object> observable = Observable.just(new Object()).replay(1).refCount();

        // Both observers subscribe asynchronously on the io() scheduler.
        TestObserver<Object> observer1 = observable
                .subscribeOn(Schedulers.io())
                .test();

        TestObserver<Object> observer2 = observable
                .subscribeOn(Schedulers.io())
                .test();

        // Each observer is expected to terminate within 5 seconds.
        assertThat(observer1.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));
        assertThat(observer2.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));
    }
}

As you can see, I repeat the logic 10000 times. This (mostly) guarantees that the test fails in my environment. One of the observers does NOT receive the completion event (onComplete), and therefore its awaitTerminalEvent call times out.
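For anyone who wants to try this kind of repeated two-thread check outside JUnit, here is a minimal sketch that uses only the JDK. It is not part of RxJava or of my test; the class `RaceHarness`, the method `countTimeouts` and its parameters are hypothetical names made up for illustration. It releases two workers at the same moment in every round and counts the rounds in which at least one of them never reported a terminal signal.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of RxJava or JUnit. */
final class RaceHarness {

    /**
     * Runs the given number of rounds. For each round, roundFactory supplies a
     * fresh action; two worker threads are released together and each calls the
     * action with a callback that it should invoke once it has terminated.
     * Returns how many rounds had at least one missing callback at the timeout.
     */
    static int countTimeouts(int iterations, long timeoutMillis,
                             Supplier<Consumer<Runnable>> roundFactory) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        int timeouts = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                Consumer<Runnable> action = roundFactory.get();
                CountDownLatch start = new CountDownLatch(1); // releases both workers
                CountDownLatch done = new CountDownLatch(2);  // one count per worker
                for (int w = 0; w < 2; w++) {
                    pool.execute(() -> {
                        try {
                            start.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        action.accept(done::countDown);
                    });
                }
                start.countDown();
                try {
                    if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                        timeouts++; // at least one worker never signalled termination
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            pool.shutdownNow();
        }
        return timeouts;
    }
}
```

To apply it to the case above, the factory would create a new replay(1).refCount() observable per round, and the action would subscribe to it and run the callback from onComplete or onError. A non-zero result would then correspond to the timeout I observe.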

On the other hand, the following unit test, where I manually .connect() the ConnectableObservable returned by .replay(1), passes:

@Test
public void replayShallBeThreadSafe() {
    for (int i = 0; i < 10000; i++) {
        ConnectableObservable<Object> observable = Observable.just(new Object()).replay(1);
        // Connect manually instead of relying on refCount().
        Disposable connection = observable.connect();

        TestObserver<Object> observer1 = observable
                .subscribeOn(Schedulers.io())
                .test();

        TestObserver<Object> observer2 = observable
                .subscribeOn(Schedulers.io())
                .test();

        assertThat(observer1.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));
        assertThat(observer2.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));
        connection.dispose();
    }
}

Please tell me if I am missing something...

Version: RxJava 2.2.0
The tests are JUnit tests with Hamcrest assertions.

Thanks!
