Skip to content

Commit 37ee252

Browse files
Googlercopybara-github
authored andcommitted
Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 4)
Change RemoteCacheClient#upload{File,Blob} to use RemoteActionExecutionContext. PiperOrigin-RevId: 354472775
1 parent db69c9f commit 37ee252

25 files changed

Lines changed: 375 additions & 273 deletions

src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.google.devtools.build.lib.remote;
1515

1616
import build.bazel.remote.execution.v2.Digest;
17+
import build.bazel.remote.execution.v2.RequestMetadata;
1718
import com.google.common.base.Preconditions;
1819
import com.google.common.base.Strings;
1920
import com.google.common.collect.ImmutableList;
@@ -29,7 +30,11 @@
2930
import com.google.devtools.build.lib.buildeventstream.PathConverter;
3031
import com.google.devtools.build.lib.collect.ImmutableIterable;
3132
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
33+
import com.google.devtools.build.lib.remote.common.NetworkTime;
34+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
35+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
3236
import com.google.devtools.build.lib.remote.util.DigestUtil;
37+
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
3338
import com.google.devtools.build.lib.vfs.Path;
3439
import io.grpc.Context;
3540
import io.netty.util.AbstractReferenceCounted;
@@ -50,7 +55,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
5055
implements BuildEventArtifactUploader {
5156

5257
private final ListeningExecutorService uploadExecutor;
53-
private final Context ctx;
58+
private final String buildRequestId;
59+
private final String commandId;
5460
private final ByteStreamUploader uploader;
5561
private final String remoteServerInstanceName;
5662
private final MissingDigestsFinder missingDigestsFinder;
@@ -61,15 +67,17 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
6167
ByteStreamUploader uploader,
6268
MissingDigestsFinder missingDigestsFinder,
6369
String remoteServerName,
64-
Context ctx,
70+
String buildRequestId,
71+
String commandId,
6572
@Nullable String remoteInstanceName,
6673
int maxUploadThreads) {
6774
this.uploader = Preconditions.checkNotNull(uploader);
6875
String remoteServerInstanceName = Preconditions.checkNotNull(remoteServerName);
6976
if (!Strings.isNullOrEmpty(remoteInstanceName)) {
7077
remoteServerInstanceName += "/" + remoteInstanceName;
7178
}
72-
this.ctx = ctx;
79+
this.buildRequestId = buildRequestId;
80+
this.commandId = commandId;
7381
this.remoteServerInstanceName = remoteServerInstanceName;
7482
// Limit the maximum threads number to 1000 (chosen arbitrarily)
7583
this.uploadExecutor =
@@ -153,6 +161,8 @@ private static List<PathMetadata> processQueryResult(
153161
*/
154162
private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
155163
ImmutableList<ListenableFuture<PathMetadata>> allPaths) throws Exception {
164+
Context ctx = TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, "bes-upload");
165+
156166
List<PathMetadata> knownRemotePaths = new ArrayList<>(allPaths.size());
157167
List<PathMetadata> filesToQuery = new ArrayList<>();
158168
Set<Digest> digestsToQuery = new HashSet<>();
@@ -185,19 +195,20 @@ private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
185195
*/
186196
private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
187197
ImmutableIterable<PathMetadata> allPaths) {
198+
RequestMetadata metadata =
199+
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload");
200+
RemoteActionExecutionContext context =
201+
new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
202+
188203
ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathsUploaded =
189204
ImmutableList.builder();
190205
for (PathMetadata path : allPaths) {
191206
if (!path.isRemote() && !path.isDirectory()) {
192207
Chunker chunker =
193208
Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build();
194209
final ListenableFuture<Void> upload;
195-
Context prevCtx = ctx.attach();
196-
try {
197-
upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false);
198-
} finally {
199-
ctx.detach(prevCtx);
200-
}
210+
upload =
211+
uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false);
201212
allPathsUploaded.add(Futures.transform(upload, unused -> path, uploadExecutor));
202213
} else {
203214
allPathsUploaded.add(Futures.immediateFuture(path));

src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.google.devtools.build.lib.remote.options.RemoteOptions;
1919
import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory;
2020
import com.google.devtools.build.lib.runtime.CommandEnvironment;
21-
import io.grpc.Context;
2221
import javax.annotation.Nullable;
2322

2423
/**
@@ -29,20 +28,23 @@ class ByteStreamBuildEventArtifactUploaderFactory implements
2928

3029
private final ByteStreamUploader uploader;
3130
private final String remoteServerName;
32-
private final Context ctx;
31+
private final String buildRequestId;
32+
private final String commandId;
3333
private final MissingDigestsFinder missingDigestsFinder;
3434
@Nullable private final String remoteInstanceName;
3535

3636
ByteStreamBuildEventArtifactUploaderFactory(
3737
ByteStreamUploader uploader,
3838
MissingDigestsFinder missingDigestsFinder,
3939
String remoteServerName,
40-
Context ctx,
40+
String buildRequestId,
41+
String commandId,
4142
@Nullable String remoteInstanceName) {
4243
this.uploader = uploader;
4344
this.missingDigestsFinder = missingDigestsFinder;
4445
this.remoteServerName = remoteServerName;
45-
this.ctx = ctx;
46+
this.buildRequestId = buildRequestId;
47+
this.commandId = commandId;
4648
this.remoteInstanceName = remoteInstanceName;
4749
}
4850

@@ -52,7 +54,8 @@ public BuildEventArtifactUploader create(CommandEnvironment env) {
5254
uploader.retain(),
5355
missingDigestsFinder,
5456
remoteServerName,
55-
ctx,
57+
buildRequestId,
58+
commandId,
5659
remoteInstanceName,
5760
env.getOptions().getOptions(RemoteOptions.class).buildEventUploadMaxThreads);
5861
}

src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@
3636
import com.google.common.util.concurrent.SettableFuture;
3737
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
3838
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
39+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
3940
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
4041
import com.google.devtools.build.lib.remote.util.Utils;
4142
import io.grpc.CallOptions;
4243
import io.grpc.Channel;
4344
import io.grpc.ClientCall;
44-
import io.grpc.Context;
4545
import io.grpc.Metadata;
4646
import io.grpc.Status;
4747
import io.grpc.Status.Code;
@@ -134,9 +134,10 @@ public ByteStreamUploader(
134134
* uploaded, if {@code true} the blob is uploaded.
135135
* @throws IOException when reading of the {@link Chunker}s input source fails
136136
*/
137-
public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
137+
public void uploadBlob(
138+
RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload)
138139
throws IOException, InterruptedException {
139-
uploadBlobs(singletonMap(hash, chunker), forceUpload);
140+
uploadBlobs(context, singletonMap(hash, chunker), forceUpload);
140141
}
141142

142143
/**
@@ -156,12 +157,14 @@ public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
156157
* uploaded, if {@code true} the blob is uploaded.
157158
* @throws IOException when reading of the {@link Chunker}s input source or uploading fails
158159
*/
159-
public void uploadBlobs(Map<HashCode, Chunker> chunkers, boolean forceUpload)
160+
public void uploadBlobs(
161+
RemoteActionExecutionContext context, Map<HashCode, Chunker> chunkers, boolean forceUpload)
160162
throws IOException, InterruptedException {
161163
List<ListenableFuture<Void>> uploads = new ArrayList<>();
162164

163165
for (Map.Entry<HashCode, Chunker> chunkerEntry : chunkers.entrySet()) {
164-
uploads.add(uploadBlobAsync(chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
166+
uploads.add(
167+
uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
165168
}
166169

167170
try {
@@ -200,14 +203,17 @@ void shutdown() {
200203
}
201204
}
202205

203-
/** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */
206+
/**
207+
* @deprecated Use {@link #uploadBlobAsync(RemoteActionExecutionContext, Digest, Chunker,
208+
* boolean)} instead.
209+
*/
204210
@Deprecated
205211
@VisibleForTesting
206212
public ListenableFuture<Void> uploadBlobAsync(
207-
HashCode hash, Chunker chunker, boolean forceUpload) {
213+
RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload) {
208214
Digest digest =
209215
Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build();
210-
return uploadBlobAsync(digest, chunker, forceUpload);
216+
return uploadBlobAsync(context, digest, chunker, forceUpload);
211217
}
212218

213219
/**
@@ -227,7 +233,7 @@ public ListenableFuture<Void> uploadBlobAsync(
227233
* @throws IOException when reading of the {@link Chunker}s input source fails
228234
*/
229235
public ListenableFuture<Void> uploadBlobAsync(
230-
Digest digest, Chunker chunker, boolean forceUpload) {
236+
RemoteActionExecutionContext context, Digest digest, Chunker chunker, boolean forceUpload) {
231237
synchronized (lock) {
232238
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
233239

@@ -242,7 +248,7 @@ public ListenableFuture<Void> uploadBlobAsync(
242248

243249
ListenableFuture<Void> uploadResult =
244250
Futures.transform(
245-
startAsyncUpload(digest, chunker),
251+
startAsyncUpload(context, digest, chunker),
246252
(v) -> {
247253
synchronized (lock) {
248254
uploadedBlobs.add(HashCode.fromString(digest.getHash()));
@@ -294,7 +300,8 @@ private static String buildUploadResourceName(String instanceName, UUID uuid, Di
294300
}
295301

296302
/** Starts a file upload and returns a future representing the upload. */
297-
private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker) {
303+
private ListenableFuture<Void> startAsyncUpload(
304+
RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
298305
try {
299306
chunker.reset();
300307
} catch (IOException e) {
@@ -313,7 +320,13 @@ private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker)
313320
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
314321
AsyncUpload newUpload =
315322
new AsyncUpload(
316-
channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker);
323+
context,
324+
channel,
325+
callCredentialsProvider,
326+
callTimeoutSecs,
327+
retrier,
328+
resourceName,
329+
chunker);
317330
ListenableFuture<Void> currUpload = newUpload.start();
318331
currUpload.addListener(
319332
() -> {
@@ -348,6 +361,7 @@ public ReferenceCounted touch(Object o) {
348361

349362
private static class AsyncUpload {
350363

364+
private final RemoteActionExecutionContext context;
351365
private final Channel channel;
352366
private final CallCredentialsProvider callCredentialsProvider;
353367
private final long callTimeoutSecs;
@@ -358,12 +372,14 @@ private static class AsyncUpload {
358372
private ClientCall<WriteRequest, WriteResponse> call;
359373

360374
AsyncUpload(
375+
RemoteActionExecutionContext context,
361376
Channel channel,
362377
CallCredentialsProvider callCredentialsProvider,
363378
long callTimeoutSecs,
364379
Retrier retrier,
365380
String resourceName,
366381
Chunker chunker) {
382+
this.context = context;
367383
this.channel = channel;
368384
this.callCredentialsProvider = callCredentialsProvider;
369385
this.callTimeoutSecs = callTimeoutSecs;
@@ -373,7 +389,6 @@ private static class AsyncUpload {
373389
}
374390

375391
ListenableFuture<Void> start() {
376-
Context ctx = Context.current();
377392
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
378393
AtomicLong committedOffset = new AtomicLong(0);
379394

@@ -383,8 +398,7 @@ ListenableFuture<Void> start() {
383398
retrier.executeAsync(
384399
() -> {
385400
if (committedOffset.get() < chunker.getSize()) {
386-
return ctx.call(
387-
() -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
401+
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
388402
}
389403
return Futures.immediateFuture(null);
390404
},
@@ -409,7 +423,8 @@ ListenableFuture<Void> start() {
409423

410424
private ByteStreamFutureStub bsFutureStub() {
411425
return ByteStreamGrpc.newFutureStub(channel)
412-
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
426+
.withInterceptors(
427+
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
413428
.withCallCredentials(callCredentialsProvider.getCallCredentials())
414429
.withDeadlineAfter(callTimeoutSecs, SECONDS);
415430
}
@@ -420,7 +435,7 @@ private ListenableFuture<Void> callAndQueryOnFailure(
420435
call(committedOffset),
421436
Exception.class,
422437
(e) -> guardQueryWithSuppression(e, committedOffset, progressiveBackoff),
423-
Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
438+
MoreExecutors.directExecutor());
424439
}
425440

426441
private ListenableFuture<Void> guardQueryWithSuppression(
@@ -584,7 +599,9 @@ public void onReady() {
584599
}
585600
}
586601
};
587-
call.start(callListener, TracingMetadataUtils.headersFromCurrentContext());
602+
call.start(
603+
callListener,
604+
TracingMetadataUtils.headersFromRequestMetadata(context.getRequestMetadata()));
588605
call.request(1);
589606
return uploadResult;
590607
}

src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -393,16 +393,22 @@ public void onCompleted() {
393393
}
394394

395395
@Override
396-
public ListenableFuture<Void> uploadFile(Digest digest, Path path) {
396+
public ListenableFuture<Void> uploadFile(
397+
RemoteActionExecutionContext context, Digest digest, Path path) {
397398
return uploader.uploadBlobAsync(
399+
context,
398400
digest,
399401
Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
400402
/* forceUpload= */ true);
401403
}
402404

403405
@Override
404-
public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
406+
public ListenableFuture<Void> uploadBlob(
407+
RemoteActionExecutionContext context, Digest digest, ByteString data) {
405408
return uploader.uploadBlobAsync(
406-
digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true);
409+
context,
410+
digest,
411+
Chunker.builder().setInput(data.toByteArray()).build(),
412+
/* forceUpload= */ true);
407413
}
408414
}

src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public ActionResult upload(
139139
int exitCode)
140140
throws ExecException, IOException, InterruptedException {
141141
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
142-
uploadOutputs(execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
142+
uploadOutputs(context, execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
143143
resultBuilder.setExitCode(exitCode);
144144
ActionResult result = resultBuilder.build();
145145
if (exitCode == 0 && !action.getDoNotCache()) {
@@ -162,6 +162,7 @@ public ActionResult upload(
162162
}
163163

164164
private void uploadOutputs(
165+
RemoteActionExecutionContext context,
165166
Path execRoot,
166167
ActionKey actionKey,
167168
Action action,
@@ -192,14 +193,14 @@ private void uploadOutputs(
192193
for (Digest digest : digestsToUpload) {
193194
Path file = digestToFile.get(digest);
194195
if (file != null) {
195-
uploads.add(cacheProtocol.uploadFile(digest, file));
196+
uploads.add(cacheProtocol.uploadFile(context, digest, file));
196197
} else {
197198
ByteString blob = digestToBlobs.get(digest);
198199
if (blob == null) {
199200
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
200201
throw new IOException(message);
201202
}
202-
uploads.add(cacheProtocol.uploadBlob(digest, blob));
203+
uploads.add(cacheProtocol.uploadBlob(context, digest, blob));
203204
}
204205
}
205206

0 commit comments

Comments
 (0)