Skip to content

Commit 0a8c650

Browse files
committed
---
yaml --- r: 5967 b: refs/heads/tswast-patch-1 c: c1db9b2 h: refs/heads/master i: 5965: b3ad20f 5963: a82ea68 5959: 58faa98 5951: b020ceb
1 parent f56364f commit 0a8c650

7 files changed

Lines changed: 130 additions & 15 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ refs/tags/v0.18.0: 9d193c4c4b9d1c6f21515dd8e50836b9194ec9bb
5757
refs/tags/v0.19.0: e67b56e4d8dad5f9a7b38c9b2107c23c828f2ed5
5858
refs/tags/v0.20.0: 839f7fb7156535146aa1cb2c5aadd8d375d854e8
5959
refs/tags/v0.20.1: 370471f437f1f4f68a11e068df5cd6bf39edb1fa
60-
refs/heads/tswast-patch-1: dff49797ada1f85eb592eeefae2affef3ef6e193
60+
refs/heads/tswast-patch-1: c1db9b21b5871ab7c5af96c1e1156d75dd410b58
6161
refs/heads/pubsub-streaming-pull: 19262b752ee874eb2ca3b950eb2aef44d5a5267b

branches/tswast-patch-1/src/main/java/com/google/gcloud/examples/StorageExample.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,22 @@
1616

1717
package com.google.gcloud.examples;
1818

19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
1921
import com.google.gcloud.spi.StorageRpc.Tuple;
2022
import com.google.gcloud.storage.BatchRequest;
2123
import com.google.gcloud.storage.BatchResponse;
2224
import com.google.gcloud.storage.Blob;
25+
import com.google.gcloud.storage.BlobReadChannel;
2326
import com.google.gcloud.storage.Bucket;
2427
import com.google.gcloud.storage.StorageService;
2528
import com.google.gcloud.storage.StorageService.ComposeRequest;
2629
import com.google.gcloud.storage.StorageService.CopyRequest;
2730
import com.google.gcloud.storage.StorageServiceFactory;
2831
import com.google.gcloud.storage.StorageServiceOptions;
2932

30-
import java.nio.charset.StandardCharsets;
33+
import java.io.IOException;
34+
import java.nio.ByteBuffer;
3135
import java.nio.file.Files;
3236
import java.nio.file.Path;
3337
import java.nio.file.Paths;
@@ -214,17 +218,23 @@ public String params() {
214218

215219
private static class GetAction extends BlobAction {
216220
@Override
217-
public void run(StorageService storage, Blob blob) {
221+
public void run(StorageService storage, Blob blob) throws IOException {
218222
blob = storage.get(blob);
219223
if (blob == null) {
220224
System.out.println("No such object");
221225
return;
222226
}
223-
if (blob.size() < 1_000_000) {
224-
System.out.println(new String(storage.load(blob), StandardCharsets.UTF_8));
227+
if (blob.size() < 1024) {
228+
System.out.println(new String(storage.load(blob), UTF_8));
225229
} else {
226-
// todo: download via streaming API
227-
throw new IllegalArgumentException("file is too big");
230+
try (BlobReadChannel reader = storage.reader(blob)) {
231+
ByteBuffer bytes = ByteBuffer.allocate(64 * 1024);
232+
while (reader.read(bytes) > 0) {
233+
bytes.flip();
234+
System.out.print(UTF_8.decode(bytes));
235+
bytes.clear();
236+
}
237+
}
228238
}
229239
}
230240
}
@@ -294,7 +304,7 @@ public static void printUsage() {
294304
actionAndParams.append(' ').append(param);
295305
}
296306
}
297-
System.out.printf("Usage: %s <project_id> <operation> <args>*%s%n",
307+
System.out.printf("Usage: %s [<project_id>] operation <args>*%s%n",
298308
StorageExample.class.getSimpleName(), actionAndParams);
299309
}
300310

branches/tswast-patch-1/src/main/java/com/google/gcloud/spi/DefaultStorageRpc.java

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

branches/tswast-patch-1/src/main/java/com/google/gcloud/spi/StorageRpc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
/* * Copyright 2015 Google Inc. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.google.gcloud.spi;import com.google.api.services.storage.model.Bucket;import com.google.api.services.storage.model.StorageObject;import com.google.common.collect.ImmutableList;import com.google.common.collect.ImmutableMap;import com.google.gcloud.storage.BlobReadChannel;import com.google.gcloud.storage.BlobWriteChannel;import com.google.gcloud.storage.StorageServiceException;import java.util.List;import java.util.Map;public interface StorageRpc { enum Option { PREDEFINED_ACL("predefinedAcl"), PREDEFINED_DEFAULT_OBJECT_ACL("predefinedDefaultObjectAcl"), IF_METAGENERATION_MATCH("ifMetagenerationMatch"), IF_METAGENERATION_NOT_MATCH("ifMetagenerationNotMatch"), IF_GENERATION_NOT_MATCH("ifGenerationMatch"), IF_GENERATION_MATCH("ifGenerationNotMatch"), IF_SOURCE_METAGENERATION_MATCH("ifSourceMetagenerationMatch"), IF_SOURCE_METAGENERATION_NOT_MATCH("ifSourceMetagenerationNotMatch"), IF_SOURCE_GENERATION_MATCH("ifSourceGenerationMatch"), IF_SOURCE_GENERATION_NOT_MATCH("ifSourceGenerationNotMatch"), PREFIX("prefix"), MAX_RESULTS("maxResults"), PAGE_TOKEN("pageToken"), DELIMITER("delimiter"), VERSIONS("versions"); private final String value; Option(String value) { this.value = value; } public String value() { return value; } @SuppressWarnings("unchecked") <T> T get(Map<Option, ?> options) { return (T) options.get(this); } String getString(Map<Option, ?> options) { return get(options); } Long getLong(Map<Option, ?> options) { return get(options); } Boolean getBoolean(Map<Option, ?> options) { return get(options); } } class Tuple<X, Y> { private final X x; private final Y y; private Tuple(X x, Y y) { this.x = x; this.y = y; } public static <X, Y> Tuple<X, Y> of(X x, Y y) { return new Tuple<>(x, y); } public X x() { return x; } public Y y() { return y; } } class BatchRequest { public final List<Tuple<StorageObject, Map<Option, ?>>> toDelete; public final List<Tuple<StorageObject, Map<Option, ?>>> toUpdate; public final List<Tuple<StorageObject, Map<Option, ?>>> toGet; public BatchRequest(Iterable<Tuple<StorageObject, Map<Option, ?>>> toDelete, Iterable<Tuple<StorageObject, Map<Option, ?>>> toUpdate, Iterable<Tuple<StorageObject, Map<Option, ?>>> toGet) { this.toDelete = ImmutableList.copyOf(toDelete); this.toUpdate = ImmutableList.copyOf(toUpdate); this.toGet = ImmutableList.copyOf(toGet); } } class BatchResponse { public final Map<StorageObject, Tuple<Boolean, StorageServiceException>> deletes; public final Map<StorageObject, Tuple<StorageObject, StorageServiceException>> updates; public final Map<StorageObject, Tuple<StorageObject, StorageServiceException>> gets; public BatchResponse(Map<StorageObject, Tuple<Boolean, StorageServiceException>> deletes, Map<StorageObject, Tuple<StorageObject, StorageServiceException>> updates, Map<StorageObject, Tuple<StorageObject, StorageServiceException>> gets) { this.deletes = ImmutableMap.copyOf(deletes); this.updates = ImmutableMap.copyOf(updates); this.gets = ImmutableMap.copyOf(gets); } } Bucket create(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; StorageObject create(StorageObject object, byte[] content, Map<Option, ?> options) throws StorageServiceException; Tuple<String, Iterable<Bucket>> list(Map<Option, ?> options) throws StorageServiceException; Tuple<String, Iterable<StorageObject>> list(String bucket, Map<Option, ?> options) throws StorageServiceException; Bucket get(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; StorageObject get(StorageObject object, Map<Option, ?> options) throws StorageServiceException; Bucket patch(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; StorageObject patch(StorageObject storageObject, Map<Option, ?> options) throws StorageServiceException; boolean delete(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; boolean delete(StorageObject object, Map<Option, ?> options) throws StorageServiceException; BatchResponse batch(BatchRequest request) throws StorageServiceException; StorageObject compose(Iterable<StorageObject> sources, StorageObject target, Map<Option, ?> targetOptions) throws StorageServiceException; StorageObject copy(StorageObject source, Map<Option, ?> sourceOptions, StorageObject target, Map<Option, ?> targetOptions) throws StorageServiceException; byte[] load(StorageObject storageObject, Map<Option, ?> options) throws StorageServiceException; BlobReadChannel reader(StorageObject from, Map<Option, ?> options) throws StorageServiceException; BlobWriteChannel writer(StorageObject to, Map<Option, ?> options) throws StorageServiceException;}
1+
/* * Copyright 2015 Google Inc. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.google.gcloud.spi;import com.google.api.services.storage.model.Bucket;import com.google.api.services.storage.model.StorageObject;import com.google.common.collect.ImmutableList;import com.google.common.collect.ImmutableMap;import com.google.gcloud.storage.BlobWriteChannel;import com.google.gcloud.storage.StorageServiceException;import java.util.List;import java.util.Map;public interface StorageRpc { enum Option { PREDEFINED_ACL("predefinedAcl"), PREDEFINED_DEFAULT_OBJECT_ACL("predefinedDefaultObjectAcl"), IF_METAGENERATION_MATCH("ifMetagenerationMatch"), IF_METAGENERATION_NOT_MATCH("ifMetagenerationNotMatch"), IF_GENERATION_NOT_MATCH("ifGenerationMatch"), IF_GENERATION_MATCH("ifGenerationNotMatch"), IF_SOURCE_METAGENERATION_MATCH("ifSourceMetagenerationMatch"), IF_SOURCE_METAGENERATION_NOT_MATCH("ifSourceMetagenerationNotMatch"), IF_SOURCE_GENERATION_MATCH("ifSourceGenerationMatch"), IF_SOURCE_GENERATION_NOT_MATCH("ifSourceGenerationNotMatch"), PREFIX("prefix"), MAX_RESULTS("maxResults"), PAGE_TOKEN("pageToken"), DELIMITER("delimiter"), VERSIONS("versions"); private final String value; Option(String value) { this.value = value; } public String value() { return value; } @SuppressWarnings("unchecked") <T> T get(Map<Option, ?> options) { return (T) options.get(this); } String getString(Map<Option, ?> options) { return get(options); } Long getLong(Map<Option, ?> options) { return get(options); } Boolean getBoolean(Map<Option, ?> options) { return get(options); } } class Tuple<X, Y> { private final X x; private final Y y; private Tuple(X x, Y y) { this.x = x; this.y = y; } public static <X, Y> Tuple<X, Y> of(X x, Y y) { return new Tuple<>(x, y); } public X x() { return x; } public Y y() { return y; } } class BatchRequest { public final List<Tuple<StorageObject, Map<Option, ?>>> toDelete; public final List<Tuple<StorageObject, Map<Option, ?>>> toUpdate; public final List<Tuple<StorageObject, Map<Option, ?>>> toGet; public BatchRequest(Iterable<Tuple<StorageObject, Map<Option, ?>>> toDelete, Iterable<Tuple<StorageObject, Map<Option, ?>>> toUpdate, Iterable<Tuple<StorageObject, Map<Option, ?>>> toGet) { this.toDelete = ImmutableList.copyOf(toDelete); this.toUpdate = ImmutableList.copyOf(toUpdate); this.toGet = ImmutableList.copyOf(toGet); } } class BatchResponse { public final Map<StorageObject, Tuple<Boolean, StorageServiceException>> deletes; public final Map<StorageObject, Tuple<StorageObject, StorageServiceException>> updates; public final Map<StorageObject, Tuple<StorageObject, StorageServiceException>> gets; public BatchResponse(Map<StorageObject, Tuple<Boolean, StorageServiceException>> deletes, Map<StorageObject, Tuple<StorageObject, StorageServiceException>> updates, Map<StorageObject, Tuple<StorageObject, StorageServiceException>> gets) { this.deletes = ImmutableMap.copyOf(deletes); this.updates = ImmutableMap.copyOf(updates); this.gets = ImmutableMap.copyOf(gets); } } Bucket create(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; StorageObject create(StorageObject object, byte[] content, Map<Option, ?> options) throws StorageServiceException; Tuple<String, Iterable<Bucket>> list(Map<Option, ?> options) throws StorageServiceException; Tuple<String, Iterable<StorageObject>> list(String bucket, Map<Option, ?> options) throws StorageServiceException; Bucket get(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; StorageObject get(StorageObject object, Map<Option, ?> options) throws StorageServiceException; Bucket patch(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; StorageObject patch(StorageObject storageObject, Map<Option, ?> options) throws StorageServiceException; boolean delete(Bucket bucket, Map<Option, ?> options) throws StorageServiceException; boolean delete(StorageObject object, Map<Option, ?> options) throws StorageServiceException; BatchResponse batch(BatchRequest request) throws StorageServiceException; StorageObject compose(Iterable<StorageObject> sources, StorageObject target, Map<Option, ?> targetOptions) throws StorageServiceException; StorageObject copy(StorageObject source, Map<Option, ?> sourceOptions, StorageObject target, Map<Option, ?> targetOptions) throws StorageServiceException; byte[] load(StorageObject storageObject, Map<Option, ?> options) throws StorageServiceException; byte[] read(StorageObject from, Map<Option, ?> options, int position, int bytes) throws StorageServiceException; BlobWriteChannel writer(StorageObject to, Map<Option, ?> options) throws StorageServiceException;}

branches/tswast-patch-1/src/main/java/com/google/gcloud/storage/BlobReadChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.gcloud.storage;
1818

1919
import java.io.Closeable;
20+
import java.io.IOException;
2021
import java.io.Serializable;
2122
import java.nio.channels.ReadableByteChannel;
2223

@@ -37,5 +38,5 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos
3738
@Override
3839
void close();
3940

40-
void seek(int position);
41+
void seek(int position) throws IOException;
4142
}

branches/tswast-patch-1/src/main/java/com/google/gcloud/storage/StorageServiceImpl.java

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@
3434
import com.google.gcloud.spi.StorageRpc;
3535
import com.google.gcloud.spi.StorageRpc.Tuple;
3636

37+
import java.io.IOException;
38+
import java.io.ObjectInputStream;
39+
import java.io.ObjectOutputStream;
3740
import java.io.Serializable;
41+
import java.nio.ByteBuffer;
3842
import java.util.Arrays;
3943
import java.util.List;
4044
import java.util.Map;
@@ -308,12 +312,111 @@ private <I, O extends Serializable> List<BatchResponse.Result<O>> transformBatch
308312
return response;
309313
}
310314

315+
private static class BlobReadChannelImpl implements BlobReadChannel {
316+
317+
private final StorageServiceOptions serviceOptions;
318+
private final Blob blob;
319+
private final Map<StorageRpc.Option, ?> requestOptions;
320+
private int position;
321+
private boolean isOpen;
322+
private boolean endOfStream;
323+
324+
private transient StorageRpc storageRpc;
325+
private transient RetryParams retryParams;
326+
private transient StorageObject storageObject;
327+
private transient int bufferPos;
328+
private transient byte[] buffer;
329+
330+
BlobReadChannelImpl(StorageServiceOptions serviceOptions, Blob blob,
331+
Map<StorageRpc.Option, ?> requestOptions) {
332+
this.serviceOptions = serviceOptions;
333+
this.blob = blob;
334+
this.requestOptions = requestOptions;
335+
isOpen = true;
336+
initTransients();
337+
}
338+
339+
private void writeObject(ObjectOutputStream out) throws IOException {
340+
if (buffer != null) {
341+
position += bufferPos;
342+
buffer = null;
343+
bufferPos = 0;
344+
endOfStream = false;
345+
}
346+
out.defaultWriteObject();
347+
}
348+
349+
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
350+
in.defaultReadObject();
351+
initTransients();
352+
}
353+
354+
private void initTransients() {
355+
storageRpc = serviceOptions.storageRpc();
356+
retryParams = MoreObjects.firstNonNull(serviceOptions.retryParams(), RetryParams.noRetries());
357+
storageObject = blob.toPb();
358+
}
359+
360+
@Override
361+
public boolean isOpen() {
362+
return isOpen;
363+
}
364+
365+
@Override
366+
public void close() {
367+
if (isOpen) {
368+
buffer = null;
369+
isOpen = false;
370+
}
371+
}
372+
373+
private void validateOpen() throws IOException {
374+
if (!isOpen) {
375+
throw new IOException("stream is closed");
376+
}
377+
}
378+
379+
@Override
380+
public void seek(int position) throws IOException {
381+
validateOpen();
382+
throw new UnsupportedOperationException("not supported yet");
383+
// todo: implement
384+
}
385+
386+
@Override
387+
public int read(ByteBuffer byteBuffer) throws IOException {
388+
validateOpen();
389+
if (buffer == null) {
390+
if (endOfStream) {
391+
return -1;
392+
}
393+
final int toRead = Math.max(byteBuffer.remaining(), 256 * 1024);
394+
buffer = runWithRetries(new Callable<byte[]>() {
395+
@Override
396+
public byte[] call() {
397+
return storageRpc.read(storageObject, requestOptions, position, toRead);
398+
}
399+
}, retryParams, EXCEPTION_HANDLER);
400+
if (toRead > buffer.length) {
401+
endOfStream = true;
402+
}
403+
}
404+
int toWrite = Math.min(buffer.length - bufferPos, byteBuffer.remaining());
405+
byteBuffer.put(buffer, bufferPos, toWrite);
406+
bufferPos += toWrite;
407+
if (bufferPos >= buffer.length) {
408+
position += buffer.length;
409+
buffer = null;
410+
bufferPos = 0;
411+
}
412+
return toWrite;
413+
}
414+
}
415+
311416
@Override
312417
public BlobReadChannel reader(Blob blob, BlobSourceOption... options) {
313-
// todo: Use retry helper on retriable failures
314-
// todo: consider changing lower level api to handle segments
315-
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blob, options);
316-
return storageRpc.reader(blob.toPb(), optionsMap);
418+
Map<StorageRpc.Option, ?> optionsMap = optionMap(blob, options);
419+
return new BlobReadChannelImpl(options(), blob, optionsMap);
317420
}
318421

319422
@Override

branches/tswast-patch-1/src/main/java/com/google/gcloud/storage/StorageServiceOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ private StorageServiceOptions(Builder builder) {
7070
pathDelimiter = MoreObjects.firstNonNull(builder.pathDelimiter, DEFAULT_PATH_DELIMITER);
7171
project = builder.project != null ? builder.project : defaultProject();
7272
Preconditions.checkArgument(project != null, "Missing required project id");
73+
// todo: consider providing read-timeout
7374
}
7475

7576
@Override

0 commit comments

Comments
 (0)