1515
1616import static com .google .common .base .Preconditions .checkState ;
1717
18+ import com .google .common .collect .ImmutableList ;
1819import com .google .common .collect .ImmutableSet ;
1920import io .reactivex .rxjava3 .annotations .NonNull ;
2021import io .reactivex .rxjava3 .core .Completable ;
2122import io .reactivex .rxjava3 .core .Single ;
2223import io .reactivex .rxjava3 .core .SingleObserver ;
2324import io .reactivex .rxjava3 .disposables .Disposable ;
24- import io . reactivex . rxjava3 . subjects . AsyncSubject ;
25+ import java . util . ArrayList ;
2526import java .util .HashMap ;
27+ import java .util .List ;
2628import java .util .Map ;
2729import java .util .Optional ;
28- import java .util .concurrent .CancellationException ;
2930import java .util .concurrent .atomic .AtomicBoolean ;
30- import java .util .concurrent .atomic .AtomicInteger ;
31- import java .util .concurrent .atomic .AtomicReference ;
3231import javax .annotation .concurrent .GuardedBy ;
3332import javax .annotation .concurrent .ThreadSafe ;
3433
@@ -55,7 +54,7 @@ public final class AsyncTaskCache<KeyT, ValueT> {
5554 private final Map <KeyT , ValueT > finished ;
5655
5756 @ GuardedBy ("lock" )
58- private final Map <KeyT , Execution < ValueT > > inProgress ;
57+ private final Map <KeyT , Execution > inProgress ;
5958
6059 public static <KeyT , ValueT > AsyncTaskCache <KeyT , ValueT > create () {
6160 return new AsyncTaskCache <>();
@@ -91,79 +90,136 @@ public Single<ValueT> executeIfNot(KeyT key, Single<ValueT> task) {
9190 return execute (key , task , false );
9291 }
9392
94- private static class Execution <ValueT > {
95- private final AtomicBoolean isTaskDisposed = new AtomicBoolean (false );
96- private final Single <ValueT > task ;
97- private final AsyncSubject <ValueT > asyncSubject = AsyncSubject .create ();
98- private final AtomicInteger referenceCount = new AtomicInteger (0 );
99- private final AtomicReference <Disposable > taskDisposable = new AtomicReference <>(null );
93+ /** Returns count of subscribers for a task. */
94+ public int getSubscriberCount (KeyT key ) {
95+ synchronized (lock ) {
96+ Execution task = inProgress .get (key );
97+ if (task != null ) {
98+ return task .getSubscriberCount ();
99+ }
100+ }
101+
102+ return 0 ;
103+ }
104+
105+ class Execution extends Single <ValueT > implements SingleObserver <ValueT > {
106+ private final KeyT key ;
107+ private final Single <ValueT > upstream ;
108+
109+ @ GuardedBy ("lock" )
110+ private boolean terminated = false ;
111+
112+ @ GuardedBy ("lock" )
113+ private Disposable upstreamDisposable ;
100114
101- Execution (Single <ValueT > task ) {
102- this .task = task ;
115+ @ GuardedBy ("lock" )
116+ private final List <SingleObserver <? super ValueT >> observers = new ArrayList <>();
117+
118+ Execution (KeyT key , Single <ValueT > upstream ) {
119+ this .key = key ;
120+ this .upstream = upstream ;
103121 }
104122
105- Single <ValueT > executeIfNot () {
106- checkState (!isTaskDisposed (), "disposed" );
107-
108- int subscribed = referenceCount .getAndIncrement ();
109- if (taskDisposable .get () == null && subscribed == 0 ) {
110- task .subscribe (
111- new SingleObserver <ValueT >() {
112- @ Override
113- public void onSubscribe (@ NonNull Disposable d ) {
114- taskDisposable .compareAndSet (null , d );
115- }
116-
117- @ Override
118- public void onSuccess (@ NonNull ValueT value ) {
119- asyncSubject .onNext (value );
120- asyncSubject .onComplete ();
121- }
122-
123- @ Override
124- public void onError (@ NonNull Throwable e ) {
125- asyncSubject .onError (e );
126- }
127- });
123+ int getSubscriberCount () {
124+ synchronized (lock ) {
125+ return observers .size ();
128126 }
127+ }
128+
129+ @ Override
130+ protected void subscribeActual (@ NonNull SingleObserver <? super ValueT > observer ) {
131+ synchronized (lock ) {
132+ checkState (!terminated , "terminated" );
133+
134+ boolean shouldSubscribe = observers .isEmpty ();
135+
136+ observers .add (observer );
137+
138+ observer .onSubscribe (new ExecutionDisposable (this , observer ));
129139
130- return Single .fromObservable (asyncSubject );
140+ if (shouldSubscribe ) {
141+ upstream .subscribe (this );
142+ }
143+ }
131144 }
132145
133- boolean isTaskTerminated () {
134- return asyncSubject .hasComplete () || asyncSubject .hasThrowable ();
146+ @ Override
147+ public void onSubscribe (@ NonNull Disposable d ) {
148+ synchronized (lock ) {
149+ upstreamDisposable = d ;
150+
151+ if (terminated ) {
152+ d .dispose ();
153+ }
154+ }
135155 }
136156
137- boolean isTaskDisposed () {
138- return isTaskDisposed .get ();
157+ @ Override
158+ public void onSuccess (@ NonNull ValueT value ) {
159+ synchronized (lock ) {
160+ if (!terminated ) {
161+ inProgress .remove (key );
162+ finished .put (key , value );
163+ terminated = true ;
164+
165+ for (SingleObserver <? super ValueT > observer : ImmutableList .copyOf (observers )) {
166+ observer .onSuccess (value );
167+ }
168+ }
169+ }
139170 }
140171
141- void tryDisposeTask () {
142- checkState (!isTaskDisposed (), "disposed" );
143- checkState (!isTaskTerminated (), "terminated" );
172+ @ Override
173+ public void onError (@ NonNull Throwable error ) {
174+ synchronized (lock ) {
175+ if (!terminated ) {
176+ inProgress .remove (key );
177+ terminated = true ;
144178
145- if (referenceCount .decrementAndGet () == 0 ) {
146- isTaskDisposed .set (true );
147- asyncSubject .onError (new CancellationException ("disposed" ));
179+ for (SingleObserver <? super ValueT > observer : ImmutableList .copyOf (observers )) {
180+ observer .onError (error );
181+ }
182+ }
183+ }
184+ }
148185
149- Disposable d = taskDisposable .get ();
150- if (d != null ) {
151- d .dispose ();
186+ void remove (SingleObserver <? super ValueT > observer ) {
187+ synchronized (lock ) {
188+ observers .remove (observer );
189+
190+ if (observers .isEmpty () && !terminated ) {
191+ inProgress .remove (key );
192+ terminated = true ;
193+
194+ if (upstreamDisposable != null ) {
195+ upstreamDisposable .dispose ();
196+ }
152197 }
153198 }
154199 }
155200 }
156201
157- /** Returns count of subscribers for a task. */
158- public int getSubscriberCount (KeyT key ) {
159- synchronized (lock ) {
160- Execution <ValueT > execution = inProgress .get (key );
161- if (execution != null ) {
162- return execution .referenceCount .get ();
202+ class ExecutionDisposable implements Disposable {
203+ final Execution execution ;
204+ final SingleObserver <? super ValueT > observer ;
205+ AtomicBoolean isDisposed = new AtomicBoolean (false );
206+
207+ ExecutionDisposable (Execution execution , SingleObserver <? super ValueT > observer ) {
208+ this .execution = execution ;
209+ this .observer = observer ;
210+ }
211+
212+ @ Override
213+ public void dispose () {
214+ if (isDisposed .compareAndSet (false , true )) {
215+ execution .remove (observer );
163216 }
164217 }
165218
166- return 0 ;
219+ @ Override
220+ public boolean isDisposed () {
221+ return isDisposed .get ();
222+ }
167223 }
168224
169225 /**
@@ -185,62 +241,34 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
185241
186242 finished .remove (key );
187243
188- Execution <ValueT > execution =
189- inProgress .computeIfAbsent (
190- key ,
191- ignoredKey -> {
192- AtomicInteger subscribeTimes = new AtomicInteger (0 );
193- return new Execution <>(
194- Single .defer (
195- () -> {
196- int times = subscribeTimes .incrementAndGet ();
197- checkState (times == 1 , "Subscribed more than once to the task" );
198- return task ;
199- }));
200- });
201-
202- execution
203- .executeIfNot ()
204- .subscribe (
205- new SingleObserver <ValueT >() {
206- @ Override
207- public void onSubscribe (@ NonNull Disposable d ) {
208- emitter .setCancellable (
209- () -> {
210- d .dispose ();
211-
212- if (!execution .isTaskTerminated ()) {
213- synchronized (lock ) {
214- execution .tryDisposeTask ();
215- if (execution .isTaskDisposed ()) {
216- inProgress .remove (key );
217- }
218- }
219- }
220- });
221- }
222-
223- @ Override
224- public void onSuccess (@ NonNull ValueT value ) {
225- synchronized (lock ) {
226- finished .put (key , value );
227- inProgress .remove (key );
228- }
229-
230- emitter .onSuccess (value );
231- }
232-
233- @ Override
234- public void onError (@ NonNull Throwable e ) {
235- synchronized (lock ) {
236- inProgress .remove (key );
237- }
238-
239- if (!emitter .isDisposed ()) {
240- emitter .onError (e );
241- }
242- }
243- });
244+ Execution execution =
245+ inProgress .computeIfAbsent (key , ignoredKey -> new Execution (key , task ));
246+
247+ // We must subscribe the execution within the scope of lock to avoid race condition
248+ // that:
249+ // 1. Two callers get the same execution instance
250+ // 2. One decides to dispose the execution, since no more observers, the execution
251+ // will change to the terminate state
252+ // 3. Another one try to subscribe, will get "terminated" error.
253+ execution .subscribe (
254+ new SingleObserver <ValueT >() {
255+ @ Override
256+ public void onSubscribe (@ NonNull Disposable d ) {
257+ emitter .setDisposable (d );
258+ }
259+
260+ @ Override
261+ public void onSuccess (@ NonNull ValueT valueT ) {
262+ emitter .onSuccess (valueT );
263+ }
264+
265+ @ Override
266+ public void onError (@ NonNull Throwable e ) {
267+ if (!emitter .isDisposed ()) {
268+ emitter .onError (e );
269+ }
270+ }
271+ });
244272 }
245273 });
246274 }
0 commit comments