Skip to content

Commit 162cacd

Browse files
coeuvrecopybara-github
authored andcommitted
Automated rollback of commit 915fb3e.
*** Reason for rollback *** Might cause build to hang forever. b/320630578 *** Original change description *** Optimize prefetchInputs. Use a pre-allocated array to hold the intermediate transfers to avoid allocations. Replace some of RxJava code with Futures to avoid RxJava overheads. This improves the perfromance of prefetchInputs on a large set of inputs from ~400ms to ~16ms. Fixes #20555. Closes #20557. PiperOrigin-RevId: 599135847 Change-Id: Idae6a1c57e634d16091e31e097b16ca97a67e62d
1 parent ee7762e commit 162cacd

File tree

3 files changed

+46
-102
lines changed

3 files changed

+46
-102
lines changed

src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,17 @@
1616
import static com.google.common.base.Preconditions.checkArgument;
1717
import static com.google.common.base.Preconditions.checkNotNull;
1818
import static com.google.common.base.Preconditions.checkState;
19-
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
20-
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
2119
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2220
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
2321
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
22+
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
2423
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
25-
import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;
2624

2725
import com.google.auto.value.AutoValue;
2826
import com.google.common.annotations.VisibleForTesting;
2927
import com.google.common.collect.ImmutableSet;
3028
import com.google.common.collect.Sets;
3129
import com.google.common.flogger.GoogleLogger;
32-
import com.google.common.util.concurrent.Futures;
3330
import com.google.common.util.concurrent.ListenableFuture;
3431
import com.google.devtools.build.lib.actions.Action;
3532
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
@@ -47,13 +44,17 @@
4744
import com.google.devtools.build.lib.profiler.ProfilerTask;
4845
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
4946
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
47+
import com.google.devtools.build.lib.remote.util.RxUtils;
48+
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
5049
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
5150
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
5251
import com.google.devtools.build.lib.vfs.FileSystemUtils;
5352
import com.google.devtools.build.lib.vfs.OutputPermissions;
5453
import com.google.devtools.build.lib.vfs.Path;
5554
import com.google.devtools.build.lib.vfs.PathFragment;
5655
import io.reactivex.rxjava3.core.Completable;
56+
import io.reactivex.rxjava3.core.Flowable;
57+
import io.reactivex.rxjava3.core.Single;
5758
import java.io.IOException;
5859
import java.util.ArrayList;
5960
import java.util.Arrays;
@@ -282,10 +283,6 @@ public ListenableFuture<Void> prefetchFiles(
282283
files.add(input);
283284
}
284285

285-
if (files.isEmpty()) {
286-
return immediateVoidFuture();
287-
}
288-
289286
// Collect the set of directories whose output permissions must be set at the end of this call.
290287
// This responsibility cannot lie with the downloading of an individual file, because multiple
291288
// files may be concurrently downloaded into the same directory within a single call to
@@ -294,38 +291,30 @@ public ListenableFuture<Void> prefetchFiles(
294291
// it must still synchronize on the output permissions having been set.
295292
Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();
296293

297-
// Using plain futures to avoid RxJava overheads.
298-
List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
299-
try (var s = Profiler.instance().profile("compose prefetches")) {
300-
for (var file : files) {
301-
transfers.add(
302-
prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority));
303-
}
304-
}
305-
306-
ListenableFuture<Void> mergedTransfer;
307-
try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
308-
mergedTransfer = mergeBulkTransfer(transfers);
309-
}
310-
311-
return Futures.transformAsync(
312-
mergedTransfer,
313-
unused -> {
314-
try {
315-
// Set output permissions on tree artifact subdirectories, matching the behavior of
316-
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
317-
for (Path dir : dirsWithOutputPermissions) {
318-
directoryTracker.setOutputPermissions(dir);
319-
}
320-
} catch (IOException e) {
321-
return immediateFailedFuture(e);
322-
}
323-
return immediateVoidFuture();
324-
},
325-
directExecutor());
294+
Completable prefetch =
295+
mergeBulkTransfer(
296+
Flowable.fromIterable(files)
297+
.flatMapSingle(
298+
input ->
299+
prefetchFile(
300+
action,
301+
dirsWithOutputPermissions,
302+
metadataSupplier,
303+
input,
304+
priority)))
305+
.doOnComplete(
306+
// Set output permissions on tree artifact subdirectories, matching the behavior of
307+
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
308+
() -> {
309+
for (Path dir : dirsWithOutputPermissions) {
310+
directoryTracker.setOutputPermissions(dir);
311+
}
312+
});
313+
314+
return toListenableFuture(prefetch);
326315
}
327316

328-
private ListenableFuture<Void> prefetchFile(
317+
private Single<TransferResult> prefetchFile(
329318
ActionExecutionMetadata action,
330319
Set<Path> dirsWithOutputPermissions,
331320
MetadataSupplier metadataSupplier,
@@ -334,14 +323,14 @@ private ListenableFuture<Void> prefetchFile(
334323
try {
335324
if (input instanceof VirtualActionInput) {
336325
prefetchVirtualActionInput((VirtualActionInput) input);
337-
return immediateVoidFuture();
326+
return Single.just(TransferResult.ok());
338327
}
339328

340329
PathFragment execPath = input.getExecPath();
341330

342331
FileArtifactValue metadata = metadataSupplier.getMetadata(input);
343332
if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
344-
return immediateVoidFuture();
333+
return Single.just(TransferResult.ok());
345334
}
346335

347336
@Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);
@@ -368,9 +357,11 @@ private ListenableFuture<Void> prefetchFile(
368357
result = result.andThen(plantSymlink(symlink));
369358
}
370359

371-
return toListenableFuture(result);
372-
} catch (IOException | InterruptedException e) {
373-
return immediateFailedFuture(e);
360+
return RxUtils.toTransferResult(result);
361+
} catch (IOException e) {
362+
return Single.just(TransferResult.error(e));
363+
} catch (InterruptedException e) {
364+
return Single.just(TransferResult.interrupted());
374365
}
375366
}
376367

src/main/java/com/google/devtools/build/lib/remote/util/Utils.java

Lines changed: 10 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,8 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote.util;
1515

16-
import static com.google.common.base.Preconditions.checkNotNull;
1716
import static com.google.common.base.Strings.isNullOrEmpty;
1817
import static com.google.common.base.Throwables.getStackTraceAsString;
19-
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
20-
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
21-
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2218
import static java.util.stream.Collectors.joining;
2319

2420
import build.bazel.remote.execution.v2.Action;
@@ -33,6 +29,7 @@
3329
import com.google.common.util.concurrent.FluentFuture;
3430
import com.google.common.util.concurrent.Futures;
3531
import com.google.common.util.concurrent.ListenableFuture;
32+
import com.google.common.util.concurrent.MoreExecutors;
3633
import com.google.devtools.build.lib.actions.ActionInput;
3734
import com.google.devtools.build.lib.actions.ExecutionRequirements;
3835
import com.google.devtools.build.lib.actions.Spawn;
@@ -420,11 +417,11 @@ public static ListenableFuture<ActionResult> downloadAsActionResult(
420417
try {
421418
return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
422419
} catch (InvalidProtocolBufferException e) {
423-
return immediateFailedFuture(e);
420+
return Futures.immediateFailedFuture(e);
424421
}
425422
},
426-
directExecutor())
427-
.catching(CacheNotFoundException.class, (e) -> null, directExecutor());
423+
MoreExecutors.directExecutor())
424+
.catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
428425
}
429426

430427
public static void verifyBlobContents(Digest expected, Digest actual) throws IOException {
@@ -486,15 +483,15 @@ public ByteString getContents() {
486483
*/
487484
public static <V> ListenableFuture<V> refreshIfUnauthenticatedAsync(
488485
AsyncCallable<V> call, CallCredentialsProvider callCredentialsProvider) {
489-
checkNotNull(call);
490-
checkNotNull(callCredentialsProvider);
486+
Preconditions.checkNotNull(call);
487+
Preconditions.checkNotNull(callCredentialsProvider);
491488

492489
try {
493490
return Futures.catchingAsync(
494491
call.call(),
495492
Throwable.class,
496493
(e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider),
497-
directExecutor());
494+
MoreExecutors.directExecutor());
498495
} catch (Throwable t) {
499496
return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider);
500497
}
@@ -514,15 +511,15 @@ private static <V> ListenableFuture<V> refreshIfUnauthenticatedAsyncOnException(
514511
}
515512
}
516513

517-
return immediateFailedFuture(t);
514+
return Futures.immediateFailedFuture(t);
518515
}
519516

520517
/** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */
521518
public static <V> V refreshIfUnauthenticated(
522519
Callable<V> call, CallCredentialsProvider callCredentialsProvider)
523520
throws IOException, InterruptedException {
524-
checkNotNull(call);
525-
checkNotNull(callCredentialsProvider);
521+
Preconditions.checkNotNull(call);
522+
Preconditions.checkNotNull(callCredentialsProvider);
526523

527524
try {
528525
return call.call();
@@ -621,49 +618,4 @@ public static void waitForBulkTransfer(
621618
throw bulkTransferException;
622619
}
623620
}
624-
625-
public static ListenableFuture<Void> mergeBulkTransfer(
626-
Iterable<ListenableFuture<Void>> transfers) {
627-
return Futures.whenAllComplete(transfers)
628-
.callAsync(
629-
() -> {
630-
BulkTransferException bulkTransferException = null;
631-
632-
for (var transfer : transfers) {
633-
IOException error = null;
634-
try {
635-
transfer.get();
636-
} catch (CancellationException e) {
637-
return immediateFailedFuture(new InterruptedException());
638-
} catch (InterruptedException e) {
639-
return immediateFailedFuture(e);
640-
} catch (ExecutionException e) {
641-
var cause = e.getCause();
642-
if (cause instanceof InterruptedException) {
643-
return immediateFailedFuture(cause);
644-
} else if (cause instanceof IOException) {
645-
error = (IOException) cause;
646-
} else {
647-
error = new IOException(cause);
648-
}
649-
}
650-
651-
if (error == null) {
652-
continue;
653-
}
654-
655-
if (bulkTransferException == null) {
656-
bulkTransferException = new BulkTransferException();
657-
}
658-
bulkTransferException.add(error);
659-
}
660-
661-
if (bulkTransferException != null) {
662-
return immediateFailedFuture(bulkTransferException);
663-
}
664-
665-
return immediateVoidFuture();
666-
},
667-
directExecutor());
668-
}
669621
}

src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import java.util.HashMap;
7272
import java.util.Map;
7373
import java.util.concurrent.Callable;
74+
import java.util.concurrent.CancellationException;
7475
import java.util.concurrent.ExecutionException;
7576
import java.util.concurrent.Future;
7677
import java.util.concurrent.LinkedBlockingQueue;
@@ -759,7 +760,7 @@ public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throw
759760
prefetcher.prefetchFiles(
760761
action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM);
761762

762-
assertThrows(InterruptedException.class, () -> getFromFuture(future));
763+
assertThrows(CancellationException.class, future::get);
763764
}
764765

765766
@Test

0 commit comments

Comments
 (0)