Skip to content

Commit 692289e

Browse files
committed
---
yaml --- r: 2493 b: refs/heads/update-datastore c: 6537314 h: refs/heads/master i: 2491: 89f1704
1 parent 3345474 commit 692289e

8 files changed

Lines changed: 115 additions & 28 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ refs/heads/gh-pages: 4e0561bb4504bf647db669a14417b2b2c87ba45d
55
refs/heads/bigquery: 762fa5830e6c398c0396177e3e7fd243bd62cfc3
66
refs/heads/pubsub-alpha: 1a0e970f265af871e02274085b9662b3fe29058b
77
refs/heads/resource-manager: ebf4adc5ee835cd2086c4ac5b4e78d01a5a005a7
8-
refs/heads/update-datastore: 87e4bac0dff3ef1f36f67e6a119072edd0d3509c
8+
refs/heads/update-datastore: 65373146e8cd827011631eb111ad4a5b52f5c2a5
99
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444
1010
refs/tags/v0.0.10: 207ebd2a3472fddee69fe1298eb90429e3306efd
1111
refs/tags/v0.0.11: ffbfba48a6426ff63c08ff2117e58681f251fbf2

branches/update-datastore/gcloud-java-storage/src/main/java/com/google/gcloud/spi/DefaultStorageRpc.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -410,8 +410,8 @@ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) {
410410
}
411411

412412
@Override
413-
public byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
414-
throws StorageException {
413+
public Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position,
414+
int bytes) throws StorageException {
415415
try {
416416
Get req = storage.objects()
417417
.get(from.getBucket(), from.getName())
@@ -420,12 +420,13 @@ public byte[] read(StorageObject from, Map<Option, ?> options, long position, in
420420
.setIfMetagenerationNotMatch(IF_METAGENERATION_NOT_MATCH.getLong(options))
421421
.setIfGenerationMatch(IF_GENERATION_MATCH.getLong(options))
422422
.setIfGenerationNotMatch(IF_GENERATION_NOT_MATCH.getLong(options));
423-
MediaHttpDownloader downloader = req.getMediaHttpDownloader();
424-
downloader.setContentRange(position, Ints.checkedCast(position + bytes - 1));
425-
downloader.setDirectDownloadEnabled(true);
423+
StringBuilder range = new StringBuilder();
424+
range.append("bytes=").append(position).append("-").append(position + bytes - 1);
425+
req.getRequestHeaders().setRange(range.toString());
426426
ByteArrayOutputStream output = new ByteArrayOutputStream();
427-
req.executeMediaAndDownloadTo(output);
428-
return output.toByteArray();
427+
req.executeMedia().download(output);
428+
String etag = req.getLastResponseHeaders().getETag();
429+
return Tuple.of(etag, output.toByteArray());
429430
} catch (IOException ex) {
430431
throw translate(ex);
431432
}

branches/update-datastore/gcloud-java-storage/src/main/java/com/google/gcloud/spi/StorageRpc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ StorageObject compose(Iterable<StorageObject> sources, StorageObject target,
249249
byte[] load(StorageObject storageObject, Map<Option, ?> options)
250250
throws StorageException;
251251

252-
byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
252+
Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position, int bytes)
253253
throws StorageException;
254254

255255
String open(StorageObject object, Map<Option, ?> options) throws StorageException;

branches/update-datastore/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.gcloud.RestorableState;
2424
import com.google.gcloud.RetryHelper;
2525
import com.google.gcloud.spi.StorageRpc;
26+
import com.google.gcloud.spi.StorageRpc.Tuple;
2627

2728
import java.io.IOException;
2829
import java.io.Serializable;
@@ -41,6 +42,7 @@ class BlobReadChannelImpl implements BlobReadChannel {
4142
private final StorageOptions serviceOptions;
4243
private final BlobId blob;
4344
private final Map<StorageRpc.Option, ?> requestOptions;
45+
private String lastEtag;
4446
private int position;
4547
private boolean isOpen;
4648
private boolean endOfStream;
@@ -117,12 +119,19 @@ public int read(ByteBuffer byteBuffer) throws IOException {
117119
}
118120
final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
119121
try {
120-
buffer = runWithRetries(new Callable<byte[]>() {
122+
Tuple<String, byte[]> result = runWithRetries(new Callable<Tuple<String, byte[]>>() {
121123
@Override
122-
public byte[] call() {
124+
public Tuple<String, byte[]> call() {
123125
return storageRpc.read(storageObject, requestOptions, position, toRead);
124126
}
125127
}, serviceOptions.retryParams(), StorageImpl.EXCEPTION_HANDLER);
128+
if (lastEtag != null && !Objects.equals(result.x(), lastEtag)) {
129+
StringBuilder messageBuilder = new StringBuilder();
130+
messageBuilder.append("Blob ").append(blob).append(" was updated while reading");
131+
throw new StorageException(0, messageBuilder.toString(), false);
132+
}
133+
lastEtag = result.x();
134+
buffer = result.y();
126135
} catch (RetryHelper.RetryHelperException e) {
127136
throw StorageException.translateAndThrow(e);
128137
}
@@ -152,6 +161,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
152161
private final StorageOptions serviceOptions;
153162
private final BlobId blob;
154163
private final Map<StorageRpc.Option, ?> requestOptions;
164+
private final String lastEtag;
155165
private final int position;
156166
private final boolean isOpen;
157167
private final boolean endOfStream;
@@ -161,6 +171,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
161171
this.serviceOptions = builder.serviceOptions;
162172
this.blob = builder.blob;
163173
this.requestOptions = builder.requestOptions;
174+
this.lastEtag = builder.lastEtag;
164175
this.position = builder.position;
165176
this.isOpen = builder.isOpen;
166177
this.endOfStream = builder.endOfStream;
@@ -171,6 +182,7 @@ static class Builder {
171182
private final StorageOptions serviceOptions;
172183
private final BlobId blob;
173184
private final Map<StorageRpc.Option, ?> requestOptions;
185+
private String lastEtag;
174186
private int position;
175187
private boolean isOpen;
176188
private boolean endOfStream;
@@ -182,6 +194,11 @@ private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> r
182194
this.requestOptions = reqOptions;
183195
}
184196

197+
Builder lastEtag(String lastEtag) {
198+
this.lastEtag = lastEtag;
199+
return this;
200+
}
201+
185202
Builder position(int position) {
186203
this.position = position;
187204
return this;
@@ -215,6 +232,7 @@ static Builder builder(
215232
@Override
216233
public BlobReadChannel restore() {
217234
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
235+
channel.lastEtag = lastEtag;
218236
channel.position = position;
219237
channel.isOpen = isOpen;
220238
channel.endOfStream = endOfStream;
@@ -224,8 +242,8 @@ public BlobReadChannel restore() {
224242

225243
@Override
226244
public int hashCode() {
227-
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
228-
chunkSize);
245+
return Objects.hash(serviceOptions, blob, requestOptions, lastEtag, position, isOpen,
246+
endOfStream, chunkSize);
229247
}
230248

231249
@Override
@@ -240,6 +258,7 @@ public boolean equals(Object obj) {
240258
return Objects.equals(this.serviceOptions, other.serviceOptions)
241259
&& Objects.equals(this.blob, other.blob)
242260
&& Objects.equals(this.requestOptions, other.requestOptions)
261+
&& Objects.equals(this.lastEtag, other.lastEtag)
243262
&& this.position == other.position
244263
&& this.isOpen == other.isOpen
245264
&& this.endOfStream == other.endOfStream

branches/update-datastore/gcloud-java-storage/src/main/java/com/google/gcloud/storage/Storage.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.InputStream;
3434
import java.io.Serializable;
3535
import java.net.URL;
36+
import java.nio.ByteBuffer;
3637
import java.util.Arrays;
3738
import java.util.Collections;
3839
import java.util.HashSet;
@@ -1405,14 +1406,20 @@ private static void checkContentType(BlobInfo blobInfo) throws IllegalArgumentEx
14051406
BatchResponse apply(BatchRequest batchRequest);
14061407

14071408
/**
1408-
* Return a channel for reading the blob's content.
1409+
* Return a channel for reading the blob's content. The blob's latest generation is read. If the
1410+
* blob changes while reading (i.e. {@link BlobInfo#etag()} changes), subsequent calls to
1411+
* {@link BlobReadChannel#read(ByteBuffer)} may throw {@link StorageException}.
14091412
*
14101413
* @throws StorageException upon failure
14111414
*/
14121415
BlobReadChannel reader(String bucket, String blob, BlobSourceOption... options);
14131416

14141417
/**
1415-
* Return a channel for reading the blob's content.
1418+
* Return a channel for reading the blob's content. If {@code blob.generation()} is set
1419+
* data corresponding to that generation is read. If {@code blob.generation()} is {@code null}
1420+
* the blob's latest generation is read. If the blob changes while reading (i.e.
1421+
* {@link BlobInfo#etag()} changes), subsequent calls to {@link BlobReadChannel#read(ByteBuffer)}
1422+
* may throw {@link StorageException}.
14161423
*
14171424
* @throws StorageException upon failure
14181425
*/

branches/update-datastore/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.easymock.EasyMock.anyObject;
2020
import static org.easymock.EasyMock.createMock;
2121
import static org.easymock.EasyMock.expect;
22-
import static org.easymock.EasyMock.expectLastCall;
2322
import static org.easymock.EasyMock.replay;
2423
import static org.easymock.EasyMock.verify;
2524
import static org.junit.Assert.assertArrayEquals;
@@ -46,7 +45,7 @@ public class BlobReadChannelImplTest {
4645

4746
private static final String BUCKET_NAME = "b";
4847
private static final String BLOB_NAME = "n";
49-
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME);
48+
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME, -1L);
5049
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
5150
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
5251
private static final int CUSTOM_CHUNK_SIZE = 2 * 1024 * 1024;
@@ -88,7 +87,7 @@ public void testReadBuffered() throws IOException {
8887
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
8988
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
9089
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
91-
.andReturn(result);
90+
.andReturn(StorageRpc.Tuple.of("etag", result));
9291
replay(storageRpcMock);
9392
reader.read(firstReadBuffer);
9493
reader.read(secondReadBuffer);
@@ -107,10 +106,11 @@ public void testReadBig() throws IOException {
107106
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
108107
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
109108
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
110-
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE);
111-
expectLastCall().andReturn(firstResult);
112-
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE);
113-
expectLastCall().andReturn(secondResult);
109+
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
110+
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
111+
expect(storageRpcMock.read(
112+
BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE))
113+
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
114114
replay(storageRpcMock);
115115
reader.read(firstReadBuffer);
116116
reader.read(secondReadBuffer);
@@ -125,7 +125,7 @@ public void testReadFinish() throws IOException {
125125
byte[] result = {};
126126
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
127127
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
128-
.andReturn(result);
128+
.andReturn(StorageRpc.Tuple.of("etag", result));
129129
replay(storageRpcMock);
130130
assertEquals(-1, reader.read(readBuffer));
131131
}
@@ -137,7 +137,7 @@ public void testSeek() throws IOException {
137137
byte[] result = randomByteArray(DEFAULT_CHUNK_SIZE);
138138
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
139139
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
140-
.andReturn(result);
140+
.andReturn(StorageRpc.Tuple.of("etag", result));
141141
replay(storageRpcMock);
142142
reader.read(readBuffer);
143143
assertArrayEquals(result, readBuffer.array());
@@ -166,16 +166,42 @@ public void testReadClosed() {
166166
}
167167
}
168168

169+
@Test
170+
public void testReadGenerationChanged() throws IOException {
171+
BlobId blobId = BlobId.of(BUCKET_NAME, BLOB_NAME);
172+
reader = new BlobReadChannelImpl(options, blobId, EMPTY_RPC_OPTIONS);
173+
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
174+
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
175+
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
176+
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
177+
expect(storageRpcMock.read(blobId.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
178+
.andReturn(StorageRpc.Tuple.of("etag1", firstResult));
179+
expect(storageRpcMock.read(
180+
blobId.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE))
181+
.andReturn(StorageRpc.Tuple.of("etag2", firstResult));
182+
replay(storageRpcMock);
183+
reader.read(firstReadBuffer);
184+
try {
185+
reader.read(secondReadBuffer);
186+
fail("Expected BlobReadChannel read to throw StorageException");
187+
} catch (StorageException ex) {
188+
StringBuilder messageBuilder = new StringBuilder();
189+
messageBuilder.append("Blob ").append(blobId).append(" was updated while reading");
190+
assertEquals(messageBuilder.toString(), ex.getMessage());
191+
// expected
192+
}
193+
}
194+
169195
@Test
170196
public void testSaveAndRestore() throws IOException, ClassNotFoundException {
171197
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
172198
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
173199
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
174200
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
175201
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
176-
.andReturn(firstResult);
202+
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
177203
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
178-
.andReturn(secondResult);
204+
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
179205
replay(storageRpcMock);
180206
reader = new BlobReadChannelImpl(options, BLOB_ID, EMPTY_RPC_OPTIONS);
181207
reader.read(firstReadBuffer);

branches/update-datastore/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Iterator;
5050
import java.util.List;
5151
import java.util.Map;
52+
import java.util.Random;
5253
import java.util.concurrent.ExecutionException;
5354
import java.util.concurrent.TimeUnit;
5455
import java.util.concurrent.TimeoutException;
@@ -730,6 +731,39 @@ public void testReadChannelFail() throws IOException {
730731
assertTrue(storage.delete(BUCKET, blobName));
731732
}
732733

734+
@Test
735+
public void testReadChannelFailUpdatedGeneration() throws IOException {
736+
String blobName = "test-read-blob-fail-updated-generation";
737+
BlobInfo blob = BlobInfo.builder(BUCKET, blobName).build();
738+
Random random = new Random();
739+
int chunkSize = 1024;
740+
int blobSize = 2 * chunkSize;
741+
byte[] content = new byte[blobSize];
742+
random.nextBytes(content);
743+
BlobInfo remoteBlob = storage.create(blob, content);
744+
assertNotNull(remoteBlob);
745+
assertEquals(blobSize, (long) remoteBlob.size());
746+
try (BlobReadChannel reader = storage.reader(blob.blobId())) {
747+
reader.chunkSize(chunkSize);
748+
ByteBuffer readBytes = ByteBuffer.allocate(chunkSize);
749+
int numReadBytes = reader.read(readBytes);
750+
assertEquals(chunkSize, numReadBytes);
751+
assertArrayEquals(Arrays.copyOf(content, chunkSize), readBytes.array());
752+
try (BlobWriteChannel writer = storage.writer(blob)) {
753+
byte[] newContent = new byte[blobSize];
754+
random.nextBytes(newContent);
755+
int numWrittenBytes = writer.write(ByteBuffer.wrap(newContent));
756+
assertEquals(blobSize, numWrittenBytes);
757+
}
758+
readBytes = ByteBuffer.allocate(chunkSize);
759+
reader.read(readBytes);
760+
fail("StorageException was expected");
761+
} catch(StorageException ex) {
762+
// expected
763+
}
764+
assertTrue(storage.delete(BUCKET, blobName));
765+
}
766+
733767
@Test
734768
public void testWriteChannelFail() throws IOException {
735769
String blobName = "test-write-channel-blob-fail";

branches/update-datastore/gcloud-java-storage/src/test/java/com/google/gcloud/storage/StorageImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,7 +1030,7 @@ public void testReaderWithOptions() throws IOException {
10301030
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
10311031
EasyMock.expect(
10321032
storageRpcMock.read(BLOB_INFO2.toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
1033-
.andReturn(result);
1033+
.andReturn(StorageRpc.Tuple.of("etag", result));
10341034
EasyMock.replay(storageRpcMock);
10351035
storage = options.service();
10361036
BlobReadChannel channel = storage.reader(BUCKET_NAME1, BLOB_NAME2, BLOB_SOURCE_GENERATION,
@@ -1045,7 +1045,7 @@ public void testReaderWithOptionsFromBlobId() throws IOException {
10451045
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
10461046
EasyMock.expect(
10471047
storageRpcMock.read(BLOB_INFO1.blobId().toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
1048-
.andReturn(result);
1048+
.andReturn(StorageRpc.Tuple.of("etag", result));
10491049
EasyMock.replay(storageRpcMock);
10501050
storage = options.service();
10511051
BlobReadChannel channel = storage.reader(BLOB_INFO1.blobId(),

0 commit comments

Comments
 (0)