Skip to content

Commit 9fb83b4

Browse files
buchgrcopybara-github
authored andcommitted
Refactor GrpcRemoteCache to use uploadFile and uploadBlob.
The methods have the same signature as the ones in SimpleBlobStore. This gets us one step closer to merging AbstractRemoteActionCache and SimpleBlobStoreActionCache. Closes #9039. PiperOrigin-RevId: 263333697
1 parent 2f42d17 commit 9fb83b4

8 files changed

Lines changed: 114 additions & 57 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,28 @@ abstract void upload(
133133
FileOutErr outErr)
134134
throws ExecException, IOException, InterruptedException;
135135

136+
/**
137+
* Uploads a file
138+
*
139+
* <p>Any errors are being propagated via the returned future. If the future completes without
140+
* errors the upload was successful.
141+
*
142+
* @param digest the digest of the file.
143+
* @param file the file to upload.
144+
*/
145+
protected abstract ListenableFuture<Void> uploadFile(Digest digest, Path file);
146+
147+
/**
148+
* Uploads a BLOB.
149+
*
150+
* <p>Any errors are being propagated via the returned future. If the future completes without
151+
* errors the upload was successful
152+
*
153+
* @param digest the digest of the blob.
154+
* @param data the blob to upload.
155+
*/
156+
protected abstract ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
157+
136158
/**
137159
* Downloads a blob with a content hash {@code digest} to {@code out}.
138160
*
@@ -641,7 +663,7 @@ static class UploadManifest {
641663
private final boolean allowSymlinks;
642664
private final boolean uploadSymlinks;
643665
private final Map<Digest, Path> digestToFile = new HashMap<>();
644-
private final Map<Digest, Chunker> digestToChunkers = new HashMap<>();
666+
private final Map<Digest, ByteString> digestToBlobs = new HashMap<>();
645667
private Digest stderrDigest;
646668
private Digest stdoutDigest;
647669

@@ -731,16 +753,9 @@ public void addFiles(Collection<Path> files) throws ExecException, IOException {
731753
* Adds an action and command protos to upload. They need to be uploaded as part of the action
732754
* result.
733755
*/
734-
public void addAction(DigestUtil.ActionKey actionKey, Action action, Command command)
735-
throws IOException {
736-
byte[] actionBlob = action.toByteArray();
737-
digestToChunkers.put(
738-
actionKey.getDigest(),
739-
Chunker.builder().setInput(actionBlob).setChunkSize(actionBlob.length).build());
740-
byte[] commandBlob = command.toByteArray();
741-
digestToChunkers.put(
742-
action.getCommandDigest(),
743-
Chunker.builder().setInput(commandBlob).setChunkSize(commandBlob.length).build());
756+
public void addAction(DigestUtil.ActionKey actionKey, Action action, Command command) {
757+
digestToBlobs.put(actionKey.getDigest(), action.toByteString());
758+
digestToBlobs.put(action.getCommandDigest(), command.toByteString());
744759
}
745760

746761
/** Map of digests to file paths to upload. */
@@ -751,10 +766,10 @@ public Map<Digest, Path> getDigestToFile() {
751766
/**
752767
* Map of digests to chunkers to upload. When the file is a regular, non-directory file it is
753768
* transmitted through {@link #getDigestToFile()}. When it is a directory, it is transmitted as
754-
* a {@link Tree} protobuf message through {@link #getDigestToChunkers()}.
769+
* a {@link Tree} protobuf message through {@link #getDigestToBlobs()}.
755770
*/
756-
public Map<Digest, Chunker> getDigestToChunkers() {
757-
return digestToChunkers;
771+
public Map<Digest, ByteString> getDigestToBlobs() {
772+
return digestToBlobs;
758773
}
759774

760775
@Nullable
@@ -796,9 +811,8 @@ private void addDirectory(Path dir) throws ExecException, IOException {
796811
Directory root = computeDirectory(dir, tree);
797812
tree.setRoot(root);
798813

799-
byte[] blob = tree.build().toByteArray();
800-
Digest digest = digestUtil.compute(blob);
801-
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(blob.length).build();
814+
ByteString data = tree.build().toByteString();
815+
Digest digest = digestUtil.compute(data.toByteArray());
802816

803817
if (result != null) {
804818
result
@@ -807,7 +821,7 @@ private void addDirectory(Path dir) throws ExecException, IOException {
807821
.setTreeDigest(digest);
808822
}
809823

810-
digestToChunkers.put(digest, chunker);
824+
digestToBlobs.put(digest, data);
811825
}
812826

813827
private Directory computeDirectory(Path path, Tree.Builder tree)

src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.common.base.Ascii;
3939
import com.google.common.base.Preconditions;
4040
import com.google.common.base.Throwables;
41+
import com.google.common.collect.ImmutableList;
4142
import com.google.common.collect.ImmutableSet;
4243
import com.google.common.collect.Iterables;
4344
import com.google.common.collect.Maps;
@@ -401,6 +402,22 @@ public void upload(
401402
}
402403
}
403404

405+
@Override
406+
protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
407+
return uploader.uploadBlobAsync(
408+
HashCode.fromString(digest.getHash()),
409+
Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
410+
/* forceUpload= */ true);
411+
}
412+
413+
@Override
414+
protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
415+
return uploader.uploadBlobAsync(
416+
HashCode.fromString(digest.getHash()),
417+
Chunker.builder().setInput(data.toByteArray()).build(),
418+
/* forceUpload= */ true);
419+
}
420+
404421
void upload(
405422
Path execRoot,
406423
ActionKey actionKey,
@@ -421,33 +438,29 @@ void upload(
421438
manifest.setStdoutStderr(outErr);
422439
manifest.addAction(actionKey, action, command);
423440

424-
Map<HashCode, Chunker> filesToUpload = Maps.newHashMap();
425-
426441
Map<Digest, Path> digestToFile = manifest.getDigestToFile();
427-
Map<Digest, Chunker> digestToChunkers = manifest.getDigestToChunkers();
442+
Map<Digest, ByteString> digestToBlobs = manifest.getDigestToBlobs();
428443
Collection<Digest> digests = new ArrayList<>();
429444
digests.addAll(digestToFile.keySet());
430-
digests.addAll(digestToChunkers.keySet());
445+
digests.addAll(digestToBlobs.keySet());
431446

432447
ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
448+
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
433449
for (Digest digest : digestsToUpload) {
434-
Chunker chunker;
435450
Path file = digestToFile.get(digest);
436451
if (file != null) {
437-
chunker = Chunker.builder().setInput(digest.getSizeBytes(), file).build();
452+
uploads.add(uploadFile(digest, file));
438453
} else {
439-
chunker = digestToChunkers.get(digest);
440-
if (chunker == null) {
454+
ByteString blob = digestToBlobs.get(digest);
455+
if (blob == null) {
441456
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
442457
throw new IOException(message);
443458
}
459+
uploads.add(uploadBlob(digest, blob));
444460
}
445-
filesToUpload.put(HashCode.fromString(digest.getHash()), chunker);
446461
}
447462

448-
if (!filesToUpload.isEmpty()) {
449-
uploader.uploadBlobs(filesToUpload, /*forceUpload=*/true);
450-
}
463+
waitForUploads(uploads.build());
451464

452465
if (manifest.getStderrDigest() != null) {
453466
result.setStderrDigest(manifest.getStderrDigest());
@@ -456,7 +469,26 @@ void upload(
456469
result.setStdoutDigest(manifest.getStdoutDigest());
457470
}
458471
}
459-
472+
473+
private static void waitForUploads(List<ListenableFuture<Void>> uploads)
474+
throws IOException, InterruptedException {
475+
try {
476+
for (ListenableFuture<Void> upload : uploads) {
477+
upload.get();
478+
}
479+
} catch (ExecutionException e) {
480+
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
481+
// between ByteStreamUploader as well.
482+
Throwable cause = e.getCause();
483+
Throwables.throwIfInstanceOf(cause, IOException.class);
484+
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
485+
if (cause != null) {
486+
throw new IOException(cause);
487+
}
488+
throw new IOException(e);
489+
}
490+
}
491+
460492
// Execution Cache API
461493

462494
@Override

src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import build.bazel.remote.execution.v2.Directory;
2222
import build.bazel.remote.execution.v2.DirectoryNode;
2323
import build.bazel.remote.execution.v2.FileNode;
24-
import com.google.common.base.Throwables;
2524
import com.google.common.hash.HashingOutputStream;
2625
import com.google.common.util.concurrent.FutureCallback;
2726
import com.google.common.util.concurrent.Futures;
@@ -44,7 +43,6 @@
4443
import java.io.OutputStream;
4544
import java.util.Collection;
4645
import java.util.Map;
47-
import java.util.concurrent.ExecutionException;
4846
import javax.annotation.Nullable;
4947

5048
/**
@@ -91,12 +89,12 @@ public void upload(
9189
upload(result, actionKey, action, command, execRoot, files, /* uploadAction= */ true);
9290
if (outErr.getErrorPath().exists()) {
9391
Digest stdErrDigest = digestUtil.compute(outErr.getErrorPath());
94-
uploadFile(stdErrDigest, outErr.getErrorPath());
92+
getFromFuture(uploadFile(stdErrDigest, outErr.getErrorPath()));
9593
result.setStderrDigest(stdErrDigest);
9694
}
9795
if (outErr.getOutputPath().exists()) {
9896
Digest stdoutDigest = digestUtil.compute(outErr.getOutputPath());
99-
uploadFile(stdoutDigest, outErr.getOutputPath());
97+
getFromFuture(uploadFile(stdoutDigest, outErr.getOutputPath()));
10098
result.setStdoutDigest(stdoutDigest);
10199
}
102100
blobStore.putActionResult(actionKey.getDigest().getHash(), result.build().toByteArray());
@@ -124,32 +122,22 @@ public void upload(
124122
}
125123

126124
for (Map.Entry<Digest, Path> entry : manifest.getDigestToFile().entrySet()) {
127-
uploadFile(entry.getKey(), entry.getValue());
125+
getFromFuture(uploadFile(entry.getKey(), entry.getValue()));
128126
}
129127

130-
for (Map.Entry<Digest, Chunker> entry : manifest.getDigestToChunkers().entrySet()) {
131-
uploadBlob(entry.getKey(), entry.getValue().next().getData());
128+
for (Map.Entry<Digest, ByteString> entry : manifest.getDigestToBlobs().entrySet()) {
129+
getFromFuture(uploadBlob(entry.getKey(), entry.getValue()));
132130
}
133131
}
134132

135-
public void uploadFile(Digest digest, Path file) throws IOException, InterruptedException {
136-
try {
137-
blobStore.uploadFile(digest, file).get();
138-
} catch (ExecutionException e) {
139-
Throwables.propagateIfPossible(e.getCause(), IOException.class, InterruptedException.class);
140-
throw new IOException(
141-
String.format("Uploading of file '%s' failed: %s", file.getPathString(), e.getCause()));
142-
}
133+
@Override
134+
public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
135+
return blobStore.uploadFile(digest, file);
143136
}
144137

145-
public void uploadBlob(Digest digest, ByteString data) throws IOException, InterruptedException {
146-
try {
147-
blobStore.uploadBlob(digest, data).get();
148-
} catch (ExecutionException e) {
149-
Throwables.propagateIfPossible(e.getCause(), IOException.class, InterruptedException.class);
150-
throw new IOException(
151-
String.format("Uploading of blob with digest '%s' failed: %s", digest, e.getCause()));
152-
}
138+
@Override
139+
public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
140+
return blobStore.uploadBlob(digest, data);
153141
}
154142

155143
public boolean containsKey(Digest digest) throws IOException, InterruptedException {

src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,16 @@ void upload(
11671167
throw new UnsupportedOperationException();
11681168
}
11691169

1170+
@Override
1171+
protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
1172+
throw new UnsupportedOperationException();
1173+
}
1174+
1175+
@Override
1176+
protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
1177+
throw new UnsupportedOperationException();
1178+
}
1179+
11701180
@Override
11711181
protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
11721182
SettableFuture<Void> result = SettableFuture.create();

src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,16 @@ void upload(
242242
throw new UnsupportedOperationException();
243243
}
244244

245+
@Override
246+
protected ListenableFuture<Void> uploadFile(Digest digest, Path path) {
247+
throw new UnsupportedOperationException();
248+
}
249+
250+
@Override
251+
protected ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
252+
throw new UnsupportedOperationException();
253+
}
254+
245255
@Override
246256
protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
247257
ByteString data = cacheEntries.get(digest);

src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ public void onCompleted() {
231231

232232
try {
233233
Digest d = digestUtil.compute(temp);
234-
cache.uploadFile(d, temp);
234+
getFromFuture(cache.uploadFile(d, temp));
235235
try {
236236
temp.delete();
237237
} catch (IOException e) {

src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.google.devtools.build.remote.worker;
1616

17+
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
18+
1719
import build.bazel.remote.execution.v2.BatchUpdateBlobsRequest;
1820
import build.bazel.remote.execution.v2.BatchUpdateBlobsResponse;
1921
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
@@ -66,7 +68,7 @@ public void batchUpdateBlobs(
6668
BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder();
6769
try {
6870
Digest digest = cache.getDigestUtil().compute(r.getData().toByteArray());
69-
cache.uploadBlob(digest, r.getData());
71+
getFromFuture(cache.uploadBlob(digest, r.getData()));
7072
if (!r.getDigest().equals(digest)) {
7173
String err =
7274
"Upload digest " + r.getDigest() + " did not match data digest: " + digest;

src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,12 +343,13 @@ private ActionResult execute(Digest actionDigest, Path execRoot)
343343
byte[] stdout = cmdResult.getStdout();
344344
if (stdout.length > 0) {
345345
Digest stdoutDigest = digestUtil.compute(stdout);
346-
cache.uploadBlob(stdoutDigest, ByteString.copyFrom(stdout));
346+
getFromFuture(cache.uploadBlob(stdoutDigest, ByteString.copyFrom(stdout)));
347347
result.setStdoutDigest(stdoutDigest);
348348
}
349349
byte[] stderr = cmdResult.getStderr();
350350
if (stderr.length > 0) {
351351
Digest stderrDigest = digestUtil.compute(stderr);
352+
getFromFuture(cache.uploadBlob(stderrDigest, ByteString.copyFrom(stderr)));
352353
result.setStderrDigest(stderrDigest);
353354
}
354355

0 commit comments

Comments
 (0)