1313// limitations under the License.
1414package com .google .devtools .build .lib .remote ;
1515
16- import static com .google .common .base .Preconditions .checkArgument ;
17- import static com .google .common .base .Preconditions .checkState ;
16+ import static com .google .common .collect .ImmutableList .toImmutableList ;
17+ import static com .google .common .util .concurrent .Futures .immediateFailedFuture ;
18+ import static com .google .common .util .concurrent .Futures .immediateFuture ;
1819import static com .google .common .util .concurrent .MoreExecutors .directExecutor ;
1920import static com .google .devtools .build .lib .remote .util .RxFutures .toCompletable ;
2021import static com .google .devtools .build .lib .remote .util .RxFutures .toSingle ;
2526import build .bazel .remote .execution .v2 .Digest ;
2627import build .bazel .remote .execution .v2 .Directory ;
2728import com .google .common .base .Throwables ;
29+ import com .google .common .collect .ImmutableList ;
2830import com .google .common .collect .ImmutableSet ;
29- import com .google .common .util .concurrent .Futures ;
3031import com .google .common .util .concurrent .ListenableFuture ;
32+ import com .google .devtools .build .lib .profiler .Profiler ;
33+ import com .google .devtools .build .lib .profiler .SilentCloseable ;
3134import com .google .devtools .build .lib .remote .common .RemoteActionExecutionContext ;
3235import com .google .devtools .build .lib .remote .common .RemoteCacheClient ;
3336import com .google .devtools .build .lib .remote .merkletree .MerkleTree ;
3639import com .google .devtools .build .lib .remote .util .DigestUtil ;
3740import com .google .devtools .build .lib .remote .util .RxUtils .TransferResult ;
3841import com .google .protobuf .Message ;
42+ import io .reactivex .rxjava3 .annotations .NonNull ;
3943import io .reactivex .rxjava3 .core .Completable ;
44+ import io .reactivex .rxjava3 .core .CompletableObserver ;
4045import io .reactivex .rxjava3 .core .Flowable ;
46+ import io .reactivex .rxjava3 .core .Maybe ;
47+ import io .reactivex .rxjava3 .core .Observable ;
4148import io .reactivex .rxjava3 .core .Single ;
49+ import io .reactivex .rxjava3 .core .SingleEmitter ;
50+ import io .reactivex .rxjava3 .disposables .Disposable ;
4251import io .reactivex .rxjava3 .subjects .AsyncSubject ;
4352import java .io .IOException ;
44- import java .util .HashSet ;
53+ import java .util .List ;
4554import java .util .Map ;
46- import java .util .Set ;
47- import java .util .concurrent .atomic .AtomicBoolean ;
48- import javax .annotation .concurrent .GuardedBy ;
55+ import java .util .concurrent .atomic .AtomicReference ;
4956
5057/** A {@link RemoteCache} with additional functionality needed for remote execution. */
5158public class RemoteExecutionCache extends RemoteCache {
@@ -85,13 +92,10 @@ public void ensureInputsPresent(
8592 return ;
8693 }
8794
88- MissingDigestFinder missingDigestFinder = new MissingDigestFinder (context , allDigests .size ());
8995 Flowable <TransferResult > uploads =
90- Flowable .fromIterable (allDigests )
91- .flatMapSingle (
92- digest ->
93- uploadBlobIfMissing (
94- context , merkleTree , additionalInputs , force , missingDigestFinder , digest ));
96+ createUploadTasks (context , merkleTree , additionalInputs , allDigests , force )
97+ .flatMap (uploadTasks -> findMissingBlobs (context , uploadTasks ))
98+ .flatMapPublisher (this ::waitForUploadTasks );
9599
96100 try {
97101 mergeBulkTransfer (uploads ).blockingAwait ();
@@ -105,36 +109,6 @@ public void ensureInputsPresent(
105109 }
106110 }
107111
108- private Single <TransferResult > uploadBlobIfMissing (
109- RemoteActionExecutionContext context ,
110- MerkleTree merkleTree ,
111- Map <Digest , Message > additionalInputs ,
112- boolean force ,
113- MissingDigestFinder missingDigestFinder ,
114- Digest digest ) {
115- Completable upload =
116- casUploadCache .execute (
117- digest ,
118- Completable .defer (
119- () ->
120- // Only reach here if the digest is missing and is not being uploaded.
121- missingDigestFinder
122- .registerAndCount (digest )
123- .flatMapCompletable (
124- missingDigests -> {
125- if (missingDigests .contains (digest )) {
126- return toCompletable (
127- () -> uploadBlob (context , digest , merkleTree , additionalInputs ),
128- directExecutor ());
129- } else {
130- return Completable .complete ();
131- }
132- })),
133- /* onIgnored= */ missingDigestFinder ::count ,
134- force );
135- return toTransferResult (upload );
136- }
137-
138112 private ListenableFuture <Void > uploadBlob (
139113 RemoteActionExecutionContext context ,
140114 Digest digest ,
@@ -158,99 +132,159 @@ private ListenableFuture<Void> uploadBlob(
158132 return cacheProtocol .uploadBlob (context , digest , message .toByteString ());
159133 }
160134
161- return Futures . immediateFailedFuture (
135+ return immediateFailedFuture (
162136 new IOException (
163137 format (
164138 "findMissingDigests returned a missing digest that has not been requested: %s" ,
165139 digest )));
166140 }
167141
168- /**
169- * A missing digest finder that initiates the request when the internal counter reaches an
170- * expected count.
171- */
172- class MissingDigestFinder {
173- private final int expectedCount ;
174-
175- private final AsyncSubject <ImmutableSet <Digest >> digestsSubject ;
176- private final Single <ImmutableSet <Digest >> resultSingle ;
142+ static class UploadTask {
143+ Digest digest ;
144+ AtomicReference <Disposable > disposable ;
145+ SingleEmitter <Boolean > continuation ;
146+ Completable completion ;
147+ }
177148
178- @ GuardedBy ("this" )
179- private final Set <Digest > digests ;
149+ private Single <List <UploadTask >> createUploadTasks (
150+ RemoteActionExecutionContext context ,
151+ MerkleTree merkleTree ,
152+ Map <Digest , Message > additionalInputs ,
153+ Iterable <Digest > allDigests ,
154+ boolean force ) {
155+ return Single .using (
156+ () -> Profiler .instance ().profile ("collect digests" ),
157+ ignored ->
158+ Flowable .fromIterable (allDigests )
159+ .flatMapMaybe (
160+ digest ->
161+ maybeCreateUploadTask (context , merkleTree , additionalInputs , digest , force ))
162+ .collect (toImmutableList ()),
163+ SilentCloseable ::close );
164+ }
180165
181- @ GuardedBy ("this" )
182- private int currentCount = 0 ;
166+ private Maybe <UploadTask > maybeCreateUploadTask (
167+ RemoteActionExecutionContext context ,
168+ MerkleTree merkleTree ,
169+ Map <Digest , Message > additionalInputs ,
170+ Digest digest ,
171+ boolean force ) {
172+ return Maybe .create (
173+ emitter -> {
174+ AsyncSubject <Void > completion = AsyncSubject .create ();
175+ UploadTask uploadTask = new UploadTask ();
176+ uploadTask .digest = digest ;
177+ uploadTask .disposable = new AtomicReference <>();
178+ uploadTask .completion =
179+ Completable .fromObservable (
180+ completion .doOnDispose (
181+ () -> {
182+ Disposable d = uploadTask .disposable .getAndSet (null );
183+ if (d != null ) {
184+ d .dispose ();
185+ }
186+ }));
187+ Completable upload =
188+ casUploadCache .execute (
189+ digest ,
190+ Single .<Boolean >create (
191+ continuation -> {
192+ uploadTask .continuation = continuation ;
193+ emitter .onSuccess (uploadTask );
194+ })
195+ .flatMapCompletable (
196+ shouldUpload -> {
197+ if (!shouldUpload ) {
198+ return Completable .complete ();
199+ }
183200
184- MissingDigestFinder (RemoteActionExecutionContext context , int expectedCount ) {
185- checkArgument (expectedCount > 0 , "expectedCount should be greater than 0" );
186- this .expectedCount = expectedCount ;
187- this .digestsSubject = AsyncSubject .create ();
188- this .digests = new HashSet <>();
201+ return toCompletable (
202+ () ->
203+ uploadBlob (
204+ context , uploadTask .digest , merkleTree , additionalInputs ),
205+ directExecutor ());
206+ }),
207+ /* onAlreadyRunning= */ () -> emitter .onSuccess (uploadTask ),
208+ /* onAlreadyFinished= */ emitter ::onComplete ,
209+ force );
210+ upload .subscribe (
211+ new CompletableObserver () {
212+ @ Override
213+ public void onSubscribe (@ NonNull Disposable d ) {
214+ uploadTask .disposable .set (d );
215+ }
189216
190- AtomicBoolean findMissingDigestsCalled = new AtomicBoolean (false );
191- this .resultSingle =
192- Single .fromObservable (
193- digestsSubject
194- .flatMapSingle (
195- digests -> {
196- boolean wasCalled = findMissingDigestsCalled .getAndSet (true );
197- // Make sure we don't have re-subscription caused by refCount() below.
198- checkState (!wasCalled , "FindMissingDigests is called more than once" );
199- return toSingle (
200- () -> findMissingDigests (context , digests ), directExecutor ());
201- })
202- // Use replay here because we could have a race condition that downstream hasn't
203- // been added to the subscription list (to receive the upstream result) while
204- // upstream is completed.
205- .replay (1 )
206- .refCount ());
207- }
217+ @ Override
218+ public void onComplete () {
219+ completion .onComplete ();
220+ }
208221
209- /**
210- * Register the {@code digest} and increase the counter.
211- *
212- * <p>Returned Single cannot be subscribed more than once.
213- *
214- * @return Single that emits the result of the {@code FindMissingDigest} request.
215- */
216- Single <ImmutableSet <Digest >> registerAndCount (Digest digest ) {
217- AtomicBoolean subscribed = new AtomicBoolean (false );
218- // count() will potentially trigger the findMissingDigests call. Adding and counting before
219- // returning the Single could introduce a race that the result of findMissingDigests is
220- // available but the consumer doesn't get it because it hasn't subscribed the returned
221- // Single. In this case, it subscribes after upstream is completed resulting a re-run of
222- // findMissingDigests (due to refCount()).
223- //
224- // Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the
225- // returned Single to avoid a re-execution of findMissingDigests.
226- return resultSingle .doOnSubscribe (
227- d -> {
228- boolean wasSubscribed = subscribed .getAndSet (true );
229- checkState (!wasSubscribed , "Single is subscribed more than once" );
230- synchronized (this ) {
231- digests .add (digest );
232- }
233- count ();
234- });
235- }
222+ @ Override
223+ public void onError (@ NonNull Throwable e ) {
224+ Disposable d = uploadTask .disposable .get ();
225+ if (d != null && d .isDisposed ()) {
226+ return ;
227+ }
236228
237- /** Increase the counter. */
238- void count () {
239- ImmutableSet <Digest > digestsResult = null ;
229+ completion .onError (e );
230+ }
231+ });
232+ });
233+ }
240234
241- synchronized (this ) {
242- if (currentCount < expectedCount ) {
243- currentCount ++;
244- if (currentCount == expectedCount ) {
245- digestsResult = ImmutableSet .copyOf (digests );
246- }
247- }
248- }
235+ private Single <List <UploadTask >> findMissingBlobs (
236+ RemoteActionExecutionContext context , List <UploadTask > uploadTasks ) {
237+ return Single .using (
238+ () -> Profiler .instance ().profile ("findMissingDigests" ),
239+ ignored ->
240+ Single .fromObservable (
241+ Observable .fromSingle (
242+ toSingle (
243+ () -> {
244+ ImmutableList <Digest > digestsToQuery =
245+ uploadTasks .stream ()
246+ .filter (uploadTask -> uploadTask .continuation != null )
247+ .map (uploadTask -> uploadTask .digest )
248+ .collect (toImmutableList ());
249+ if (digestsToQuery .isEmpty ()) {
250+ return immediateFuture (ImmutableSet .of ());
251+ }
252+ return findMissingDigests (context , digestsToQuery );
253+ },
254+ directExecutor ())
255+ .map (
256+ missingDigests -> {
257+ for (UploadTask uploadTask : uploadTasks ) {
258+ if (uploadTask .continuation != null ) {
259+ uploadTask .continuation .onSuccess (
260+ missingDigests .contains (uploadTask .digest ));
261+ }
262+ }
263+ return uploadTasks ;
264+ }))
265+ // Use AsyncSubject so that if downstream is disposed, the
266+ // findMissingDigests call is not cancelled (because it may be needed by
267+ // other
268+ // threads).
269+ .subscribeWith (AsyncSubject .create ()))
270+ .doOnDispose (
271+ () -> {
272+ for (UploadTask uploadTask : uploadTasks ) {
273+ Disposable d = uploadTask .disposable .getAndSet (null );
274+ if (d != null ) {
275+ d .dispose ();
276+ }
277+ }
278+ }),
279+ SilentCloseable ::close );
280+ }
249281
250- if (digestsResult != null ) {
251- digestsSubject .onNext (digestsResult );
252- digestsSubject .onComplete ();
253- }
254- }
282+ private Flowable <TransferResult > waitForUploadTasks (List <UploadTask > uploadTasks ) {
283+ return Flowable .using (
284+ () -> Profiler .instance ().profile ("upload" ),
285+ ignored ->
286+ Flowable .fromIterable (uploadTasks )
287+ .flatMapSingle (uploadTask -> toTransferResult (uploadTask .completion )),
288+ SilentCloseable ::close );
255289 }
256290}
0 commit comments