Skip to content

Commit 42e1f6e

Browse files
committed
fix: correct lastChunk retry logic in BlobWriteChannel (#918)
Add new method StorageRpc#queryCompletedResumableUpload, which allows getting a shallow StorageObject for a resumable upload session that is complete. Update BlobWriteChannel to use StorageRpc#queryCompletedResumableUpload instead of StorageRpc#get when attempting to validate the upload size of an object once it determines the upload is complete and it is on the last chunk. If a BlobWriteChannel is opened with a conditional like IfGenerationMatch, it is not possible to simply get the object, as the object can drift generationally while the resumable upload is being performed. Related to #839 (cherry picked from commit ab0228c)
1 parent 895e3a3 commit 42e1f6e

7 files changed

Lines changed: 204 additions & 25 deletions

File tree

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@
3131
<method>long getCurrentUploadOffset(java.lang.String)</method>
3232
<differenceType>7012</differenceType>
3333
</difference>
34+
<difference>
35+
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
36+
<method>com.google.api.services.storage.model.StorageObject queryCompletedResumableUpload(java.lang.String, long)</method>
37+
<differenceType>7012</differenceType>
38+
</difference>
3439
</differences>

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.cloud.RetryHelper;
2626
import com.google.cloud.WriteChannel;
2727
import com.google.cloud.storage.spi.v1.StorageRpc;
28-
import com.google.common.collect.Maps;
2928
import java.math.BigInteger;
3029
import java.net.URL;
3130
import java.util.Map;
@@ -78,12 +77,6 @@ private long getRemotePosition() {
7877
return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
7978
}
8079

81-
private StorageObject getRemoteStorageObject() {
82-
return getOptions()
83-
.getStorageRpcV1()
84-
.get(getEntity().toPb(), Maps.newEnumMap(StorageRpc.Option.class));
85-
}
86-
8780
private static StorageException unrecoverableState(
8881
String uploadId,
8982
int chunkOffset,
@@ -212,8 +205,12 @@ public void run() {
212205
if (uploadAlreadyComplete && lastChunk) {
213206
// Case 6
214207
// Request object metadata if not available
208+
long totalBytes = getPosition() + length;
215209
if (storageObject == null) {
216-
storageObject = getRemoteStorageObject();
210+
storageObject =
211+
getOptions()
212+
.getStorageRpcV1()
213+
.queryCompletedResumableUpload(getUploadId(), totalBytes);
217214
}
218215
// the following checks are defined here explicitly to provide a more
219216
// informative error if either storageObject is unable to be resolved or its size is
@@ -239,7 +236,7 @@ public void run() {
239236
remotePosition,
240237
lastChunk);
241238
}
242-
if (size.longValue() != getPosition() + length) {
239+
if (size.longValue() != totalBytes) {
243240
throw unrecoverableState(
244241
getUploadId(),
245242
chunkOffset,

google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,25 @@ public long getCurrentUploadOffset(String uploadId) {
799799
}
800800
}
801801

802+
@Override
803+
public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) {
804+
try {
805+
GenericUrl url = new GenericUrl(uploadId);
806+
HttpRequest req = storage.getRequestFactory().buildPutRequest(url, new EmptyContent());
807+
req.getHeaders().setContentRange(String.format("bytes */%s", totalBytes));
808+
req.setParser(storage.getObjectParser());
809+
HttpResponse response = req.execute();
810+
// A 200 response carries the StorageObject metadata; any other status is an error
811+
if (response.getStatusCode() == 200) {
812+
return response.parseAs(StorageObject.class);
813+
} else {
814+
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
815+
}
816+
} catch (IOException ex) {
817+
throw translate(ex);
818+
}
819+
}
820+
802821
@Override
803822
public StorageObject writeWithResponse(
804823
String uploadId,
@@ -864,10 +883,7 @@ public StorageObject writeWithResponse(
864883
if (exception != null) {
865884
throw exception;
866885
}
867-
GoogleJsonError error = new GoogleJsonError();
868-
error.setCode(code);
869-
error.setMessage(message);
870-
throw translate(error);
886+
throw buildStorageException(code, message);
871887
}
872888
} catch (IOException ex) {
873889
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
@@ -914,10 +930,7 @@ public String open(StorageObject object, Map<Option, ?> options) {
914930
setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options);
915931
HttpResponse response = httpRequest.execute();
916932
if (response.getStatusCode() != 200) {
917-
GoogleJsonError error = new GoogleJsonError();
918-
error.setCode(response.getStatusCode());
919-
error.setMessage(response.getStatusMessage());
920-
throw translate(error);
933+
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
921934
}
922935
return response.getHeaders().getLocation();
923936
} catch (IOException ex) {
@@ -947,10 +960,7 @@ public String open(String signedURL) {
947960
requestHeaders.set("x-goog-resumable", "start");
948961
HttpResponse response = httpRequest.execute();
949962
if (response.getStatusCode() != 201) {
950-
GoogleJsonError error = new GoogleJsonError();
951-
error.setCode(response.getStatusCode());
952-
error.setMessage(response.getStatusMessage());
953-
throw translate(error);
963+
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
954964
}
955965
return response.getHeaders().getLocation();
956966
} catch (IOException ex) {
@@ -1610,4 +1620,11 @@ public ServiceAccount getServiceAccount(String projectId) {
16101620
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
16111621
}
16121622
}
1623+
1624+
private static StorageException buildStorageException(int statusCode, String statusMessage) {
1625+
GoogleJsonError error = new GoogleJsonError();
1626+
error.setCode(statusCode);
1627+
error.setMessage(statusMessage);
1628+
return translate(error);
1629+
}
16131630
}

google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,24 @@ void write(
337337
*/
338338
long getCurrentUploadOffset(String uploadId);
339339

340+
/**
341+
* Attempts to retrieve the StorageObject from a completed resumable upload. When a resumable
342+
* upload completes, the response will be the up-to-date StorageObject metadata. This up-to-date
343+
* metadata can then be used to validate the total size of the object along with new generation
344+
* and other information.
345+
*
346+
* <p>If for any reason, the response to the final PUT to a resumable upload is not received, this
347+
* method can be used to query for the up-to-date StorageObject. If the upload is complete, this
348+
* method can be used to access the StorageObject independently from any other liveness or
349+
* conditional criteria requirements that are otherwise applicable when using {@link
350+
* #get(StorageObject, Map)}.
351+
*
352+
* @param uploadId resumable upload ID URL
353+
* @param totalBytes the total number of bytes that should have been written.
354+
* @throws StorageException if the upload is incomplete or does not exist
355+
*/
356+
StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes);
357+
340358
/**
341359
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
342360
* returns metadata of the updated object, otherwise returns null.

google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ public long getCurrentUploadOffset(String uploadId) {
144144
throw new UnsupportedOperationException("Not implemented yet");
145145
}
146146

147+
@Override
148+
public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) {
149+
throw new UnsupportedOperationException("Not implemented yet");
150+
}
151+
147152
@Override
148153
public StorageObject writeWithResponse(
149154
String uploadId,

google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import com.google.cloud.storage.spi.StorageRpcFactory;
4141
import com.google.cloud.storage.spi.v1.StorageRpc;
4242
import com.google.common.collect.ImmutableMap;
43-
import com.google.common.collect.Maps;
4443
import java.io.IOException;
4544
import java.math.BigInteger;
4645
import java.net.MalformedURLException;
@@ -334,10 +333,10 @@ public void testWriteWithRetryAndObjectMetadata() throws IOException {
334333
.andThrow(socketClosedException);
335334
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
336335
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
337-
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
336+
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
338337
.andThrow(socketClosedException);
339338
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
340-
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
339+
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
341340
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
342341
replay(storageRpcMock);
343342
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
@@ -487,7 +486,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
487486
eq(true)))
488487
.andThrow(socketClosedException);
489488
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
490-
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
489+
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
491490
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
492491
replay(storageRpcMock);
493492
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@
8282
import com.google.cloud.storage.StorageException;
8383
import com.google.cloud.storage.StorageOptions;
8484
import com.google.cloud.storage.StorageRoles;
85+
import com.google.cloud.storage.spi.StorageRpcFactory;
86+
import com.google.cloud.storage.spi.v1.StorageRpc;
87+
import com.google.cloud.storage.spi.v1.StorageRpc.Option;
8588
import com.google.cloud.storage.testing.RemoteStorageHelper;
8689
import com.google.common.collect.ImmutableList;
8790
import com.google.common.collect.ImmutableMap;
@@ -90,6 +93,8 @@
9093
import com.google.common.collect.Lists;
9194
import com.google.common.io.BaseEncoding;
9295
import com.google.common.io.ByteStreams;
96+
import com.google.common.reflect.AbstractInvocationHandler;
97+
import com.google.common.reflect.Reflection;
9398
import com.google.iam.v1.Binding;
9499
import com.google.iam.v1.IAMPolicyGrpc;
95100
import com.google.iam.v1.SetIamPolicyRequest;
@@ -106,9 +111,11 @@
106111
import java.io.FileInputStream;
107112
import java.io.IOException;
108113
import java.io.InputStream;
114+
import java.lang.reflect.Method;
109115
import java.net.URL;
110116
import java.net.URLConnection;
111117
import java.nio.ByteBuffer;
118+
import java.nio.charset.StandardCharsets;
112119
import java.nio.file.Files;
113120
import java.nio.file.Path;
114121
import java.security.Key;
@@ -124,6 +131,7 @@
124131
import java.util.Set;
125132
import java.util.concurrent.ExecutionException;
126133
import java.util.concurrent.TimeUnit;
134+
import java.util.concurrent.atomic.AtomicBoolean;
127135
import java.util.logging.Level;
128136
import java.util.logging.Logger;
129137
import java.util.zip.GZIPInputStream;
@@ -137,7 +145,14 @@
137145
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
138146
import org.junit.AfterClass;
139147
import org.junit.BeforeClass;
148+
import org.junit.Rule;
140149
import org.junit.Test;
150+
import org.junit.rules.TestName;
151+
import org.threeten.bp.Clock;
152+
import org.threeten.bp.Instant;
153+
import org.threeten.bp.ZoneId;
154+
import org.threeten.bp.ZoneOffset;
155+
import org.threeten.bp.format.DateTimeFormatter;
141156

142157
public class ITStorageTest {
143158

@@ -198,6 +213,8 @@ public class ITStorageTest {
198213
private static final ImmutableList<LifecycleRule> LIFECYCLE_RULES =
199214
ImmutableList.of(LIFECYCLE_RULE_1, LIFECYCLE_RULE_2);
200215

216+
@Rule public final TestName testName = new TestName();
217+
201218
@BeforeClass
202219
public static void beforeClass() throws IOException {
203220
remoteStorageHelper = RemoteStorageHelper.create();
@@ -3624,4 +3641,125 @@ public void testWriterWithKmsKeyName() throws IOException {
36243641
assertThat(blob.getKmsKeyName()).isNotNull();
36253642
assertThat(storage.delete(BUCKET, blobName)).isTrue();
36263643
}
3644+
3645+
@Test
3646+
public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_multipleChunks()
3647+
throws IOException {
3648+
int _2MiB = 256 * 1024;
3649+
int contentSize = 292_617;
3650+
3651+
blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_2MiB, contentSize);
3652+
}
3653+
3654+
@Test
3655+
public void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent_singleChunk()
3656+
throws IOException {
3657+
int _4MiB = 256 * 1024 * 2;
3658+
int contentSize = 292_617;
3659+
3660+
blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(_4MiB, contentSize);
3661+
}
3662+
3663+
private void blobWriteChannel_handlesRecoveryOnLastChunkWhenGenerationIsPresent(
3664+
int chunkSize, int contentSize) throws IOException {
3665+
Instant now = Clock.systemUTC().instant();
3666+
DateTimeFormatter formatter =
3667+
DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
3668+
String nowString = formatter.format(now);
3669+
3670+
String blobPath = String.format("%s/%s/blob", testName.getMethodName(), nowString);
3671+
BlobId blobId = BlobId.of(BUCKET, blobPath);
3672+
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
3673+
3674+
Random rand = new Random(1234567890);
3675+
String randString = randString(rand, contentSize);
3676+
final byte[] randStringBytes = randString.getBytes(StandardCharsets.UTF_8);
3677+
Storage storage = StorageOptions.getDefaultInstance().getService();
3678+
WriteChannel ww = storage.writer(blobInfo);
3679+
ww.setChunkSize(chunkSize);
3680+
ww.write(ByteBuffer.wrap(randStringBytes));
3681+
ww.close();
3682+
3683+
Blob blobGen1 = storage.get(blobId);
3684+
3685+
final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
3686+
3687+
Storage testStorage =
3688+
StorageOptions.newBuilder()
3689+
.setServiceRpcFactory(
3690+
new StorageRpcFactory() {
3691+
/**
3692+
* Here we're creating a proxy of StorageRpc where we can delegate all calls to
3693+
* the normal implementation, except in the case of {@link
3694+
* StorageRpc#writeWithResponse(String, byte[], int, long, int, boolean)} where
3695+
* {@code lastChunk == true}. We allow the call to execute, but instead of
3696+
* returning the result we throw an IOException to simulate a prematurely closed
3697+
* connection. This behavior is to ensure appropriate handling of a completed
3698+
* upload where the ACK wasn't received. In particular, if an upload is initiated
3699+
* against an object where an {@link Option#IF_GENERATION_MATCH} is specified, simply calling
3700+
* get on an object can result in a 404 because the object that is created while
3701+
* the BlobWriteChannel is executing will be a new generation.
3702+
*/
3703+
@SuppressWarnings("UnstableApiUsage")
3704+
@Override
3705+
public StorageRpc create(final StorageOptions options) {
3706+
return Reflection.newProxy(
3707+
StorageRpc.class,
3708+
new AbstractInvocationHandler() {
3709+
final StorageRpc delegate =
3710+
(StorageRpc) StorageOptions.getDefaultInstance().getRpc();
3711+
3712+
@Override
3713+
protected Object handleInvocation(
3714+
Object proxy, Method method, Object[] args) throws Throwable {
3715+
if ("writeWithResponse".equals(method.getName())) {
3716+
Object result = method.invoke(delegate, args);
3717+
boolean lastChunk = (boolean) args[5];
3718+
// if we're on the lastChunk simulate a connection failure which
3719+
// happens after the request was processed but before response could
3720+
// be received by the client.
3721+
if (lastChunk) {
3722+
exceptionThrown.set(true);
3723+
throw StorageException.translate(
3724+
new IOException("simulated Connection closed prematurely"));
3725+
} else {
3726+
return result;
3727+
}
3728+
}
3729+
return method.invoke(delegate, args);
3730+
}
3731+
});
3732+
}
3733+
})
3734+
.build()
3735+
.getService();
3736+
3737+
try (WriteChannel w = testStorage.writer(blobGen1, BlobWriteOption.generationMatch())) {
3738+
w.setChunkSize(chunkSize);
3739+
3740+
ByteBuffer buffer = ByteBuffer.wrap(randStringBytes);
3741+
w.write(buffer);
3742+
}
3743+
3744+
assertTrue("Expected an exception to be thrown for the last chunk", exceptionThrown.get());
3745+
3746+
Blob blobGen2 = storage.get(blobId);
3747+
assertEquals(contentSize, (long) blobGen2.getSize());
3748+
assertNotEquals(blobInfo.getGeneration(), blobGen2.getGeneration());
3749+
ByteArrayOutputStream actualData = new ByteArrayOutputStream();
3750+
blobGen2.downloadTo(actualData);
3751+
assertArrayEquals(randStringBytes, actualData.toByteArray());
3752+
}
3753+
3754+
private static String randString(Random rand, int length) {
3755+
final StringBuilder sb = new StringBuilder();
3756+
while (sb.length() < length) {
3757+
int i = rand.nextInt('z');
3758+
char c = (char) i;
3759+
if (Character.isLetter(c) || Character.isDigit(c)) {
3760+
sb.append(c);
3761+
}
3762+
}
3763+
return sb.toString();
3764+
}
36273765
}

0 commit comments

Comments
 (0)