First, I want to thank you for this wonderful library which I gladly use in many professional applications!
While writing a component for one of our company applications, I wrote a multithreaded stress test, because the component itself introduces shared state. Sometimes the test failed. Of course, I suspected my component was the culprit. After further investigation I could rule out my own component completely and still reproduce the problem.
I think there could be a problem with the refCount() operator.
Fewer words, more code:
The problem is reproducible with the following unit test (well, at least as reproducible as a multithreaded test can be...)
    public void replayRefCountShallBeThreadSafe() {
        for (int i = 0; i < 10000; i++) {
            Observable<Object> observable = Observable.just(new Object()).replay(1).refCount();

            TestObserver<Object> observer1 = observable
                    .subscribeOn(Schedulers.io())
                    .test();
            TestObserver<Object> observer2 = observable
                    .subscribeOn(Schedulers.io())
                    .test();

            assertThat(observer1.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));
            assertThat(observer2.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));
        }
    }
As you can see, I am repeating the logic 10000 times. This (mostly) guarantees that the test fails in my environment. One of the observers does NOT receive the completion event, and therefore its awaitTerminalEvent call times out.
On the other hand, the following unit test, where I manually call .connect() on the ConnectableObservable returned from .replay(1), passes:
    public void replayShallBeThreadSafe() {
        for (int i = 0; i < 10000; i++) {
            ConnectableObservable<Object> observable = Observable.just(new Object()).replay(1);
            Disposable connection = observable.connect();

            TestObserver<Object> observer1 = observable
                    .subscribeOn(Schedulers.io())
                    .test();
            TestObserver<Object> observer2 = observable
                    .subscribeOn(Schedulers.io())
                    .test();

            assertThat(observer1.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));
            assertThat(observer2.awaitTerminalEvent(5, TimeUnit.SECONDS), is(true));

            connection.dispose();
        }
    }
Please tell me if I am missing something...
Version: RxJava 2.2.0
The tests are JUnit tests with Hamcrest assertions.
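In case it helps with reproduction outside JUnit, the repeat-and-await pattern that both tests above rely on can be sketched in plain Java. This is only an illustration under assumptions: `StressHarness`, `countTimeouts`, and the `Supplier<Consumer<Runnable>>` scenario shape are hypothetical names of mine, and the sketch does not touch RxJava at all. The supplier stands in for "create a fresh shared source per iteration", and the `Runnable` handed to each worker stands in for "a terminal event arrived".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class StressHarness {

    /**
     * Hypothetical helper: repeats a two-worker scenario and counts the
     * iterations in which at least one worker never signalled termination
     * within the timeout.
     *
     * scenarioFactory is called once per iteration and returns the shared
     * "subscribe" action; both workers run that same action concurrently
     * and must invoke the supplied Runnable when they terminate.
     */
    public static int countTimeouts(int iterations,
                                    long timeoutMillis,
                                    Supplier<Consumer<Runnable>> scenarioFactory) {
        ExecutorService pool = Executors.newCachedThreadPool();
        int timeouts = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                // Fresh shared subject for this iteration, like the fresh Observable in the tests.
                Consumer<Runnable> subscribe = scenarioFactory.get();
                CountDownLatch terminated = new CountDownLatch(2);

                // Two concurrent "subscribers" racing on the same subject.
                pool.execute(() -> subscribe.accept(terminated::countDown));
                pool.execute(() -> subscribe.accept(terminated::countDown));

                // Same idea as awaitTerminalEvent(5, TimeUnit.SECONDS) in the tests.
                if (!terminated.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    timeouts++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("stress run interrupted", e);
        } finally {
            pool.shutdownNow();
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // A well-behaved scenario: every worker signals termination immediately.
        int healthy = countTimeouts(1000, 1000, () -> onTerminate -> onTerminate.run());
        // A deliberately broken scenario: termination is never signalled.
        int broken = countTimeouts(3, 50, () -> onTerminate -> { });
        System.out.println("healthy timeouts: " + healthy + ", broken timeouts: " + broken);
    }
}
```

Counting timeouts instead of failing on the first one gives a rough failure rate per run, which may be easier to compare across machines than a single pass/fail result.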
Thanks!