Skip to content

Commit 4c4dccc

Browse files
Fixing retries of Storage.create (#2359)
1 parent afe0a23 commit 4c4dccc

3 files changed

Lines changed: 114 additions & 10 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1433,6 +1433,9 @@ public static Builder newBuilder() {
14331433
* {@code BlobWriteOption.md5Match} and {@code BlobWriteOption.crc32cMatch} options. The given
14341434
* input stream is closed upon success.
14351435
*
1436+
* <p>This method is marked as {@link Deprecated} because it cannot safely retry, given that it
1437+
* accepts an {@link InputStream} which can only be consumed once.
1438+
*
14361439
* <p>Example of creating a blob from an input stream.
14371440
* <pre> {@code
14381441
* String bucketName = "my_unique_bucket";
@@ -1460,6 +1463,7 @@ public static Builder newBuilder() {
14601463
* @return a [@code Blob} with complete information
14611464
* @throws StorageException upon failure
14621465
*/
1466+
@Deprecated
14631467
Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options);
14641468

14651469
/**

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.google.cloud.storage.spi.v1.StorageRpc;
5252
import com.google.cloud.storage.spi.v1.StorageRpc.RewriteResponse;
5353
import com.google.common.base.Function;
54+
import com.google.common.base.Preconditions;
5455
import com.google.common.collect.ImmutableList;
5556
import com.google.common.collect.ImmutableMap;
5657
import com.google.common.collect.ImmutableSet;
@@ -121,7 +122,7 @@ public Blob create(BlobInfo blobInfo, BlobTargetOption... options) {
121122
.setMd5(EMPTY_BYTE_ARRAY_MD5)
122123
.setCrc32c(EMPTY_BYTE_ARRAY_CRC32C)
123124
.build();
124-
return create(updatedInfo, new ByteArrayInputStream(EMPTY_BYTE_ARRAY), options);
125+
return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, options);
125126
}
126127

127128
@Override
@@ -132,24 +133,29 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
132133
.setCrc32c(BaseEncoding.base64().encode(
133134
Ints.toByteArray(Hashing.crc32c().hashBytes(content).asInt())))
134135
.build();
135-
return create(updatedInfo, new ByteArrayInputStream(content), options);
136+
return internalCreate(updatedInfo, content, options);
136137
}
137138

138139
@Override
140+
@Deprecated
139141
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
140142
Tuple<BlobInfo, BlobTargetOption[]> targetOptions = BlobTargetOption.convert(blobInfo, options);
141-
return create(targetOptions.x(), content, targetOptions.y());
143+
StorageObject blobPb = targetOptions.x().toPb();
144+
Map<StorageRpc.Option, ?> optionsMap = optionMap(targetOptions.x(), targetOptions.y());
145+
InputStream inputStreamParam = firstNonNull(content, new ByteArrayInputStream(EMPTY_BYTE_ARRAY));
146+
// retries are not safe when the input is an InputStream, so we can't retry.
147+
return Blob.fromPb(this, storageRpc.create(blobPb, inputStreamParam, optionsMap));
142148
}
143149

144-
private Blob create(BlobInfo info, final InputStream content, BlobTargetOption... options) {
150+
private Blob internalCreate(BlobInfo info, final byte[] content, BlobTargetOption... options) {
151+
Preconditions.checkNotNull(content);
145152
final StorageObject blobPb = info.toPb();
146153
final Map<StorageRpc.Option, ?> optionsMap = optionMap(info, options);
147154
try {
148155
return Blob.fromPb(this, runWithRetries(new Callable<StorageObject>() {
149156
@Override
150157
public StorageObject call() {
151-
return storageRpc.create(blobPb,
152-
firstNonNull(content, new ByteArrayInputStream(EMPTY_BYTE_ARRAY)), optionsMap);
158+
return storageRpc.create(blobPb, new ByteArrayInputStream(content), optionsMap);
153159
}
154160
}, getOptions().getRetrySettings(), EXCEPTION_HANDLER, getOptions().getClock()));
155161
} catch (RetryHelperException e) {

google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static com.google.cloud.storage.testing.ApiPolicyMatcher.eqApiPolicy;
2020
import static java.nio.charset.StandardCharsets.UTF_8;
21-
import static org.easymock.EasyMock.eq;
2221
import static org.junit.Assert.assertArrayEquals;
2322
import static org.junit.Assert.assertEquals;
2423
import static org.junit.Assert.assertFalse;
@@ -407,7 +406,9 @@ public void testCreateBlob() throws IOException {
407406
.andReturn(BLOB_INFO1.toPb());
408407
EasyMock.replay(storageRpcMock);
409408
initializeService();
409+
410410
Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT);
411+
411412
assertEquals(expectedBlob1, blob);
412413
ByteArrayInputStream byteStream = capturedStream.getValue();
413414
byte[] streamBytes = new byte[BLOB_CONTENT.length];
@@ -416,6 +417,57 @@ public void testCreateBlob() throws IOException {
416417
assertEquals(-1, byteStream.read(streamBytes));
417418
}
418419

420+
@Test
421+
public void testCreateBlobRetry() throws IOException {
422+
Capture<ByteArrayInputStream> capturedStream1 = Capture.newInstance();
423+
Capture<ByteArrayInputStream> capturedStream2 = Capture.newInstance();
424+
StorageObject storageObject = BLOB_INFO1
425+
.toBuilder()
426+
.setMd5(CONTENT_MD5)
427+
.setCrc32c(CONTENT_CRC32C)
428+
.build()
429+
.toPb();
430+
431+
EasyMock.expect(
432+
storageRpcMock.create(
433+
EasyMock.eq(storageObject),
434+
EasyMock.capture(capturedStream1),
435+
EasyMock.eq(EMPTY_RPC_OPTIONS)))
436+
.andThrow(new StorageException(500, "internalError")).once();
437+
438+
EasyMock.expect(
439+
storageRpcMock.create(
440+
EasyMock.eq(storageObject),
441+
EasyMock.capture(capturedStream2),
442+
EasyMock.eq(EMPTY_RPC_OPTIONS)))
443+
.andReturn(BLOB_INFO1.toPb());
444+
445+
EasyMock.replay(storageRpcMock);
446+
storage =
447+
options
448+
.toBuilder()
449+
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
450+
.build()
451+
.getService();
452+
initializeServiceDependentObjects();
453+
454+
Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT);
455+
456+
assertEquals(expectedBlob1, blob);
457+
458+
ByteArrayInputStream byteStream = capturedStream1.getValue();
459+
byte[] streamBytes = new byte[BLOB_CONTENT.length];
460+
assertEquals(BLOB_CONTENT.length, byteStream.read(streamBytes));
461+
assertArrayEquals(BLOB_CONTENT, streamBytes);
462+
assertEquals(-1, byteStream.read(streamBytes));
463+
464+
ByteArrayInputStream byteStream2 = capturedStream2.getValue();
465+
byte[] streamBytes2 = new byte[BLOB_CONTENT.length];
466+
assertEquals(BLOB_CONTENT.length, byteStream2.read(streamBytes2));
467+
assertArrayEquals(BLOB_CONTENT, streamBytes2);
468+
assertEquals(-1, byteStream.read(streamBytes2));
469+
}
470+
419471
@Test
420472
public void testCreateEmptyBlob() throws IOException {
421473
Capture<ByteArrayInputStream> capturedStream = Capture.newInstance();
@@ -507,17 +559,30 @@ public void testCreateBlobWithEncryptionKey() throws IOException {
507559
}
508560

509561
@Test
510-
public void testCreateBlobFromStream() {
562+
public void testCreateBlobFromStream() throws IOException {
563+
Capture<ByteArrayInputStream> capturedStream = Capture.newInstance();
564+
511565
ByteArrayInputStream fileStream = new ByteArrayInputStream(BLOB_CONTENT);
512566
BlobInfo.Builder infoBuilder = BLOB_INFO1.toBuilder();
513567
BlobInfo infoWithHashes = infoBuilder.setMd5(CONTENT_MD5).setCrc32c(CONTENT_CRC32C).build();
514568
BlobInfo infoWithoutHashes = infoBuilder.setMd5(null).setCrc32c(null).build();
515-
EasyMock.expect(storageRpcMock.create(infoWithoutHashes.toPb(), fileStream, EMPTY_RPC_OPTIONS))
569+
EasyMock.expect(storageRpcMock
570+
.create(
571+
EasyMock.eq(infoWithoutHashes.toPb()),
572+
EasyMock.capture(capturedStream),
573+
EasyMock.eq(EMPTY_RPC_OPTIONS)))
516574
.andReturn(BLOB_INFO1.toPb());
517575
EasyMock.replay(storageRpcMock);
518576
initializeService();
577+
519578
Blob blob = storage.create(infoWithHashes, fileStream);
579+
520580
assertEquals(expectedBlob1, blob);
581+
ByteArrayInputStream byteStream = capturedStream.getValue();
582+
byte[] streamBytes = new byte[BLOB_CONTENT.length];
583+
assertEquals(BLOB_CONTENT.length, byteStream.read(streamBytes));
584+
assertArrayEquals(BLOB_CONTENT, streamBytes);
585+
assertEquals(-1, byteStream.read(streamBytes));
521586
}
522587

523588
@Test
@@ -539,6 +604,35 @@ public void testCreateBlobFromStreamWithEncryptionKey() throws IOException {
539604
assertEquals(expectedBlob1, blob);
540605
}
541606

607+
@Test
608+
public void testCreateBlobFromStreamRetryableException() throws IOException {
609+
Capture<ByteArrayInputStream> capturedStream = Capture.newInstance();
610+
ByteArrayInputStream fileStream = new ByteArrayInputStream(BLOB_CONTENT);
611+
BlobInfo.Builder infoBuilder = BLOB_INFO1.toBuilder();
612+
BlobInfo infoWithHashes = infoBuilder.setMd5(CONTENT_MD5).setCrc32c(CONTENT_CRC32C).build();
613+
BlobInfo infoWithoutHashes = infoBuilder.setMd5(null).setCrc32c(null).build();
614+
EasyMock.expect(storageRpcMock
615+
.create(
616+
EasyMock.eq(infoWithoutHashes.toPb()),
617+
EasyMock.capture(capturedStream),
618+
EasyMock.eq(EMPTY_RPC_OPTIONS)))
619+
.andThrow(new StorageException(500, "internalError")).once();
620+
621+
EasyMock.replay(storageRpcMock);
622+
storage =
623+
options
624+
.toBuilder()
625+
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
626+
.build()
627+
.getService();
628+
629+
// Even though this exception is retryable, storage.create(BlobInfo, InputStream)
630+
// shouldn't retry.
631+
thrown.expect(StorageException.class);
632+
thrown.expectMessage("internalError");
633+
storage.create(infoWithHashes, fileStream);
634+
}
635+
542636
@Test
543637
public void testGetBucket() {
544638
EasyMock.expect(storageRpcMock.get(BucketInfo.of(BUCKET_NAME1).toPb(), EMPTY_RPC_OPTIONS))
@@ -2033,7 +2127,7 @@ public void testSetIamPolicy() {
20332127
EasyMock.expect(storageRpcMock.getIamPolicy(BUCKET_NAME1)).andReturn(API_POLICY1);
20342128
EasyMock.expect(
20352129
storageRpcMock.setIamPolicy(
2036-
eq(BUCKET_NAME1),
2130+
EasyMock.eq(BUCKET_NAME1),
20372131
eqApiPolicy(preCommitApiPolicy)))
20382132
.andReturn(postCommitApiPolicy);
20392133
EasyMock.replay(storageRpcMock);

0 commit comments

Comments
 (0)