@@ -47,6 +47,10 @@ const (
4747 // the resource being operated on.
4848 resourcePrefixHeader = "google-cloud-resource-prefix"
4949
50+ // routeToLeaderHeader is the name of the metadata header if RW/PDML
51+ // requests need to route to leader.
52+ routeToLeaderHeader = "x-goog-spanner-route-to-leader"
53+
5054 // numChannels is the default value for NumChannels of client.
5155 numChannels = 4
5256)
@@ -83,14 +87,15 @@ func parseDatabaseName(db string) (project, instance, database string, err error
8387// Client is a client for reading and writing data to a Cloud Spanner database.
8488// A client is safe to use concurrently, except for its Close method.
8589type Client struct {
86- sc * sessionClient
87- idleSessions * sessionPool
88- logger * log.Logger
89- qo QueryOptions
90- ro ReadOptions
91- ao []ApplyOption
92- txo TransactionOptions
93- ct * commonTags
90+ sc * sessionClient
91+ idleSessions * sessionPool
92+ logger * log.Logger
93+ qo QueryOptions
94+ ro ReadOptions
95+ ao []ApplyOption
96+ txo TransactionOptions
97+ ct * commonTags
98+ disableRouteToLeader bool
9499}
95100
96101// DatabaseName returns the full name of a database, e.g.,
@@ -147,24 +152,33 @@ type ClientConfig struct {
147152 // database by this client.
148153 DatabaseRole string
149154
155+ // DisableRouteToLeader specifies if all the requests of type read-write and PDML
156+ // need to be routed to the leader region.
157+ //
158+ // Default: false
159+ DisableRouteToLeader bool
160+
150161 // Logger is the logger to use for this client. If it is nil, all logging
151162 // will be directed to the standard logger.
152163 Logger * log.Logger
153164}
154165
155- func contextWithOutgoingMetadata (ctx context.Context , md metadata.MD ) context.Context {
166+ func contextWithOutgoingMetadata (ctx context.Context , md metadata.MD , disableRouteToLeader bool ) context.Context {
156167 existing , ok := metadata .FromOutgoingContext (ctx )
157168 if ok {
158169 md = metadata .Join (existing , md )
159170 }
171+ if ! disableRouteToLeader {
172+ md = metadata .Join (md , metadata .Pairs (routeToLeaderHeader , "true" ))
173+ }
160174 return metadata .NewOutgoingContext (ctx , md )
161175}
162176
163177// NewClient creates a client to a database. A valid database name has the
164178// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
165179// a default configuration.
166180func NewClient (ctx context.Context , database string , opts ... option.ClientOption ) (* Client , error ) {
167- return NewClientWithConfig (ctx , database , ClientConfig {SessionPoolConfig : DefaultSessionPoolConfig }, opts ... )
181+ return NewClientWithConfig (ctx , database , ClientConfig {SessionPoolConfig : DefaultSessionPoolConfig , DisableRouteToLeader : false }, opts ... )
168182}
169183
170184// NewClientWithConfig creates a client to a database. A valid database name has
@@ -224,7 +238,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
224238 config .incStep = DefaultSessionPoolConfig .incStep
225239 }
226240 // Create a session client.
227- sc := newSessionClient (pool , database , config .UserAgent , sessionLabels , config .DatabaseRole , metadata .Pairs (resourcePrefixHeader , database ), config .Logger , config .CallOptions )
241+ sc := newSessionClient (pool , database , config .UserAgent , sessionLabels , config .DatabaseRole , config . DisableRouteToLeader , metadata .Pairs (resourcePrefixHeader , database ), config .Logger , config .CallOptions )
228242 // Create a session pool.
229243 config .SessionPoolConfig .sessionLabels = sessionLabels
230244 sp , err := newSessionPool (sc , config .SessionPoolConfig )
@@ -233,14 +247,15 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
233247 return nil , err
234248 }
235249 c = & Client {
236- sc : sc ,
237- idleSessions : sp ,
238- logger : config .Logger ,
239- qo : getQueryOptions (config .QueryOptions ),
240- ro : config .ReadOptions ,
241- ao : config .ApplyOptions ,
242- txo : config .TransactionOptions ,
243- ct : getCommonTags (sc ),
250+ sc : sc ,
251+ idleSessions : sp ,
252+ logger : config .Logger ,
253+ qo : getQueryOptions (config .QueryOptions ),
254+ ro : config .ReadOptions ,
255+ ao : config .ApplyOptions ,
256+ txo : config .TransactionOptions ,
257+ ct : getCommonTags (sc ),
258+ disableRouteToLeader : config .DisableRouteToLeader ,
244259 }
245260 return c , nil
246261}
@@ -303,6 +318,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
303318 t .txReadOnly .txReadEnv = t
304319 t .txReadOnly .qo = c .qo
305320 t .txReadOnly .ro = c .ro
321+ t .txReadOnly .disableRouteToLeader = true
306322 t .txReadOnly .replaceSessionFunc = func (ctx context.Context ) error {
307323 if t .sh == nil {
308324 return spannerErrorf (codes .InvalidArgument , "missing session handle on transaction" )
@@ -340,6 +356,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
340356 t .txReadOnly .txReadEnv = t
341357 t .txReadOnly .qo = c .qo
342358 t .txReadOnly .ro = c .ro
359+ t .txReadOnly .disableRouteToLeader = true
343360 t .ct = c .ct
344361 return t
345362}
@@ -372,7 +389,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
372389 sh = & sessionHandle {session : s }
373390
374391 // Begin transaction.
375- res , err := sh .getClient ().BeginTransaction (contextWithOutgoingMetadata (ctx , sh .getMetadata ()), & sppb.BeginTransactionRequest {
392+ res , err := sh .getClient ().BeginTransaction (contextWithOutgoingMetadata (ctx , sh .getMetadata (), true ), & sppb.BeginTransactionRequest {
376393 Session : sh .getID (),
377394 Options : & sppb.TransactionOptions {
378395 Mode : & sppb.TransactionOptions_ReadOnly_ {
@@ -405,6 +422,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
405422 t .txReadOnly .txReadEnv = t
406423 t .txReadOnly .qo = c .qo
407424 t .txReadOnly .ro = c .ro
425+ t .txReadOnly .disableRouteToLeader = true
408426 t .ct = c .ct
409427 return t , nil
410428}
@@ -434,6 +452,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
434452 t .txReadOnly .txReadEnv = t
435453 t .txReadOnly .qo = c .qo
436454 t .txReadOnly .ro = c .ro
455+ t .txReadOnly .disableRouteToLeader = true
437456 t .ct = c .ct
438457 return t
439458}
@@ -527,6 +546,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
527546 t .txReadOnly .txReadEnv = t
528547 t .txReadOnly .qo = c .qo
529548 t .txReadOnly .ro = c .ro
549+ t .txReadOnly .disableRouteToLeader = c .disableRouteToLeader
530550 t .txOpts = c .txo .merge (options )
531551 t .ct = c .ct
532552
@@ -607,7 +627,7 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
607627 }, TransactionOptions {CommitPriority : ao .priority , TransactionTag : ao .transactionTag })
608628 return resp .CommitTs , err
609629 }
610- t := & writeOnlyTransaction {sp : c .idleSessions , commitPriority : ao .priority , transactionTag : ao .transactionTag }
630+ t := & writeOnlyTransaction {sp : c .idleSessions , commitPriority : ao .priority , transactionTag : ao .transactionTag , disableRouteToLeader : c . disableRouteToLeader }
611631 return t .applyAtLeastOnce (ctx , ms ... )
612632}
613633
0 commit comments