Skip to content

Commit 5df79b6

Browse files
committed
Ensure Chunkers are always reset (closed).
Chunkers hold open files and memory. Normally, they are closed when the input is fully read. However, we want to make sure resources are freed in error cases, too.
1 parent 22db69a commit 5df79b6

2 files changed

Lines changed: 66 additions & 2 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ private void releaseOut() {
444444
@Override
445445
public ListenableFuture<Void> uploadFile(
446446
RemoteActionExecutionContext context, Digest digest, Path path) {
447-
return uploader.uploadBlobAsync(
447+
return uploadChunker(
448448
context,
449449
digest,
450450
Chunker.builder()
@@ -456,12 +456,28 @@ public ListenableFuture<Void> uploadFile(
456456
@Override
457457
public ListenableFuture<Void> uploadBlob(
458458
RemoteActionExecutionContext context, Digest digest, ByteString data) {
459-
return uploader.uploadBlobAsync(
459+
return uploadChunker(
460460
context,
461461
digest,
462462
Chunker.builder()
463463
.setInput(data.toByteArray())
464464
.setCompressed(options.cacheCompression)
465465
.build());
466466
}
467+
468+
ListenableFuture<Void> uploadChunker(
469+
RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
470+
ListenableFuture<Void> f = uploader.uploadBlobAsync(context, digest, chunker);
471+
f.addListener(
472+
() -> {
473+
try {
474+
chunker.reset();
475+
} catch (IOException e) {
476+
logger.atWarning().withCause(e).log(
477+
"failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes());
478+
}
479+
},
480+
MoreExecutors.directExecutor());
481+
return f;
482+
}
467483
}

src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,12 @@
6868
import io.grpc.ServerInterceptors;
6969
import io.grpc.Status;
7070
import io.grpc.stub.StreamObserver;
71+
import java.io.ByteArrayInputStream;
7172
import java.io.IOException;
7273
import java.util.List;
7374
import java.util.Optional;
75+
import java.util.concurrent.CountDownLatch;
76+
import java.util.concurrent.ExecutionException;
7477
import java.util.concurrent.atomic.AtomicBoolean;
7578
import java.util.concurrent.atomic.AtomicInteger;
7679
import org.junit.Test;
@@ -151,6 +154,51 @@ public void onError(Throwable t) {
151154
client.ensureInputsPresent(context, merkleTree, ImmutableMap.of(), /*force=*/ true);
152155
}
153156

157+
@Test
158+
public void testChunkerResetAfterError() throws Exception {
159+
GrpcCacheClient client = newClient();
160+
serviceRegistry.addService(
161+
new ByteStreamImplBase() {
162+
@Override
163+
public StreamObserver<WriteRequest> write(
164+
StreamObserver<WriteResponse> responseObserver) {
165+
return new StreamObserver<WriteRequest>() {
166+
@Override
167+
public void onNext(WriteRequest request) {
168+
responseObserver.onError(Status.DATA_LOSS.asRuntimeException());
169+
}
170+
171+
@Override
172+
public void onCompleted() {}
173+
174+
@Override
175+
public void onError(Throwable t) {}
176+
};
177+
}
178+
});
179+
byte[] data = new byte[20];
180+
Digest digest = DIGEST_UTIL.compute(data);
181+
CountDownLatch latch = new CountDownLatch(1);
182+
Chunker chunker =
183+
new Chunker(
184+
() ->
185+
new ByteArrayInputStream(data) {
186+
187+
@Override
188+
public void close() throws IOException {
189+
super.close();
190+
latch.countDown();
191+
}
192+
},
193+
data.length,
194+
2,
195+
false);
196+
Throwable t =
197+
assertThrows(ExecutionException.class, client.uploadChunker(context, digest, chunker)::get);
198+
assertThat(Status.fromThrowable(t.getCause()).getCode()).isEqualTo(Status.Code.DATA_LOSS);
199+
latch.await();
200+
}
201+
154202
@Test
155203
public void testDownloadEmptyBlob() throws Exception {
156204
GrpcCacheClient client = newClient();

0 commit comments

Comments
 (0)