@@ -16,8 +16,11 @@ package storage
1616
1717import (
1818 "context"
19+ "fmt"
20+ "io"
1921 "os"
2022
23+ "cloud.google.com/go/internal/trace"
2124 gapic "cloud.google.com/go/storage/internal/apiv2"
2225 "google.golang.org/api/iterator"
2326 "google.golang.org/api/option"
@@ -565,9 +568,134 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec
565568 return nil , errMethodNotSupported
566569}
567570
568- func (c * grpcStorageClient ) OpenReader (ctx context.Context , r * Reader , opts ... storageOption ) error {
569- return errMethodNotSupported
571+ func (c * grpcStorageClient ) NewRangeReader (ctx context.Context , params * newRangeReaderParams , opts ... storageOption ) (r * Reader , err error ) {
572+ ctx = trace .StartSpan (ctx , "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader" )
573+ defer func () { trace .EndSpan (ctx , err ) }()
574+
575+ if params .conds != nil {
576+ if err := params .conds .validate ("grpcStorageClient.NewRangeReader" ); err != nil {
577+ return nil , err
578+ }
579+ }
580+
581+ s := callSettings (c .settings , opts ... )
582+
583+ // A negative length means "read to the end of the object", but the
584+ // read_limit field it corresponds to uses zero to mean the same thing. Thus
585+ // we coerce the length to 0 to read to the end of the object.
586+ if params .length < 0 {
587+ params .length = 0
588+ }
589+
590+ b := bucketResourceName (globalProjectAlias , params .bucket )
591+ // TODO(noahdietz): Use encryptionKey to set relevant request fields.
592+ req := & storagepb.ReadObjectRequest {
593+ Bucket : b ,
594+ Object : params .object ,
595+ }
596+ // The default is a negative value, which means latest.
597+ if params .gen >= 0 {
598+ req .Generation = params .gen
599+ }
600+
601+ // Define a function that initiates a Read with offset and length, assuming
602+ // we have already read seen bytes.
603+ reopen := func (seen int64 ) (* readStreamResponse , context.CancelFunc , error ) {
604+ // If the context has already expired, return immediately without making
605+ // we call.
606+ if err := ctx .Err (); err != nil {
607+ return nil , nil , err
608+ }
609+
610+ cc , cancel := context .WithCancel (ctx )
611+
612+ start := params .offset + seen
613+ // Only set a ReadLimit if length is greater than zero, because zero
614+ // means read it all.
615+ if params .length > 0 {
616+ req .ReadLimit = params .length - seen
617+ }
618+ req .ReadOffset = start
619+
620+ if err := applyCondsProto ("gRPCReader.reopen" , params .gen , params .conds , req ); err != nil {
621+ cancel ()
622+ return nil , nil , err
623+ }
624+
625+ var stream storagepb.Storage_ReadObjectClient
626+ var msg * storagepb.ReadObjectResponse
627+ var err error
628+
629+ err = run (cc , func () error {
630+ stream , err = c .raw .ReadObject (cc , req , s .gax ... )
631+ if err != nil {
632+ return err
633+ }
634+
635+ msg , err = stream .Recv ()
636+
637+ return err
638+ }, s .retry , s .idempotent , setRetryHeaderGRPC (ctx ))
639+ if err != nil {
640+ // Close the stream context we just created to ensure we don't leak
641+ // resources.
642+ cancel ()
643+ return nil , nil , err
644+ }
645+
646+ return & readStreamResponse {stream , msg }, cancel , nil
647+ }
648+
649+ res , cancel , err := reopen (0 )
650+ if err != nil {
651+ return nil , err
652+ }
653+
654+ // The first message was Recv'd on stream open, use it to populate the
655+ // object metadata.
656+ msg := res .response
657+ obj := msg .GetMetadata ()
658+ // This is the size of the entire object, even if only a range was requested.
659+ size := obj .GetSize ()
660+
661+ r = & Reader {
662+ Attrs : ReaderObjectAttrs {
663+ Size : size ,
664+ ContentType : obj .GetContentType (),
665+ ContentEncoding : obj .GetContentEncoding (),
666+ CacheControl : obj .GetCacheControl (),
667+ LastModified : obj .GetUpdateTime ().AsTime (),
668+ Metageneration : obj .GetMetageneration (),
669+ Generation : obj .GetGeneration (),
670+ },
671+ reader : & gRPCReader {
672+ stream : res .stream ,
673+ reopen : reopen ,
674+ cancel : cancel ,
675+ size : size ,
676+ // Store the content from the first Recv in the
677+ // client buffer for reading later.
678+ leftovers : msg .GetChecksummedData ().GetContent (),
679+ },
680+ }
681+
682+ cr := msg .GetContentRange ()
683+ if cr != nil {
684+ r .Attrs .StartOffset = cr .GetStart ()
685+ r .remain = cr .GetEnd () - cr .GetStart () + 1
686+ } else {
687+ r .remain = size
688+ }
689+
690+ // Only support checksums when reading an entire object, not a range.
691+ if checksums := msg .GetObjectChecksums (); checksums != nil && checksums .Crc32C != nil && params .offset == 0 && params .length == 0 {
692+ r .wantCRC = checksums .GetCrc32C ()
693+ r .checkCRC = true
694+ }
695+
696+ return r , nil
570697}
698+
571699func (c * grpcStorageClient ) OpenWriter (ctx context.Context , w * Writer , opts ... storageOption ) error {
572700 return errMethodNotSupported
573701}
@@ -653,3 +781,119 @@ func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc
653781func setUserProjectMetadata (ctx context.Context , project string ) context.Context {
654782 return metadata .AppendToOutgoingContext (ctx , "x-goog-user-project" , project )
655783}
784+
785+ type readStreamResponse struct {
786+ stream storagepb.Storage_ReadObjectClient
787+ response * storagepb.ReadObjectResponse
788+ }
789+
790+ type gRPCReader struct {
791+ seen , size int64
792+ stream storagepb.Storage_ReadObjectClient
793+ reopen func (seen int64 ) (* readStreamResponse , context.CancelFunc , error )
794+ leftovers []byte
795+ cancel context.CancelFunc
796+ }
797+
798+ // Read reads bytes into the user's buffer from an open gRPC stream.
799+ func (r * gRPCReader ) Read (p []byte ) (int , error ) {
800+ // No stream to read from, either never initiliazed or Close was called.
801+ // Note: There is a potential concurrency issue if multiple routines are
802+ // using the same reader. One encounters an error and the stream is closed
803+ // and then reopened while the other routine attempts to read from it.
804+ if r .stream == nil {
805+ return 0 , fmt .Errorf ("reader has been closed" )
806+ }
807+
808+ // The entire object has been read by this reader, return EOF.
809+ if r .size != 0 && r .size == r .seen {
810+ return 0 , io .EOF
811+ }
812+
813+ var n int
814+ // Read leftovers and return what was available to conform to the Reader
815+ // interface: https://pkg.go.dev/io#Reader.
816+ if len (r .leftovers ) > 0 {
817+ n = copy (p , r .leftovers )
818+ r .seen += int64 (n )
819+ r .leftovers = r .leftovers [n :]
820+ return n , nil
821+ }
822+
823+ // Attempt to Recv the next message on the stream.
824+ msg , err := r .recv ()
825+ if err != nil {
826+ return 0 , err
827+ }
828+
829+ // TODO: Determine if we need to capture incremental CRC32C for this
830+ // chunk. The Object CRC32C checksum is captured when directed to read
831+ // the entire Object. If directed to read a range, we may need to
832+ // calculate the range's checksum for verification if the checksum is
833+ // present in the response here.
834+ // TODO: Figure out if we need to support decompressive transcoding
835+ // https://cloud.google.com/storage/docs/transcoding.
836+ content := msg .GetChecksummedData ().GetContent ()
837+ n = copy (p [n :], content )
838+ leftover := len (content ) - n
839+ if leftover > 0 {
840+ // Wasn't able to copy all of the data in the message, store for
841+ // future Read calls.
842+ r .leftovers = content [n :]
843+ }
844+ r .seen += int64 (n )
845+
846+ return n , nil
847+ }
848+
849+ // Close cancels the read stream's context in order for it to be closed and
850+ // collected.
851+ func (r * gRPCReader ) Close () error {
852+ if r .cancel != nil {
853+ r .cancel ()
854+ }
855+ r .stream = nil
856+ return nil
857+ }
858+
859+ // recv attempts to Recv the next message on the stream. In the event
860+ // that a retryable error is encountered, the stream will be closed, reopened,
861+ // and Recv again. This will attempt to Recv until one of the following is true:
862+ //
863+ // * Recv is successful
864+ // * A non-retryable error is encountered
865+ // * The Reader's context is canceled
866+ //
867+ // The last error received is the one that is returned, which could be from
868+ // an attempt to reopen the stream.
869+ //
870+ // This is an experimental API and not intended for public use.
871+ func (r * gRPCReader ) recv () (* storagepb.ReadObjectResponse , error ) {
872+ msg , err := r .stream .Recv ()
873+ if err != nil && shouldRetry (err ) {
874+ // This will "close" the existing stream and immediately attempt to
875+ // reopen the stream, but will backoff if further attempts are necessary.
876+ // Reopening the stream Recvs the first message, so if retrying is
877+ // successful, the next logical chunk will be returned.
878+ msg , err = r .reopenStream ()
879+ }
880+
881+ return msg , err
882+ }
883+
884+ // reopenStream "closes" the existing stream and attempts to reopen a stream and
885+ // sets the Reader's stream and cancelStream properties in the process.
886+ //
887+ // This is an experimental API and not intended for public use.
888+ func (r * gRPCReader ) reopenStream () (* storagepb.ReadObjectResponse , error ) {
889+ // Close existing stream and initialize new stream with updated offset.
890+ r .Close ()
891+
892+ res , cancel , err := r .reopen (r .seen )
893+ if err != nil {
894+ return nil , err
895+ }
896+ r .stream = res .stream
897+ r .cancel = cancel
898+ return res .response , nil
899+ }
0 commit comments