Skip to content

Commit 72d5255

Browse files
benjaminpcopybara-github
authored andcommitted
Stop remote blob upload if upload is complete.
If a ByteStream/Write RPC fails, but ByteStream/QueryWriteStatus reveals the upload is in fact complete, avoid a NullPointerException. This CL is the dual fix of 78b89a0 for uploads. On bazel-6.0.0-pre.20211117.1, I observed: ``` java.lang.NullPointerException at com.google.devtools.build.lib.remote.Chunker.seek(Chunker.java:156) at com.google.devtools.build.lib.remote.ByteStreamUploader$AsyncUpload.lambda$start$0(ByteStreamUploader.java:416) at com.google.devtools.build.lib.remote.Retrier.executeAsync(Retrier.java:277) at com.google.devtools.build.lib.remote.Retrier.lambda$onExecuteAsyncFailure$1(Retrier.java:293) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:160) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:143) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ``` Closes #14464. PiperOrigin-RevId: 417795715
1 parent 1c95255 commit 72d5255

File tree

2 files changed

+68
-5
lines changed

2 files changed

+68
-5
lines changed

src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public ListenableFuture<Void> uploadBlobAsync(
253253
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
254254

255255
if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) {
256-
return Futures.immediateFuture(null);
256+
return immediateVoidFuture();
257257
}
258258

259259
ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
@@ -424,7 +424,7 @@ ListenableFuture<Void> start() {
424424
() ->
425425
retrier.executeAsync(
426426
() -> {
427-
if (chunker.getSize() == 0) {
427+
if (chunker.getSize() == committedOffset.get()) {
428428
return immediateVoidFuture();
429429
}
430430
try {
@@ -452,7 +452,7 @@ ListenableFuture<Void> start() {
452452
if (chunker.hasNext()) {
453453
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
454454
}
455-
return Futures.immediateFuture(null);
455+
return immediateVoidFuture();
456456
},
457457
progressiveBackoff),
458458
callCredentialsProvider);
@@ -476,7 +476,7 @@ ListenableFuture<Void> start() {
476476
return Futures.immediateFailedFuture(new IOException(message));
477477
}
478478
}
479-
return Futures.immediateFuture(null);
479+
return immediateVoidFuture();
480480
},
481481
MoreExecutors.directExecutor());
482482
}
@@ -564,7 +564,7 @@ private ListenableFuture<Void> query(
564564
progressiveBackoff.reset();
565565
}
566566
committedOffset.set(committedSize);
567-
return Futures.immediateFuture(null);
567+
return immediateVoidFuture();
568568
},
569569
MoreExecutors.directExecutor());
570570
}

src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,6 +1605,69 @@ public void onCompleted() {
16051605
blockUntilInternalStateConsistent(uploader);
16061606
}
16071607

1608+
@Test
1609+
public void failureAfterUploadCompletes() throws Exception {
1610+
AtomicInteger numUploads = new AtomicInteger();
1611+
RemoteRetrier retrier =
1612+
TestUtils.newRemoteRetrier(
1613+
() -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService);
1614+
ByteStreamUploader uploader =
1615+
new ByteStreamUploader(
1616+
INSTANCE_NAME,
1617+
new ReferenceCountedChannel(channelConnectionFactory),
1618+
CallCredentialsProvider.NO_CREDENTIALS,
1619+
/* callTimeoutSecs= */ 60,
1620+
retrier,
1621+
-1);
1622+
1623+
byte[] blob = new byte[CHUNK_SIZE - 1];
1624+
new Random().nextBytes(blob);
1625+
1626+
serviceRegistry.addService(
1627+
new ByteStreamImplBase() {
1628+
@Override
1629+
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
1630+
numUploads.incrementAndGet();
1631+
return new StreamObserver<WriteRequest>() {
1632+
@Override
1633+
public void onNext(WriteRequest writeRequest) {}
1634+
1635+
@Override
1636+
public void onError(Throwable throwable) {
1637+
fail("onError should never be called.");
1638+
}
1639+
1640+
@Override
1641+
public void onCompleted() {
1642+
streamObserver.onNext(
1643+
WriteResponse.newBuilder().setCommittedSize(blob.length).build());
1644+
streamObserver.onError(Status.UNAVAILABLE.asException());
1645+
}
1646+
};
1647+
}
1648+
1649+
@Override
1650+
public void queryWriteStatus(
1651+
QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
1652+
response.onNext(
1653+
QueryWriteStatusResponse.newBuilder()
1654+
.setCommittedSize(blob.length)
1655+
.setComplete(true)
1656+
.build());
1657+
response.onCompleted();
1658+
}
1659+
});
1660+
1661+
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
1662+
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
1663+
1664+
uploader.uploadBlob(context, hash, chunker, true);
1665+
1666+
blockUntilInternalStateConsistent(uploader);
1667+
1668+
assertThat(numUploads.get()).isEqualTo(1);
1669+
}
1670+
16081671
@Test
16091672
public void testCompressedUploads() throws Exception {
16101673
RemoteRetrier retrier =

0 commit comments

Comments
 (0)