Skip to content

Commit 12ebb84

Browse files
buchgrcopybara-github
authored andcommitted
Pull upload(ActionResult) into super class.
Remove the custom upload(ActionResult) implementations from SimpleBlobStoreActionCache and GrpcRemoteCache. This is a big step towards merging SimpleBlobStoreActionCache and GrpcRemoteCache. Closes #9167. PiperOrigin-RevId: 264563384
1 parent 6e4f9e4 commit 12ebb84

9 files changed

Lines changed: 312 additions & 317 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java

Lines changed: 114 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import build.bazel.remote.execution.v2.Tree;
3030
import com.google.common.annotations.VisibleForTesting;
3131
import com.google.common.base.Preconditions;
32+
import com.google.common.base.Throwables;
3233
import com.google.common.collect.ImmutableList;
3334
import com.google.common.collect.ImmutableMap;
3435
import com.google.common.collect.ImmutableSet;
@@ -79,6 +80,7 @@
7980
import java.util.List;
8081
import java.util.Map;
8182
import java.util.Map.Entry;
83+
import java.util.concurrent.ExecutionException;
8284
import java.util.stream.Collectors;
8385
import java.util.stream.Stream;
8486
import javax.annotation.Nullable;
@@ -120,20 +122,8 @@ public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
120122
abstract ActionResult getCachedActionResult(ActionKey actionKey)
121123
throws IOException, InterruptedException;
122124

123-
/**
124-
* Upload the result of a locally executed action to the remote cache.
125-
*
126-
* @throws IOException if there was an error uploading to the remote cache
127-
* @throws ExecException if uploading any of the action outputs is not supported
128-
*/
129-
abstract void upload(
130-
SimpleBlobStore.ActionKey actionKey,
131-
Action action,
132-
Command command,
133-
Path execRoot,
134-
Collection<Path> files,
135-
FileOutErr outErr)
136-
throws ExecException, IOException, InterruptedException;
125+
protected abstract void setCachedActionResult(ActionKey actionKey, ActionResult action)
126+
throws IOException, InterruptedException;
137127

138128
/**
139129
* Uploads a file
@@ -157,6 +147,116 @@ abstract void upload(
157147
*/
158148
protected abstract ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
159149

150+
protected abstract ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
151+
throws IOException, InterruptedException;
152+
153+
/**
154+
* Upload the result of a locally executed action to the remote cache.
155+
*
156+
* @throws IOException if there was an error uploading to the remote cache
157+
* @throws ExecException if uploading any of the action outputs is not supported
158+
*/
159+
public ActionResult upload(
160+
ActionKey actionKey,
161+
Action action,
162+
Command command,
163+
Path execRoot,
164+
Collection<Path> outputs,
165+
FileOutErr outErr,
166+
int exitCode)
167+
throws ExecException, IOException, InterruptedException {
168+
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
169+
uploadOutputs(execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
170+
resultBuilder.setExitCode(exitCode);
171+
ActionResult result = resultBuilder.build();
172+
if (exitCode == 0 && !action.getDoNotCache()) {
173+
setCachedActionResult(actionKey, result);
174+
}
175+
return result;
176+
}
177+
178+
public ActionResult upload(
179+
ActionKey actionKey,
180+
Action action,
181+
Command command,
182+
Path execRoot,
183+
Collection<Path> outputs,
184+
FileOutErr outErr)
185+
throws ExecException, IOException, InterruptedException {
186+
return upload(actionKey, action, command, execRoot, outputs, outErr, /* exitCode= */ 0);
187+
}
188+
189+
private void uploadOutputs(
190+
Path execRoot,
191+
ActionKey actionKey,
192+
Action action,
193+
Command command,
194+
Collection<Path> files,
195+
FileOutErr outErr,
196+
ActionResult.Builder result)
197+
throws ExecException, IOException, InterruptedException {
198+
UploadManifest manifest =
199+
new UploadManifest(
200+
digestUtil,
201+
result,
202+
execRoot,
203+
options.incompatibleRemoteSymlinks,
204+
options.allowSymlinkUpload);
205+
manifest.addFiles(files);
206+
manifest.setStdoutStderr(outErr);
207+
manifest.addAction(actionKey, action, command);
208+
209+
Map<Digest, Path> digestToFile = manifest.getDigestToFile();
210+
Map<Digest, ByteString> digestToBlobs = manifest.getDigestToBlobs();
211+
Collection<Digest> digests = new ArrayList<>();
212+
digests.addAll(digestToFile.keySet());
213+
digests.addAll(digestToBlobs.keySet());
214+
215+
ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
216+
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
217+
for (Digest digest : digestsToUpload) {
218+
Path file = digestToFile.get(digest);
219+
if (file != null) {
220+
uploads.add(uploadFile(digest, file));
221+
} else {
222+
ByteString blob = digestToBlobs.get(digest);
223+
if (blob == null) {
224+
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
225+
throw new IOException(message);
226+
}
227+
uploads.add(uploadBlob(digest, blob));
228+
}
229+
}
230+
231+
waitForUploads(uploads.build());
232+
233+
if (manifest.getStderrDigest() != null) {
234+
result.setStderrDigest(manifest.getStderrDigest());
235+
}
236+
if (manifest.getStdoutDigest() != null) {
237+
result.setStdoutDigest(manifest.getStdoutDigest());
238+
}
239+
}
240+
241+
private static void waitForUploads(List<ListenableFuture<Void>> uploads)
242+
throws IOException, InterruptedException {
243+
try {
244+
for (ListenableFuture<Void> upload : uploads) {
245+
upload.get();
246+
}
247+
} catch (ExecutionException e) {
248+
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
249+
// between ByteStreamUploader as well.
250+
Throwable cause = e.getCause();
251+
Throwables.throwIfInstanceOf(cause, IOException.class);
252+
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
253+
if (cause != null) {
254+
throw new IOException(cause);
255+
}
256+
throw new IOException(e);
257+
}
258+
}
259+
160260
/**
161261
* Downloads a blob with a content hash {@code digest} to {@code out}.
162262
*

src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java

Lines changed: 30 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717
import static com.google.common.base.Strings.isNullOrEmpty;
1818
import static java.lang.String.format;
1919

20-
import build.bazel.remote.execution.v2.Action;
2120
import build.bazel.remote.execution.v2.ActionCacheGrpc;
2221
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
2322
import build.bazel.remote.execution.v2.ActionResult;
24-
import build.bazel.remote.execution.v2.Command;
2523
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
2624
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub;
2725
import build.bazel.remote.execution.v2.Digest;
@@ -38,7 +36,6 @@
3836
import com.google.common.base.Ascii;
3937
import com.google.common.base.Preconditions;
4038
import com.google.common.base.Throwables;
41-
import com.google.common.collect.ImmutableList;
4239
import com.google.common.collect.ImmutableSet;
4340
import com.google.common.collect.Iterables;
4441
import com.google.common.collect.Maps;
@@ -50,15 +47,13 @@
5047
import com.google.common.util.concurrent.MoreExecutors;
5148
import com.google.common.util.concurrent.SettableFuture;
5249
import com.google.devtools.build.lib.actions.ActionInput;
53-
import com.google.devtools.build.lib.actions.ExecException;
5450
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
5551
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
5652
import com.google.devtools.build.lib.remote.common.SimpleBlobStore.ActionKey;
5753
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
5854
import com.google.devtools.build.lib.remote.options.RemoteOptions;
5955
import com.google.devtools.build.lib.remote.util.DigestUtil;
6056
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
61-
import com.google.devtools.build.lib.util.io.FileOutErr;
6257
import com.google.devtools.build.lib.vfs.Path;
6358
import com.google.protobuf.ByteString;
6459
import com.google.protobuf.Message;
@@ -70,7 +65,6 @@
7065
import java.io.IOException;
7166
import java.io.OutputStream;
7267
import java.util.ArrayList;
73-
import java.util.Collection;
7468
import java.util.List;
7569
import java.util.Map;
7670
import java.util.concurrent.ExecutionException;
@@ -166,17 +160,8 @@ public static boolean isRemoteCacheOptions(RemoteOptions options) {
166160
|| Ascii.toLowerCase(options.remoteCache).startsWith("https://"));
167161
}
168162

169-
private ListenableFuture<FindMissingBlobsResponse> getMissingDigests(
170-
FindMissingBlobsRequest request) throws IOException, InterruptedException {
171-
Context ctx = Context.current();
172-
try {
173-
return retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request)));
174-
} catch (StatusRuntimeException e) {
175-
throw new IOException(e);
176-
}
177-
}
178-
179-
private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
163+
@Override
164+
protected ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
180165
throws IOException, InterruptedException {
181166
if (Iterables.isEmpty(digests)) {
182167
return ImmutableSet.of();
@@ -208,6 +193,16 @@ private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
208193
return result.build();
209194
}
210195

196+
private ListenableFuture<FindMissingBlobsResponse> getMissingDigests(
197+
FindMissingBlobsRequest request) throws IOException, InterruptedException {
198+
Context ctx = Context.current();
199+
try {
200+
return retrier.executeAsync(() -> ctx.call(() -> casFutureStub().findMissingBlobs(request)));
201+
} catch (StatusRuntimeException e) {
202+
throw new IOException(e);
203+
}
204+
}
205+
211206
/**
212207
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
213208
* available in the remote cache, such that the tree can be reassembled and executed on another
@@ -376,32 +371,6 @@ public void onCompleted() {
376371
return future;
377372
}
378373

379-
@Override
380-
public void upload(
381-
ActionKey actionKey,
382-
Action action,
383-
Command command,
384-
Path execRoot,
385-
Collection<Path> files,
386-
FileOutErr outErr)
387-
throws ExecException, IOException, InterruptedException {
388-
ActionResult.Builder result = ActionResult.newBuilder();
389-
upload(execRoot, actionKey, action, command, files, outErr, result);
390-
try {
391-
retrier.execute(
392-
() ->
393-
acBlockingStub()
394-
.updateActionResult(
395-
UpdateActionResultRequest.newBuilder()
396-
.setInstanceName(options.remoteInstanceName)
397-
.setActionDigest(actionKey.getDigest())
398-
.setActionResult(result)
399-
.build()));
400-
} catch (StatusRuntimeException e) {
401-
throw new IOException(e);
402-
}
403-
}
404-
405374
@Override
406375
protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
407376
return uploader.uploadBlobAsync(
@@ -418,77 +387,6 @@ protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
418387
/* forceUpload= */ true);
419388
}
420389

421-
void upload(
422-
Path execRoot,
423-
ActionKey actionKey,
424-
Action action,
425-
Command command,
426-
Collection<Path> files,
427-
FileOutErr outErr,
428-
ActionResult.Builder result)
429-
throws ExecException, IOException, InterruptedException {
430-
UploadManifest manifest =
431-
new UploadManifest(
432-
digestUtil,
433-
result,
434-
execRoot,
435-
options.incompatibleRemoteSymlinks,
436-
options.allowSymlinkUpload);
437-
manifest.addFiles(files);
438-
manifest.setStdoutStderr(outErr);
439-
manifest.addAction(actionKey, action, command);
440-
441-
Map<Digest, Path> digestToFile = manifest.getDigestToFile();
442-
Map<Digest, ByteString> digestToBlobs = manifest.getDigestToBlobs();
443-
Collection<Digest> digests = new ArrayList<>();
444-
digests.addAll(digestToFile.keySet());
445-
digests.addAll(digestToBlobs.keySet());
446-
447-
ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
448-
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
449-
for (Digest digest : digestsToUpload) {
450-
Path file = digestToFile.get(digest);
451-
if (file != null) {
452-
uploads.add(uploadFile(digest, file));
453-
} else {
454-
ByteString blob = digestToBlobs.get(digest);
455-
if (blob == null) {
456-
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
457-
throw new IOException(message);
458-
}
459-
uploads.add(uploadBlob(digest, blob));
460-
}
461-
}
462-
463-
waitForUploads(uploads.build());
464-
465-
if (manifest.getStderrDigest() != null) {
466-
result.setStderrDigest(manifest.getStderrDigest());
467-
}
468-
if (manifest.getStdoutDigest() != null) {
469-
result.setStdoutDigest(manifest.getStdoutDigest());
470-
}
471-
}
472-
473-
private static void waitForUploads(List<ListenableFuture<Void>> uploads)
474-
throws IOException, InterruptedException {
475-
try {
476-
for (ListenableFuture<Void> upload : uploads) {
477-
upload.get();
478-
}
479-
} catch (ExecutionException e) {
480-
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
481-
// between ByteStreamUploader as well.
482-
Throwable cause = e.getCause();
483-
Throwables.throwIfInstanceOf(cause, IOException.class);
484-
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
485-
if (cause != null) {
486-
throw new IOException(cause);
487-
}
488-
throw new IOException(e);
489-
}
490-
}
491-
492390
// Execution Cache API
493391

494392
@Override
@@ -511,4 +409,22 @@ public ActionResult getCachedActionResult(ActionKey actionKey)
511409
throw new IOException(e);
512410
}
513411
}
412+
413+
@Override
414+
protected void setCachedActionResult(ActionKey actionKey, ActionResult result)
415+
throws IOException, InterruptedException {
416+
try {
417+
retrier.execute(
418+
() ->
419+
acBlockingStub()
420+
.updateActionResult(
421+
UpdateActionResultRequest.newBuilder()
422+
.setInstanceName(options.remoteInstanceName)
423+
.setActionDigest(actionKey.getDigest())
424+
.setActionResult(result)
425+
.build()));
426+
} catch (StatusRuntimeException e) {
427+
throw new IOException(e);
428+
}
429+
}
514430
}

0 commit comments

Comments
 (0)