1616import static com .google .common .base .Preconditions .checkArgument ;
1717import static com .google .common .base .Preconditions .checkNotNull ;
1818import static com .google .common .base .Preconditions .checkState ;
19- import static com .google .common .util .concurrent .Futures .immediateFailedFuture ;
20- import static com .google .common .util .concurrent .Futures .immediateVoidFuture ;
2119import static com .google .common .util .concurrent .MoreExecutors .directExecutor ;
2220import static com .google .devtools .build .lib .remote .util .RxFutures .toCompletable ;
2321import static com .google .devtools .build .lib .remote .util .RxFutures .toListenableFuture ;
22+ import static com .google .devtools .build .lib .remote .util .RxUtils .mergeBulkTransfer ;
2423import static com .google .devtools .build .lib .remote .util .Utils .getFromFuture ;
25- import static com .google .devtools .build .lib .remote .util .Utils .mergeBulkTransfer ;
2624
2725import com .google .auto .value .AutoValue ;
2826import com .google .common .annotations .VisibleForTesting ;
2927import com .google .common .collect .ImmutableSet ;
3028import com .google .common .collect .Sets ;
3129import com .google .common .flogger .GoogleLogger ;
32- import com .google .common .util .concurrent .Futures ;
3330import com .google .common .util .concurrent .ListenableFuture ;
3431import com .google .devtools .build .lib .actions .Action ;
3532import com .google .devtools .build .lib .actions .ActionExecutionMetadata ;
4744import com .google .devtools .build .lib .profiler .ProfilerTask ;
4845import com .google .devtools .build .lib .remote .common .CacheNotFoundException ;
4946import com .google .devtools .build .lib .remote .util .AsyncTaskCache ;
47+ import com .google .devtools .build .lib .remote .util .RxUtils ;
48+ import com .google .devtools .build .lib .remote .util .RxUtils .TransferResult ;
5049import com .google .devtools .build .lib .remote .util .TempPathGenerator ;
5150import com .google .devtools .build .lib .vfs .FileSymlinkLoopException ;
5251import com .google .devtools .build .lib .vfs .FileSystemUtils ;
5352import com .google .devtools .build .lib .vfs .OutputPermissions ;
5453import com .google .devtools .build .lib .vfs .Path ;
5554import com .google .devtools .build .lib .vfs .PathFragment ;
5655import io .reactivex .rxjava3 .core .Completable ;
56+ import io .reactivex .rxjava3 .core .Flowable ;
57+ import io .reactivex .rxjava3 .core .Single ;
5758import java .io .IOException ;
5859import java .util .ArrayList ;
5960import java .util .Arrays ;
@@ -282,10 +283,6 @@ public ListenableFuture<Void> prefetchFiles(
282283 files .add (input );
283284 }
284285
285- if (files .isEmpty ()) {
286- return immediateVoidFuture ();
287- }
288-
289286 // Collect the set of directories whose output permissions must be set at the end of this call.
290287 // This responsibility cannot lie with the downloading of an individual file, because multiple
291288 // files may be concurrently downloaded into the same directory within a single call to
@@ -294,38 +291,30 @@ public ListenableFuture<Void> prefetchFiles(
294291 // it must still synchronize on the output permissions having been set.
295292 Set <Path > dirsWithOutputPermissions = Sets .newConcurrentHashSet ();
296293
297- // Using plain futures to avoid RxJava overheads.
298- List <ListenableFuture <Void >> transfers = new ArrayList <>(files .size ());
299- try (var s = Profiler .instance ().profile ("compose prefetches" )) {
300- for (var file : files ) {
301- transfers .add (
302- prefetchFile (action , dirsWithOutputPermissions , metadataSupplier , file , priority ));
303- }
304- }
305-
306- ListenableFuture <Void > mergedTransfer ;
307- try (var s = Profiler .instance ().profile ("mergeBulkTransfer" )) {
308- mergedTransfer = mergeBulkTransfer (transfers );
309- }
310-
311- return Futures .transformAsync (
312- mergedTransfer ,
313- unused -> {
314- try {
315- // Set output permissions on tree artifact subdirectories, matching the behavior of
316- // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
317- for (Path dir : dirsWithOutputPermissions ) {
318- directoryTracker .setOutputPermissions (dir );
319- }
320- } catch (IOException e ) {
321- return immediateFailedFuture (e );
322- }
323- return immediateVoidFuture ();
324- },
325- directExecutor ());
294+ Completable prefetch =
295+ mergeBulkTransfer (
296+ Flowable .fromIterable (files )
297+ .flatMapSingle (
298+ input ->
299+ prefetchFile (
300+ action ,
301+ dirsWithOutputPermissions ,
302+ metadataSupplier ,
303+ input ,
304+ priority )))
305+ .doOnComplete (
306+ // Set output permissions on tree artifact subdirectories, matching the behavior of
307+ // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
308+ () -> {
309+ for (Path dir : dirsWithOutputPermissions ) {
310+ directoryTracker .setOutputPermissions (dir );
311+ }
312+ });
313+
314+ return toListenableFuture (prefetch );
326315 }
327316
328- private ListenableFuture < Void > prefetchFile (
317+ private Single < TransferResult > prefetchFile (
329318 ActionExecutionMetadata action ,
330319 Set <Path > dirsWithOutputPermissions ,
331320 MetadataSupplier metadataSupplier ,
@@ -334,14 +323,14 @@ private ListenableFuture<Void> prefetchFile(
334323 try {
335324 if (input instanceof VirtualActionInput ) {
336325 prefetchVirtualActionInput ((VirtualActionInput ) input );
337- return immediateVoidFuture ( );
326+ return Single . just ( TransferResult . ok () );
338327 }
339328
340329 PathFragment execPath = input .getExecPath ();
341330
342331 FileArtifactValue metadata = metadataSupplier .getMetadata (input );
343332 if (metadata == null || !canDownloadFile (execRoot .getRelative (execPath ), metadata )) {
344- return immediateVoidFuture ( );
333+ return Single . just ( TransferResult . ok () );
345334 }
346335
347336 @ Nullable Symlink symlink = maybeGetSymlink (action , input , metadata , metadataSupplier );
@@ -368,9 +357,11 @@ private ListenableFuture<Void> prefetchFile(
368357 result = result .andThen (plantSymlink (symlink ));
369358 }
370359
371- return toListenableFuture (result );
372- } catch (IOException | InterruptedException e ) {
373- return immediateFailedFuture (e );
360+ return RxUtils .toTransferResult (result );
361+ } catch (IOException e ) {
362+ return Single .just (TransferResult .error (e ));
363+ } catch (InterruptedException e ) {
364+ return Single .just (TransferResult .interrupted ());
374365 }
375366 }
376367
0 commit comments