Skip to content

Commit bd82561

Browse files
authored
chore(storage): implement NewRangeReader interface, refactor gRPC read (#6053)
1 parent 9266276 commit bd82561

File tree

6 files changed

+576
-243
lines changed

6 files changed

+576
-243
lines changed

storage/client.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type storageClient interface {
8383
ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error)
8484
RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error)
8585

86-
OpenReader(ctx context.Context, r *Reader, opts ...storageOption) error
86+
NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (*Reader, error)
8787
OpenWriter(ctx context.Context, w *Writer, opts ...storageOption) error
8888

8989
// IAM methods.
@@ -211,6 +211,16 @@ type userProjectOption struct {
211211

212212
// Apply copies the option's project into the userProject field of s.
func (o *userProjectOption) Apply(s *settings) {
	s.userProject = o.project
}
213213

214+
type newRangeReaderParams struct {
215+
bucket string
216+
conds *Conditions
217+
encryptionKey []byte
218+
gen int64
219+
length int64
220+
object string
221+
offset int64
222+
}
223+
214224
type composeObjectRequest struct {
215225
dstBucket string
216226
dstObject string

storage/client_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,51 @@ func TestDeleteDefaultObjectACLEmulated(t *testing.T) {
585585
})
586586
}
587587

588+
func TestOpenReaderEmulated(t *testing.T) {
589+
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
590+
// Populate test data.
591+
_, err := client.CreateBucket(context.Background(), project, &BucketAttrs{
592+
Name: bucket,
593+
})
594+
if err != nil {
595+
t.Fatalf("client.CreateBucket: %v", err)
596+
}
597+
prefix := time.Now().Nanosecond()
598+
want := &ObjectAttrs{
599+
Bucket: bucket,
600+
Name: fmt.Sprintf("%d-object-%d", prefix, time.Now().Nanosecond()),
601+
}
602+
w := veneerClient.Bucket(bucket).Object(want.Name).NewWriter(context.Background())
603+
if _, err := w.Write(randomBytesToWrite); err != nil {
604+
t.Fatalf("failed to populate test data: %v", err)
605+
}
606+
if err := w.Close(); err != nil {
607+
t.Fatalf("closing object: %v", err)
608+
}
609+
610+
params := &newRangeReaderParams{
611+
bucket: bucket,
612+
object: want.Name,
613+
gen: defaultGen,
614+
offset: 0,
615+
length: -1,
616+
}
617+
r, err := client.NewRangeReader(context.Background(), params)
618+
if err != nil {
619+
t.Fatalf("opening reading: %v", err)
620+
}
621+
wantLen := len(randomBytesToWrite)
622+
got := make([]byte, wantLen)
623+
n, err := r.Read(got)
624+
if n != wantLen {
625+
t.Fatalf("expected to read %d bytes, but got %d", wantLen, n)
626+
}
627+
if diff := cmp.Diff(got, randomBytesToWrite); diff != "" {
628+
t.Fatalf("Read: got(-),want(+):\n%s", diff)
629+
}
630+
})
631+
}
632+
588633
func initEmulatorClients() func() error {
589634
noopCloser := func() error { return nil }
590635
if !isEmulatorEnvironmentSet() {

storage/grpc_client.go

Lines changed: 246 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ package storage
1616

1717
import (
1818
"context"
19+
"fmt"
20+
"io"
1921
"os"
2022

23+
"cloud.google.com/go/internal/trace"
2124
gapic "cloud.google.com/go/storage/internal/apiv2"
2225
"google.golang.org/api/iterator"
2326
"google.golang.org/api/option"
@@ -565,9 +568,134 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec
565568
return nil, errMethodNotSupported
566569
}
567570

568-
func (c *grpcStorageClient) OpenReader(ctx context.Context, r *Reader, opts ...storageOption) error {
569-
return errMethodNotSupported
571+
func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
572+
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
573+
defer func() { trace.EndSpan(ctx, err) }()
574+
575+
if params.conds != nil {
576+
if err := params.conds.validate("grpcStorageClient.NewRangeReader"); err != nil {
577+
return nil, err
578+
}
579+
}
580+
581+
s := callSettings(c.settings, opts...)
582+
583+
// A negative length means "read to the end of the object", but the
584+
// read_limit field it corresponds to uses zero to mean the same thing. Thus
585+
// we coerce the length to 0 to read to the end of the object.
586+
if params.length < 0 {
587+
params.length = 0
588+
}
589+
590+
b := bucketResourceName(globalProjectAlias, params.bucket)
591+
// TODO(noahdietz): Use encryptionKey to set relevant request fields.
592+
req := &storagepb.ReadObjectRequest{
593+
Bucket: b,
594+
Object: params.object,
595+
}
596+
// The default is a negative value, which means latest.
597+
if params.gen >= 0 {
598+
req.Generation = params.gen
599+
}
600+
601+
// Define a function that initiates a Read with offset and length, assuming
602+
// we have already read seen bytes.
603+
reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
604+
// If the context has already expired, return immediately without making
605+
// we call.
606+
if err := ctx.Err(); err != nil {
607+
return nil, nil, err
608+
}
609+
610+
cc, cancel := context.WithCancel(ctx)
611+
612+
start := params.offset + seen
613+
// Only set a ReadLimit if length is greater than zero, because zero
614+
// means read it all.
615+
if params.length > 0 {
616+
req.ReadLimit = params.length - seen
617+
}
618+
req.ReadOffset = start
619+
620+
if err := applyCondsProto("gRPCReader.reopen", params.gen, params.conds, req); err != nil {
621+
cancel()
622+
return nil, nil, err
623+
}
624+
625+
var stream storagepb.Storage_ReadObjectClient
626+
var msg *storagepb.ReadObjectResponse
627+
var err error
628+
629+
err = run(cc, func() error {
630+
stream, err = c.raw.ReadObject(cc, req, s.gax...)
631+
if err != nil {
632+
return err
633+
}
634+
635+
msg, err = stream.Recv()
636+
637+
return err
638+
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
639+
if err != nil {
640+
// Close the stream context we just created to ensure we don't leak
641+
// resources.
642+
cancel()
643+
return nil, nil, err
644+
}
645+
646+
return &readStreamResponse{stream, msg}, cancel, nil
647+
}
648+
649+
res, cancel, err := reopen(0)
650+
if err != nil {
651+
return nil, err
652+
}
653+
654+
// The first message was Recv'd on stream open, use it to populate the
655+
// object metadata.
656+
msg := res.response
657+
obj := msg.GetMetadata()
658+
// This is the size of the entire object, even if only a range was requested.
659+
size := obj.GetSize()
660+
661+
r = &Reader{
662+
Attrs: ReaderObjectAttrs{
663+
Size: size,
664+
ContentType: obj.GetContentType(),
665+
ContentEncoding: obj.GetContentEncoding(),
666+
CacheControl: obj.GetCacheControl(),
667+
LastModified: obj.GetUpdateTime().AsTime(),
668+
Metageneration: obj.GetMetageneration(),
669+
Generation: obj.GetGeneration(),
670+
},
671+
reader: &gRPCReader{
672+
stream: res.stream,
673+
reopen: reopen,
674+
cancel: cancel,
675+
size: size,
676+
// Store the content from the first Recv in the
677+
// client buffer for reading later.
678+
leftovers: msg.GetChecksummedData().GetContent(),
679+
},
680+
}
681+
682+
cr := msg.GetContentRange()
683+
if cr != nil {
684+
r.Attrs.StartOffset = cr.GetStart()
685+
r.remain = cr.GetEnd() - cr.GetStart() + 1
686+
} else {
687+
r.remain = size
688+
}
689+
690+
// Only support checksums when reading an entire object, not a range.
691+
if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length == 0 {
692+
r.wantCRC = checksums.GetCrc32C()
693+
r.checkCRC = true
694+
}
695+
696+
return r, nil
570697
}
698+
571699
// OpenWriter is not implemented by the gRPC client yet; every call returns
// errMethodNotSupported.
func (c *grpcStorageClient) OpenWriter(ctx context.Context, w *Writer, opts ...storageOption) error {
	return errMethodNotSupported
}
@@ -653,3 +781,119 @@ func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc
653781
func setUserProjectMetadata(ctx context.Context, project string) context.Context {
654782
return metadata.AppendToOutgoingContext(ctx, "x-goog-user-project", project)
655783
}
784+
785+
type readStreamResponse struct {
786+
stream storagepb.Storage_ReadObjectClient
787+
response *storagepb.ReadObjectResponse
788+
}
789+
790+
type gRPCReader struct {
791+
seen, size int64
792+
stream storagepb.Storage_ReadObjectClient
793+
reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
794+
leftovers []byte
795+
cancel context.CancelFunc
796+
}
797+
798+
// Read reads bytes into the user's buffer from an open gRPC stream.
799+
func (r *gRPCReader) Read(p []byte) (int, error) {
800+
// No stream to read from, either never initiliazed or Close was called.
801+
// Note: There is a potential concurrency issue if multiple routines are
802+
// using the same reader. One encounters an error and the stream is closed
803+
// and then reopened while the other routine attempts to read from it.
804+
if r.stream == nil {
805+
return 0, fmt.Errorf("reader has been closed")
806+
}
807+
808+
// The entire object has been read by this reader, return EOF.
809+
if r.size != 0 && r.size == r.seen {
810+
return 0, io.EOF
811+
}
812+
813+
var n int
814+
// Read leftovers and return what was available to conform to the Reader
815+
// interface: https://pkg.go.dev/io#Reader.
816+
if len(r.leftovers) > 0 {
817+
n = copy(p, r.leftovers)
818+
r.seen += int64(n)
819+
r.leftovers = r.leftovers[n:]
820+
return n, nil
821+
}
822+
823+
// Attempt to Recv the next message on the stream.
824+
msg, err := r.recv()
825+
if err != nil {
826+
return 0, err
827+
}
828+
829+
// TODO: Determine if we need to capture incremental CRC32C for this
830+
// chunk. The Object CRC32C checksum is captured when directed to read
831+
// the entire Object. If directed to read a range, we may need to
832+
// calculate the range's checksum for verification if the checksum is
833+
// present in the response here.
834+
// TODO: Figure out if we need to support decompressive transcoding
835+
// https://cloud.google.com/storage/docs/transcoding.
836+
content := msg.GetChecksummedData().GetContent()
837+
n = copy(p[n:], content)
838+
leftover := len(content) - n
839+
if leftover > 0 {
840+
// Wasn't able to copy all of the data in the message, store for
841+
// future Read calls.
842+
r.leftovers = content[n:]
843+
}
844+
r.seen += int64(n)
845+
846+
return n, nil
847+
}
848+
849+
// Close cancels the read stream's context in order for it to be closed and
850+
// collected.
851+
func (r *gRPCReader) Close() error {
852+
if r.cancel != nil {
853+
r.cancel()
854+
}
855+
r.stream = nil
856+
return nil
857+
}
858+
859+
// recv attempts to Recv the next message on the stream. In the event
860+
// that a retryable error is encountered, the stream will be closed, reopened,
861+
// and Recv again. This will attempt to Recv until one of the following is true:
862+
//
863+
// * Recv is successful
864+
// * A non-retryable error is encountered
865+
// * The Reader's context is canceled
866+
//
867+
// The last error received is the one that is returned, which could be from
868+
// an attempt to reopen the stream.
869+
//
870+
// This is an experimental API and not intended for public use.
871+
func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
872+
msg, err := r.stream.Recv()
873+
if err != nil && shouldRetry(err) {
874+
// This will "close" the existing stream and immediately attempt to
875+
// reopen the stream, but will backoff if further attempts are necessary.
876+
// Reopening the stream Recvs the first message, so if retrying is
877+
// successful, the next logical chunk will be returned.
878+
msg, err = r.reopenStream()
879+
}
880+
881+
return msg, err
882+
}
883+
884+
// reopenStream "closes" the existing stream and attempts to reopen a stream and
885+
// sets the Reader's stream and cancelStream properties in the process.
886+
//
887+
// This is an experimental API and not intended for public use.
888+
func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {
889+
// Close existing stream and initialize new stream with updated offset.
890+
r.Close()
891+
892+
res, cancel, err := r.reopen(r.seen)
893+
if err != nil {
894+
return nil, err
895+
}
896+
r.stream = res.stream
897+
r.cancel = cancel
898+
return res.response, nil
899+
}

0 commit comments

Comments
 (0)