Skip to content

Commit f923a5b

Browse files
benjaminpcopybara-github
authored andcommitted
Ensure Chunkers are always reset (closed).
Chunkers hold open files and memory. Normally, they are closed when the input is fully read. However, we want to make sure resources are freed in error cases, too. Closes #15416. PiperOrigin-RevId: 448212195
1 parent 24f962b commit f923a5b

2 files changed

Lines changed: 70 additions & 2 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ private void releaseOut() {
450450
@Override
451451
public ListenableFuture<Void> uploadFile(
452452
RemoteActionExecutionContext context, Digest digest, Path path) {
453-
return uploader.uploadBlobAsync(
453+
return uploadChunker(
454454
context,
455455
digest,
456456
Chunker.builder()
@@ -462,12 +462,28 @@ public ListenableFuture<Void> uploadFile(
462462
@Override
463463
public ListenableFuture<Void> uploadBlob(
464464
RemoteActionExecutionContext context, Digest digest, ByteString data) {
465-
return uploader.uploadBlobAsync(
465+
return uploadChunker(
466466
context,
467467
digest,
468468
Chunker.builder()
469469
.setInput(data.toByteArray())
470470
.setCompressed(options.cacheCompression)
471471
.build());
472472
}
473+
474+
ListenableFuture<Void> uploadChunker(
475+
RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
476+
ListenableFuture<Void> f = uploader.uploadBlobAsync(context, digest, chunker);
477+
f.addListener(
478+
() -> {
479+
try {
480+
chunker.reset();
481+
} catch (IOException e) {
482+
logger.atWarning().withCause(e).log(
483+
"failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes());
484+
}
485+
},
486+
MoreExecutors.directExecutor());
487+
return f;
488+
}
473489
}

src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,12 @@
7070
import io.grpc.Status;
7171
import io.grpc.stub.ServerCallStreamObserver;
7272
import io.grpc.stub.StreamObserver;
73+
import java.io.ByteArrayInputStream;
7374
import java.io.ByteArrayOutputStream;
7475
import java.io.IOException;
7576
import java.util.List;
7677
import java.util.Optional;
78+
import java.util.concurrent.ExecutionException;
7779
import java.util.concurrent.atomic.AtomicBoolean;
7880
import java.util.concurrent.atomic.AtomicInteger;
7981
import org.junit.Test;
@@ -182,6 +184,56 @@ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObser
182184
assertThat(cancelled.get()).isTrue();
183185
}
184186

187+
@Test
188+
public void testChunkerResetAfterError() throws Exception {
189+
// arrange
190+
GrpcCacheClient client = newClient();
191+
serviceRegistry.addService(
192+
new ByteStreamImplBase() {
193+
@Override
194+
public StreamObserver<WriteRequest> write(
195+
StreamObserver<WriteResponse> responseObserver) {
196+
return new StreamObserver<WriteRequest>() {
197+
@Override
198+
public void onNext(WriteRequest request) {
199+
responseObserver.onError(Status.DATA_LOSS.asRuntimeException());
200+
}
201+
202+
@Override
203+
public void onCompleted() {}
204+
205+
@Override
206+
public void onError(Throwable t) {}
207+
};
208+
}
209+
});
210+
byte[] data = new byte[20];
211+
Digest digest = DIGEST_UTIL.compute(data);
212+
AtomicBoolean closed = new AtomicBoolean();
213+
Chunker chunker =
214+
new Chunker(
215+
() ->
216+
new ByteArrayInputStream(data) {
217+
218+
@Override
219+
public void close() throws IOException {
220+
super.close();
221+
closed.set(true);
222+
}
223+
},
224+
data.length,
225+
2,
226+
false);
227+
228+
// act
229+
Throwable t =
230+
assertThrows(ExecutionException.class, client.uploadChunker(context, digest, chunker)::get);
231+
232+
// assert
233+
assertThat(Status.fromThrowable(t.getCause()).getCode()).isEqualTo(Status.Code.DATA_LOSS);
234+
assertThat(closed.get()).isTrue();
235+
}
236+
185237
@Test
186238
public void testDownloadEmptyBlob() throws Exception {
187239
GrpcCacheClient client = newClient();

0 commit comments

Comments
 (0)