Skip to content

Commit 36611c3

Browse files
buchgrcopybara-github
authored andcommitted
Refactor SimpleBlobStore#put
Split it into SimpleBlobStore#uploadFile and SimpleBlobStore#uploadBlob. The return type changes to ListenableFuture<Void> but all implementations are still blocking for now. This is a refactoring towards the larger goal of removing SimpleBlobStoreActionCache and making GrpcRemoteCache a SimpleBlobStore implementation. Closes bazelbuild#9028. PiperOrigin-RevId: 262929928
1 parent 137f969 commit 36611c3

26 files changed

Lines changed: 341 additions & 197 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java

Lines changed: 28 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import build.bazel.remote.execution.v2.Directory;
2222
import build.bazel.remote.execution.v2.DirectoryNode;
2323
import build.bazel.remote.execution.v2.FileNode;
24+
import com.google.common.base.Throwables;
2425
import com.google.common.hash.HashingOutputStream;
2526
import com.google.common.util.concurrent.FutureCallback;
2627
import com.google.common.util.concurrent.Futures;
@@ -38,14 +39,12 @@
3839
import com.google.devtools.build.lib.vfs.Path;
3940
import com.google.protobuf.ByteString;
4041
import com.google.protobuf.InvalidProtocolBufferException;
41-
import java.io.ByteArrayInputStream;
4242
import java.io.ByteArrayOutputStream;
4343
import java.io.IOException;
44-
import java.io.InputStream;
4544
import java.io.OutputStream;
4645
import java.util.Collection;
4746
import java.util.Map;
48-
import java.util.concurrent.ConcurrentHashMap;
47+
import java.util.concurrent.ExecutionException;
4948
import javax.annotation.Nullable;
5049

5150
/**
@@ -57,17 +56,12 @@
5756
*/
5857
@ThreadSafe
5958
public final class SimpleBlobStoreActionCache extends AbstractRemoteActionCache {
60-
private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024;
61-
6259
private final SimpleBlobStore blobStore;
6360

64-
private final ConcurrentHashMap<String, Boolean> storedBlobs;
65-
6661
public SimpleBlobStoreActionCache(
6762
RemoteOptions options, SimpleBlobStore blobStore, DigestUtil digestUtil) {
6863
super(options, digestUtil);
6964
this.blobStore = blobStore;
70-
this.storedBlobs = new ConcurrentHashMap<>();
7165
}
7266

7367
public void downloadTree(Digest rootDigest, Path rootLocation)
@@ -84,13 +78,6 @@ public void downloadTree(Digest rootDigest, Path rootLocation)
8478
}
8579
}
8680

87-
private Digest uploadFileContents(Path file) throws IOException, InterruptedException {
88-
Digest digest = digestUtil.compute(file);
89-
try (InputStream in = file.getInputStream()) {
90-
return uploadStream(digest, in);
91-
}
92-
}
93-
9481
@Override
9582
public void upload(
9683
DigestUtil.ActionKey actionKey,
@@ -103,12 +90,14 @@ public void upload(
10390
ActionResult.Builder result = ActionResult.newBuilder();
10491
upload(result, actionKey, action, command, execRoot, files, /* uploadAction= */ true);
10592
if (outErr.getErrorPath().exists()) {
106-
Digest stderr = uploadFileContents(outErr.getErrorPath());
107-
result.setStderrDigest(stderr);
93+
Digest stdErrDigest = digestUtil.compute(outErr.getErrorPath());
94+
uploadFile(stdErrDigest, outErr.getErrorPath());
95+
result.setStderrDigest(stdErrDigest);
10896
}
10997
if (outErr.getOutputPath().exists()) {
110-
Digest stdout = uploadFileContents(outErr.getOutputPath());
111-
result.setStdoutDigest(stdout);
98+
Digest stdoutDigest = digestUtil.compute(outErr.getOutputPath());
99+
uploadFile(stdoutDigest, outErr.getOutputPath());
100+
result.setStdoutDigest(stdoutDigest);
112101
}
113102
blobStore.putActionResult(actionKey.getDigest().getHash(), result.build().toByteArray());
114103
}
@@ -135,54 +124,32 @@ public void upload(
135124
}
136125

137126
for (Map.Entry<Digest, Path> entry : manifest.getDigestToFile().entrySet()) {
138-
try (InputStream in = entry.getValue().getInputStream()) {
139-
uploadStream(entry.getKey(), in);
140-
}
127+
uploadFile(entry.getKey(), entry.getValue());
141128
}
142129

143130
for (Map.Entry<Digest, Chunker> entry : manifest.getDigestToChunkers().entrySet()) {
144-
uploadBlob(entry.getValue().next().getData().toByteArray(), entry.getKey());
145-
}
146-
}
147-
148-
public void uploadOutErr(ActionResult.Builder result, byte[] stdout, byte[] stderr)
149-
throws IOException, InterruptedException {
150-
if (stdout.length <= MAX_BLOB_SIZE_FOR_INLINE) {
151-
result.setStdoutRaw(ByteString.copyFrom(stdout));
152-
} else if (stdout.length > 0) {
153-
result.setStdoutDigest(uploadBlob(stdout));
154-
}
155-
if (stderr.length <= MAX_BLOB_SIZE_FOR_INLINE) {
156-
result.setStderrRaw(ByteString.copyFrom(stderr));
157-
} else if (stderr.length > 0) {
158-
result.setStderrDigest(uploadBlob(stderr));
131+
uploadBlob(entry.getKey(), entry.getValue().next().getData());
159132
}
160133
}
161134

162-
public Digest uploadBlob(byte[] blob) throws IOException, InterruptedException {
163-
return uploadBlob(blob, digestUtil.compute(blob));
164-
}
165-
166-
private Digest uploadBlob(byte[] blob, Digest digest) throws IOException, InterruptedException {
167-
try (InputStream in = new ByteArrayInputStream(blob)) {
168-
return uploadStream(digest, in);
135+
public void uploadFile(Digest digest, Path file) throws IOException, InterruptedException {
136+
try {
137+
blobStore.uploadFile(digest, file).get();
138+
} catch (ExecutionException e) {
139+
Throwables.propagateIfPossible(e.getCause(), IOException.class, InterruptedException.class);
140+
throw new IOException(
141+
String.format("Uploading of file '%s' failed: %s", file.getPathString(), e.getCause()));
169142
}
170143
}
171144

172-
public Digest uploadStream(Digest digest, InputStream in)
173-
throws IOException, InterruptedException {
174-
final String hash = digest.getHash();
175-
176-
if (storedBlobs.putIfAbsent(hash, true) == null) {
177-
try {
178-
blobStore.put(hash, digest.getSizeBytes(), in);
179-
} catch (Exception e) {
180-
storedBlobs.remove(hash);
181-
throw e;
182-
}
145+
public void uploadBlob(Digest digest, ByteString data) throws IOException, InterruptedException {
146+
try {
147+
blobStore.uploadBlob(digest, data).get();
148+
} catch (ExecutionException e) {
149+
Throwables.propagateIfPossible(e.getCause(), IOException.class, InterruptedException.class);
150+
throw new IOException(
151+
String.format("Uploading of blob with digest '%s' failed: %s", digest, e.getCause()));
183152
}
184-
185-
return digest;
186153
}
187154

188155
public boolean containsKey(Digest digest) throws IOException, InterruptedException {
@@ -257,4 +224,8 @@ public void onFailure(Throwable throwable) {
257224
MoreExecutors.directExecutor());
258225
return outerF;
259226
}
227+
228+
public DigestUtil getDigestUtil() {
229+
return digestUtil;
230+
}
260231
}

src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ java_library(
1818
"//src/main/java/com/google/devtools/build/lib/vfs",
1919
"//src/main/java/com/google/devtools/common/options",
2020
"//third_party:guava",
21+
"//third_party/protobuf:protobuf_java",
22+
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
2123
],
2224
)

src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote.blobstore;
1515

16+
import build.bazel.remote.execution.v2.Digest;
1617
import com.google.common.base.Preconditions;
1718
import com.google.common.io.ByteStreams;
19+
import com.google.common.util.concurrent.Futures;
1820
import com.google.common.util.concurrent.ListenableFuture;
1921
import com.google.common.util.concurrent.SettableFuture;
2022
import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
23+
import com.google.devtools.build.lib.vfs.Path;
24+
import com.google.protobuf.ByteString;
2125
import java.io.IOException;
2226
import java.io.InputStream;
2327
import java.io.OutputStream;
@@ -64,13 +68,6 @@ public ListenableFuture<Boolean> getActionResult(String key, OutputStream out) {
6468
return get(ACTION_KEY_PREFIX + key, out);
6569
}
6670

67-
@Override
68-
public void put(String key, long length, InputStream in) throws IOException {
69-
byte[] value = ByteStreams.toByteArray(in);
70-
Preconditions.checkState(value.length == length);
71-
map.put(key, value);
72-
}
73-
7471
@Override
7572
public void putActionResult(String key, byte[] in) {
7673
map.put(ACTION_KEY_PREFIX + key, in);
@@ -79,4 +76,29 @@ public void putActionResult(String key, byte[] in) {
7976
@Override
8077
public void close() {}
8178

79+
@Override
80+
public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
81+
try (InputStream in = file.getInputStream()) {
82+
upload(digest.getHash(), digest.getSizeBytes(), in);
83+
} catch (IOException e) {
84+
return Futures.immediateFailedFuture(e);
85+
}
86+
return Futures.immediateFuture(null);
87+
}
88+
89+
@Override
90+
public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
91+
try (InputStream in = data.newInput()) {
92+
upload(digest.getHash(), digest.getSizeBytes(), in);
93+
} catch (IOException e) {
94+
return Futures.immediateFailedFuture(e);
95+
}
96+
return Futures.immediateFuture(null);
97+
}
98+
99+
private void upload(String key, long length, InputStream in) throws IOException {
100+
byte[] value = ByteStreams.toByteArray(in);
101+
Preconditions.checkState(value.length == length);
102+
map.put(key, value);
103+
}
82104
}

src/main/java/com/google/devtools/build/lib/remote/common/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ java_library(
1313
srcs = glob(["*.java"]),
1414
tags = ["bazel"],
1515
deps = [
16+
"//src/main/java/com/google/devtools/build/lib/vfs",
1617
"//third_party:guava",
18+
"//third_party/protobuf:protobuf_java",
19+
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
1720
],
1821
)

src/main/java/com/google/devtools/build/lib/remote/common/SimpleBlobStore.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414

1515
package com.google.devtools.build.lib.remote.common;
1616

17+
import build.bazel.remote.execution.v2.Digest;
1718
import com.google.common.util.concurrent.ListenableFuture;
19+
import com.google.devtools.build.lib.vfs.Path;
20+
import com.google.protobuf.ByteString;
1821
import java.io.IOException;
19-
import java.io.InputStream;
2022
import java.io.OutputStream;
2123

2224
/**
@@ -50,16 +52,25 @@ public interface SimpleBlobStore {
5052
*/
5153
ListenableFuture<Boolean> getActionResult(String actionKey, OutputStream out);
5254

53-
/**
54-
* Uploads a BLOB (as {@code in}) with length {@code length} indexed by {@code key} to the CAS.
55-
*
56-
* <p>The caller is responsible to close {@code in}.
57-
*/
58-
void put(String key, long length, InputStream in) throws IOException, InterruptedException;
59-
6055
/** Uploads a bytearray BLOB (as {@code in}) indexed by {@code key} to the Action Cache. */
6156
void putActionResult(String actionKey, byte[] in) throws IOException, InterruptedException;
6257

6358
/** Close resources associated with the blob store. */
6459
void close();
60+
61+
/**
62+
* Uploads a file.
63+
*
64+
* @param digest the digest of the file.
65+
* @param file the file to upload.
66+
*/
67+
ListenableFuture<Void> uploadFile(Digest digest, Path file);
68+
69+
/**
70+
* Uploads a BLOB.
71+
*
72+
* @param digest the digest of the blob.
73+
* @param data the blob to upload.
74+
*/
75+
ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
6576
}

src/main/java/com/google/devtools/build/lib/remote/disk/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ java_library(
1818
"//src/main/java/com/google/devtools/build/lib/vfs",
1919
"//src/main/java/com/google/devtools/common/options",
2020
"//third_party:guava",
21+
"//third_party/protobuf:protobuf_java",
22+
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
2123
],
2224
)

src/main/java/com/google/devtools/build/lib/remote/disk/CombinedDiskHttpBlobStore.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote.disk;
1515

16+
import build.bazel.remote.execution.v2.Digest;
1617
import com.google.common.base.Preconditions;
1718
import com.google.common.util.concurrent.Futures;
1819
import com.google.common.util.concurrent.ListenableFuture;
1920
import com.google.common.util.concurrent.MoreExecutors;
2021
import com.google.devtools.build.lib.remote.common.SimpleBlobStore;
2122
import com.google.devtools.build.lib.vfs.Path;
23+
import com.google.protobuf.ByteString;
2224
import java.io.IOException;
23-
import java.io.InputStream;
2425
import java.io.OutputStream;
2526
import java.util.UUID;
27+
import java.util.concurrent.ExecutionException;
2628
import java.util.logging.Level;
2729
import java.util.logging.Logger;
2830

@@ -52,15 +54,6 @@ public boolean containsActionResult(String key) {
5254
return diskCache.containsActionResult(key);
5355
}
5456

55-
@Override
56-
public void put(String key, long length, InputStream in)
57-
throws IOException, InterruptedException {
58-
diskCache.put(key, length, in);
59-
try (InputStream inFile = diskCache.toPath(key, /* actionResult= */ false).getInputStream()) {
60-
remoteCache.put(key, length, inFile);
61-
}
62-
}
63-
6457
@Override
6558
public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
6659
diskCache.putActionResult(key, in);
@@ -73,6 +66,32 @@ public void close() {
7366
remoteCache.close();
7467
}
7568

69+
@Override
70+
public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
71+
try {
72+
diskCache.uploadFile(digest, file).get();
73+
remoteCache.uploadFile(digest, file).get();
74+
} catch (ExecutionException e) {
75+
return Futures.immediateFailedFuture(e.getCause());
76+
} catch (InterruptedException e) {
77+
return Futures.immediateFailedFuture(e);
78+
}
79+
return Futures.immediateFuture(null);
80+
}
81+
82+
@Override
83+
public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
84+
try {
85+
diskCache.uploadBlob(digest, data).get();
86+
remoteCache.uploadBlob(digest, data).get();
87+
} catch (ExecutionException e) {
88+
return Futures.immediateFailedFuture(e.getCause());
89+
} catch (InterruptedException e) {
90+
return Futures.immediateFailedFuture(e);
91+
}
92+
return Futures.immediateFuture(null);
93+
}
94+
7695
@Override
7796
public ListenableFuture<Boolean> get(String key, OutputStream out) {
7897
return get(key, out, /* actionResult= */ false);

0 commit comments

Comments
 (0)