@@ -19,13 +19,16 @@ package spanner
1919import (
2020 "context"
2121 "fmt"
22+ "io"
2223 "log"
2324 "os"
2425 "regexp"
2526 "time"
2627
2728 "cloud.google.com/go/internal/trace"
2829 sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
30+ "github.com/googleapis/gax-go/v2"
31+ "google.golang.org/api/iterator"
2932 "google.golang.org/api/option"
3033 "google.golang.org/api/option/internaloption"
3134 gtransport "google.golang.org/api/transport/grpc"
@@ -97,6 +100,7 @@ type Client struct {
97100 ro ReadOptions
98101 ao []ApplyOption
99102 txo TransactionOptions
103+ bwo BatchWriteOptions
100104 ct * commonTags
101105 disableRouteToLeader bool
102106}
@@ -138,6 +142,9 @@ type ClientConfig struct {
138142 // TransactionOptions is the configuration for a transaction.
139143 TransactionOptions TransactionOptions
140144
145+ // BatchWriteOptions is the configuration for a BatchWrite request.
146+ BatchWriteOptions BatchWriteOptions
147+
141148 // CallOptions is the configuration for providing custom retry settings that
142149 // override the default values.
143150 CallOptions * vkit.CallOptions
@@ -281,6 +288,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
281288 ro : config .ReadOptions ,
282289 ao : config .ApplyOptions ,
283290 txo : config .TransactionOptions ,
291+ bwo : config .BatchWriteOptions ,
284292 ct : getCommonTags (sc ),
285293 disableRouteToLeader : config .DisableRouteToLeader ,
286294 }
@@ -669,6 +677,217 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
669677 return t .applyAtLeastOnce (ctx , ms ... )
670678}
671679
680+ // BatchWriteOptions provides options for a BatchWriteRequest.
681+ type BatchWriteOptions struct {
682+ // Priority is the RPC priority to use for this request.
683+ Priority sppb.RequestOptions_Priority
684+
685+ // The transaction tag to use for this request.
686+ TransactionTag string
687+ }
688+
689+ // merge combines two BatchWriteOptions such that the input parameter will have higher
690+ // order of precedence.
691+ func (bwo BatchWriteOptions ) merge (opts BatchWriteOptions ) BatchWriteOptions {
692+ merged := BatchWriteOptions {
693+ TransactionTag : bwo .TransactionTag ,
694+ Priority : bwo .Priority ,
695+ }
696+ if opts .TransactionTag != "" {
697+ merged .TransactionTag = opts .TransactionTag
698+ }
699+ if opts .Priority != sppb .RequestOptions_PRIORITY_UNSPECIFIED {
700+ merged .Priority = opts .Priority
701+ }
702+ return merged
703+ }
704+
705+ // BatchWriteResponseIterator is an iterator over BatchWriteResponse structures returned from BatchWrite RPC.
706+ type BatchWriteResponseIterator struct {
707+ ctx context.Context
708+ stream sppb.Spanner_BatchWriteClient
709+ err error
710+ dataReceived bool
711+ replaceSession func (ctx context.Context ) error
712+ rpc func (ctx context.Context ) (sppb.Spanner_BatchWriteClient , error )
713+ release func (error )
714+ cancel func ()
715+ }
716+
717+ // Next returns the next result. Its second return value is iterator.Done if
718+ // there are no more results. Once Next returns Done, all subsequent calls
719+ // will return Done.
720+ func (r * BatchWriteResponseIterator ) Next () (* sppb.BatchWriteResponse , error ) {
721+ for {
722+ // Stream finished or in error state.
723+ if r .err != nil {
724+ return nil , r .err
725+ }
726+
727+ // RPC not made yet.
728+ if r .stream == nil {
729+ r .stream , r .err = r .rpc (r .ctx )
730+ continue
731+ }
732+
733+ // Read from the stream.
734+ var response * sppb.BatchWriteResponse
735+ response , r .err = r .stream .Recv ()
736+
737+ // Return an item.
738+ if r .err == nil {
739+ r .dataReceived = true
740+ return response , nil
741+ }
742+
743+ // Stream finished.
744+ if r .err == io .EOF {
745+ r .err = iterator .Done
746+ return nil , r .err
747+ }
748+
749+ // Retry request on session not found error only if no data has been received before.
750+ if ! r .dataReceived && r .replaceSession != nil && isSessionNotFoundError (r .err ) {
751+ r .err = r .replaceSession (r .ctx )
752+ r .stream = nil
753+ }
754+ }
755+ }
756+
757+ // Stop terminates the iteration. It should be called after you finish using the
758+ // iterator.
759+ func (r * BatchWriteResponseIterator ) Stop () {
760+ if r .stream != nil {
761+ err := r .err
762+ if err == iterator .Done {
763+ err = nil
764+ }
765+ defer trace .EndSpan (r .ctx , err )
766+ }
767+ if r .cancel != nil {
768+ r .cancel ()
769+ r .cancel = nil
770+ }
771+ if r .release != nil {
772+ r .release (r .err )
773+ r .release = nil
774+ }
775+ if r .err == nil {
776+ r .err = spannerErrorf (codes .FailedPrecondition , "Next called after Stop" )
777+ }
778+ }
779+
780+ // Do calls the provided function once in sequence for each item in the
781+ // iteration. If the function returns a non-nil error, Do immediately returns
782+ // that error.
783+ //
784+ // If there are no items in the iterator, Do will return nil without calling the
785+ // provided function.
786+ //
787+ // Do always calls Stop on the iterator.
788+ func (r * BatchWriteResponseIterator ) Do (f func (r * sppb.BatchWriteResponse ) error ) error {
789+ defer r .Stop ()
790+ for {
791+ row , err := r .Next ()
792+ switch err {
793+ case iterator .Done :
794+ return nil
795+ case nil :
796+ if err = f (row ); err != nil {
797+ return err
798+ }
799+ default :
800+ return err
801+ }
802+ }
803+ }
804+
805+ // BatchWrite applies a list of mutation groups in a collection of efficient
806+ // transactions. The mutation groups are applied non-atomically in an
807+ // unspecified order and thus, they must be independent of each other. Partial
808+ // failure is possible, i.e., some mutation groups may have been applied
809+ // successfully, while some may have failed. The results of individual batches
810+ // are streamed into the response as the batches are applied.
811+ //
812+ // BatchWrite requests are not replay protected, meaning that each mutation
813+ // group may be applied more than once. Replays of non-idempotent mutations
814+ // may have undesirable effects. For example, replays of an insert mutation
815+ // may produce an already exists error or if you use generated or commit
816+ // timestamp-based keys, it may result in additional rows being added to the
817+ // mutation's table. We recommend structuring your mutation groups to be
818+ // idempotent to avoid this issue.
819+ func (c * Client ) BatchWrite (ctx context.Context , mgs []* MutationGroup ) * BatchWriteResponseIterator {
820+ return c .BatchWriteWithOptions (ctx , mgs , BatchWriteOptions {})
821+ }
822+
823+ // BatchWriteWithOptions is same as BatchWrite. It accepts additional options to customize the request.
824+ func (c * Client ) BatchWriteWithOptions (ctx context.Context , mgs []* MutationGroup , opts BatchWriteOptions ) * BatchWriteResponseIterator {
825+ ctx = trace .StartSpan (ctx , "cloud.google.com/go/spanner.BatchWrite" )
826+
827+ var err error
828+ defer func () {
829+ trace .EndSpan (ctx , err )
830+ }()
831+
832+ opts = c .bwo .merge (opts )
833+
834+ mgsPb , err := mutationGroupsProto (mgs )
835+ if err != nil {
836+ return & BatchWriteResponseIterator {err : err }
837+ }
838+
839+ var sh * sessionHandle
840+ sh , err = c .idleSessions .take (ctx )
841+ if err != nil {
842+ return & BatchWriteResponseIterator {err : err }
843+ }
844+
845+ rpc := func (ct context.Context ) (sppb.Spanner_BatchWriteClient , error ) {
846+ var md metadata.MD
847+ stream , rpcErr := sh .getClient ().BatchWrite (contextWithOutgoingMetadata (ct , sh .getMetadata (), c .disableRouteToLeader ), & sppb.BatchWriteRequest {
848+ Session : sh .getID (),
849+ MutationGroups : mgsPb ,
850+ RequestOptions : createRequestOptions (opts .Priority , "" , opts .TransactionTag ),
851+ }, gax .WithGRPCOptions (grpc .Header (& md )))
852+
853+ if getGFELatencyMetricsFlag () && md != nil && c .ct != nil {
854+ if metricErr := createContextAndCaptureGFELatencyMetrics (ct , c .ct , md , "BatchWrite" ); metricErr != nil {
855+ trace .TracePrintf (ct , nil , "Error in recording GFE Latency. Try disabling and rerunning. Error: %v" , err )
856+ }
857+ }
858+ return stream , rpcErr
859+ }
860+
861+ replaceSession := func (ct context.Context ) error {
862+ if sh != nil {
863+ sh .destroy ()
864+ }
865+ var sessionErr error
866+ sh , sessionErr = c .idleSessions .take (ct )
867+ return sessionErr
868+ }
869+
870+ release := func (err error ) {
871+ if sh == nil {
872+ return
873+ }
874+ if isSessionNotFoundError (err ) {
875+ sh .destroy ()
876+ }
877+ sh .recycle ()
878+ }
879+
880+ ctx , cancel := context .WithCancel (ctx )
881+ ctx = trace .StartSpan (ctx , "cloud.google.com/go/spanner.BatchWriteResponseIterator" )
882+ return & BatchWriteResponseIterator {
883+ ctx : ctx ,
884+ rpc : rpc ,
885+ replaceSession : replaceSession ,
886+ release : release ,
887+ cancel : cancel ,
888+ }
889+ }
890+
672891// logf logs the given message to the given logger, or the standard logger if
673892// the given logger is nil.
674893func logf (logger * log.Logger , format string , v ... interface {}) {
0 commit comments