3636import com .google .common .util .concurrent .SettableFuture ;
3737import com .google .devtools .build .lib .authandtls .CallCredentialsProvider ;
3838import com .google .devtools .build .lib .remote .RemoteRetrier .ProgressiveBackoff ;
39+ import com .google .devtools .build .lib .remote .common .RemoteActionExecutionContext ;
3940import com .google .devtools .build .lib .remote .util .TracingMetadataUtils ;
4041import com .google .devtools .build .lib .remote .util .Utils ;
4142import io .grpc .CallOptions ;
4243import io .grpc .Channel ;
4344import io .grpc .ClientCall ;
44- import io .grpc .Context ;
4545import io .grpc .Metadata ;
4646import io .grpc .Status ;
4747import io .grpc .Status .Code ;
@@ -134,9 +134,10 @@ public ByteStreamUploader(
134134 * uploaded, if {@code true} the blob is uploaded.
135135 * @throws IOException when reading of the {@link Chunker}s input source fails
136136 */
137- public void uploadBlob (HashCode hash , Chunker chunker , boolean forceUpload )
137+ public void uploadBlob (
138+ RemoteActionExecutionContext context , HashCode hash , Chunker chunker , boolean forceUpload )
138139 throws IOException , InterruptedException {
139- uploadBlobs (singletonMap (hash , chunker ), forceUpload );
140+ uploadBlobs (context , singletonMap (hash , chunker ), forceUpload );
140141 }
141142
142143 /**
@@ -156,12 +157,14 @@ public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
156157 * uploaded, if {@code true} the blob is uploaded.
157158 * @throws IOException when reading of the {@link Chunker}s input source or uploading fails
158159 */
159- public void uploadBlobs (Map <HashCode , Chunker > chunkers , boolean forceUpload )
160+ public void uploadBlobs (
161+ RemoteActionExecutionContext context , Map <HashCode , Chunker > chunkers , boolean forceUpload )
160162 throws IOException , InterruptedException {
161163 List <ListenableFuture <Void >> uploads = new ArrayList <>();
162164
163165 for (Map .Entry <HashCode , Chunker > chunkerEntry : chunkers .entrySet ()) {
164- uploads .add (uploadBlobAsync (chunkerEntry .getKey (), chunkerEntry .getValue (), forceUpload ));
166+ uploads .add (
167+ uploadBlobAsync (context , chunkerEntry .getKey (), chunkerEntry .getValue (), forceUpload ));
165168 }
166169
167170 try {
@@ -200,14 +203,17 @@ void shutdown() {
200203 }
201204 }
202205
203- /** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */
206+ /**
207+ * @deprecated Use {@link #uploadBlobAsync(RemoteActionExecutionContext, Digest, Chunker,
208+ * boolean)} instead.
209+ */
204210 @ Deprecated
205211 @ VisibleForTesting
206212 public ListenableFuture <Void > uploadBlobAsync (
207- HashCode hash , Chunker chunker , boolean forceUpload ) {
213+ RemoteActionExecutionContext context , HashCode hash , Chunker chunker , boolean forceUpload ) {
208214 Digest digest =
209215 Digest .newBuilder ().setHash (hash .toString ()).setSizeBytes (chunker .getSize ()).build ();
210- return uploadBlobAsync (digest , chunker , forceUpload );
216+ return uploadBlobAsync (context , digest , chunker , forceUpload );
211217 }
212218
213219 /**
@@ -227,7 +233,7 @@ public ListenableFuture<Void> uploadBlobAsync(
227233 * @throws IOException when reading of the {@link Chunker}s input source fails
228234 */
229235 public ListenableFuture <Void > uploadBlobAsync (
230- Digest digest , Chunker chunker , boolean forceUpload ) {
236+ RemoteActionExecutionContext context , Digest digest , Chunker chunker , boolean forceUpload ) {
231237 synchronized (lock ) {
232238 checkState (!isShutdown , "Must not call uploadBlobs after shutdown." );
233239
@@ -242,7 +248,7 @@ public ListenableFuture<Void> uploadBlobAsync(
242248
243249 ListenableFuture <Void > uploadResult =
244250 Futures .transform (
245- startAsyncUpload (digest , chunker ),
251+ startAsyncUpload (context , digest , chunker ),
246252 (v ) -> {
247253 synchronized (lock ) {
248254 uploadedBlobs .add (HashCode .fromString (digest .getHash ()));
@@ -294,7 +300,8 @@ private static String buildUploadResourceName(String instanceName, UUID uuid, Di
294300 }
295301
296302 /** Starts a file upload and returns a future representing the upload. */
297- private ListenableFuture <Void > startAsyncUpload (Digest digest , Chunker chunker ) {
303+ private ListenableFuture <Void > startAsyncUpload (
304+ RemoteActionExecutionContext context , Digest digest , Chunker chunker ) {
298305 try {
299306 chunker .reset ();
300307 } catch (IOException e ) {
@@ -313,7 +320,13 @@ private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker)
313320 String resourceName = buildUploadResourceName (instanceName , uploadId , digest );
314321 AsyncUpload newUpload =
315322 new AsyncUpload (
316- channel , callCredentialsProvider , callTimeoutSecs , retrier , resourceName , chunker );
323+ context ,
324+ channel ,
325+ callCredentialsProvider ,
326+ callTimeoutSecs ,
327+ retrier ,
328+ resourceName ,
329+ chunker );
317330 ListenableFuture <Void > currUpload = newUpload .start ();
318331 currUpload .addListener (
319332 () -> {
@@ -348,6 +361,7 @@ public ReferenceCounted touch(Object o) {
348361
349362 private static class AsyncUpload {
350363
364+ private final RemoteActionExecutionContext context ;
351365 private final Channel channel ;
352366 private final CallCredentialsProvider callCredentialsProvider ;
353367 private final long callTimeoutSecs ;
@@ -358,12 +372,14 @@ private static class AsyncUpload {
358372 private ClientCall <WriteRequest , WriteResponse > call ;
359373
360374 AsyncUpload (
375+ RemoteActionExecutionContext context ,
361376 Channel channel ,
362377 CallCredentialsProvider callCredentialsProvider ,
363378 long callTimeoutSecs ,
364379 Retrier retrier ,
365380 String resourceName ,
366381 Chunker chunker ) {
382+ this .context = context ;
367383 this .channel = channel ;
368384 this .callCredentialsProvider = callCredentialsProvider ;
369385 this .callTimeoutSecs = callTimeoutSecs ;
@@ -373,7 +389,6 @@ private static class AsyncUpload {
373389 }
374390
375391 ListenableFuture <Void > start () {
376- Context ctx = Context .current ();
377392 ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff (retrier ::newBackoff );
378393 AtomicLong committedOffset = new AtomicLong (0 );
379394
@@ -383,8 +398,7 @@ ListenableFuture<Void> start() {
383398 retrier .executeAsync (
384399 () -> {
385400 if (committedOffset .get () < chunker .getSize ()) {
386- return ctx .call (
387- () -> callAndQueryOnFailure (committedOffset , progressiveBackoff ));
401+ return callAndQueryOnFailure (committedOffset , progressiveBackoff );
388402 }
389403 return Futures .immediateFuture (null );
390404 },
@@ -409,7 +423,8 @@ ListenableFuture<Void> start() {
409423
410424 private ByteStreamFutureStub bsFutureStub () {
411425 return ByteStreamGrpc .newFutureStub (channel )
412- .withInterceptors (TracingMetadataUtils .attachMetadataFromContextInterceptor ())
426+ .withInterceptors (
427+ TracingMetadataUtils .attachMetadataInterceptor (context .getRequestMetadata ()))
413428 .withCallCredentials (callCredentialsProvider .getCallCredentials ())
414429 .withDeadlineAfter (callTimeoutSecs , SECONDS );
415430 }
@@ -420,7 +435,7 @@ private ListenableFuture<Void> callAndQueryOnFailure(
420435 call (committedOffset ),
421436 Exception .class ,
422437 (e ) -> guardQueryWithSuppression (e , committedOffset , progressiveBackoff ),
423- Context . current (). fixedContextExecutor ( MoreExecutors .directExecutor () ));
438+ MoreExecutors .directExecutor ());
424439 }
425440
426441 private ListenableFuture <Void > guardQueryWithSuppression (
@@ -584,7 +599,9 @@ public void onReady() {
584599 }
585600 }
586601 };
587- call .start (callListener , TracingMetadataUtils .headersFromCurrentContext ());
602+ call .start (
603+ callListener ,
604+ TracingMetadataUtils .headersFromRequestMetadata (context .getRequestMetadata ()));
588605 call .request (1 );
589606 return uploadResult ;
590607 }
0 commit comments