Skip to content

Commit fcab05f

Browse files
authored
feat(spanner): add x-goog-spanner-route-to-leader header to Spanner RPC contexts for RW/PDML transactions. (#7500)
* feat: add x-goog-spanner-route-to-leader header to Spanner RPC contexts for RW/PDML transactions. * incorporate requested changes * incorporate requested changes * fix tests
1 parent d382522 commit fcab05f

File tree

7 files changed

+85
-58
lines changed

7 files changed

+85
-58
lines changed

spanner/batch.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
133133
return nil, err
134134
}
135135
var md metadata.MD
136-
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.PartitionReadRequest{
136+
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.PartitionReadRequest{
137137
Session: sid,
138138
Transaction: ts,
139139
Table: table,
@@ -202,7 +202,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
202202
Params: params,
203203
ParamTypes: paramTypes,
204204
}
205-
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))
205+
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), req, gax.WithGRPCOptions(grpc.Header(&md)))
206206

207207
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
208208
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil {
@@ -271,7 +271,7 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
271271
sid, client := sh.getID(), sh.getClient()
272272

273273
var md metadata.MD
274-
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))
274+
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata(), true), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))
275275

276276
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
277277
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil {
@@ -356,7 +356,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
356356
}
357357
}
358358
return stream(
359-
contextWithOutgoingMetadata(ctx, sh.getMetadata()),
359+
contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader),
360360
sh.session.logger,
361361
rpc,
362362
t.setTimestamp,

spanner/client.go

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ const (
4747
// the resource being operated on.
4848
resourcePrefixHeader = "google-cloud-resource-prefix"
4949

50+
// routeToLeaderHeader is the name of the metadata header if RW/PDML
51+
// requests need to route to leader.
52+
routeToLeaderHeader = "x-goog-spanner-route-to-leader"
53+
5054
// numChannels is the default value for NumChannels of client.
5155
numChannels = 4
5256
)
@@ -83,14 +87,15 @@ func parseDatabaseName(db string) (project, instance, database string, err error
8387
// Client is a client for reading and writing data to a Cloud Spanner database.
8488
// A client is safe to use concurrently, except for its Close method.
8589
type Client struct {
86-
sc *sessionClient
87-
idleSessions *sessionPool
88-
logger *log.Logger
89-
qo QueryOptions
90-
ro ReadOptions
91-
ao []ApplyOption
92-
txo TransactionOptions
93-
ct *commonTags
90+
sc *sessionClient
91+
idleSessions *sessionPool
92+
logger *log.Logger
93+
qo QueryOptions
94+
ro ReadOptions
95+
ao []ApplyOption
96+
txo TransactionOptions
97+
ct *commonTags
98+
disableRouteToLeader bool
9499
}
95100

96101
// DatabaseName returns the full name of a database, e.g.,
@@ -147,24 +152,33 @@ type ClientConfig struct {
147152
// database by this client.
148153
DatabaseRole string
149154

155+
// DisableRouteToLeader specifies if all the requests of type read-write and PDML
156+
// need to be routed to the leader region.
157+
//
158+
// Default: false
159+
DisableRouteToLeader bool
160+
150161
// Logger is the logger to use for this client. If it is nil, all logging
151162
// will be directed to the standard logger.
152163
Logger *log.Logger
153164
}
154165

155-
func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
166+
func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
156167
existing, ok := metadata.FromOutgoingContext(ctx)
157168
if ok {
158169
md = metadata.Join(existing, md)
159170
}
171+
if !disableRouteToLeader {
172+
md = metadata.Join(md, metadata.Pairs(routeToLeaderHeader, "true"))
173+
}
160174
return metadata.NewOutgoingContext(ctx, md)
161175
}
162176

163177
// NewClient creates a client to a database. A valid database name has the
164178
// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
165179
// a default configuration.
166180
func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
167-
return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
181+
return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig, DisableRouteToLeader: false}, opts...)
168182
}
169183

170184
// NewClientWithConfig creates a client to a database. A valid database name has
@@ -224,7 +238,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
224238
config.incStep = DefaultSessionPoolConfig.incStep
225239
}
226240
// Create a session client.
227-
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, metadata.Pairs(resourcePrefixHeader, database), config.Logger, config.CallOptions)
241+
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, metadata.Pairs(resourcePrefixHeader, database), config.Logger, config.CallOptions)
228242
// Create a session pool.
229243
config.SessionPoolConfig.sessionLabels = sessionLabels
230244
sp, err := newSessionPool(sc, config.SessionPoolConfig)
@@ -233,14 +247,15 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
233247
return nil, err
234248
}
235249
c = &Client{
236-
sc: sc,
237-
idleSessions: sp,
238-
logger: config.Logger,
239-
qo: getQueryOptions(config.QueryOptions),
240-
ro: config.ReadOptions,
241-
ao: config.ApplyOptions,
242-
txo: config.TransactionOptions,
243-
ct: getCommonTags(sc),
250+
sc: sc,
251+
idleSessions: sp,
252+
logger: config.Logger,
253+
qo: getQueryOptions(config.QueryOptions),
254+
ro: config.ReadOptions,
255+
ao: config.ApplyOptions,
256+
txo: config.TransactionOptions,
257+
ct: getCommonTags(sc),
258+
disableRouteToLeader: config.DisableRouteToLeader,
244259
}
245260
return c, nil
246261
}
@@ -303,6 +318,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
303318
t.txReadOnly.txReadEnv = t
304319
t.txReadOnly.qo = c.qo
305320
t.txReadOnly.ro = c.ro
321+
t.txReadOnly.disableRouteToLeader = true
306322
t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
307323
if t.sh == nil {
308324
return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
@@ -340,6 +356,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
340356
t.txReadOnly.txReadEnv = t
341357
t.txReadOnly.qo = c.qo
342358
t.txReadOnly.ro = c.ro
359+
t.txReadOnly.disableRouteToLeader = true
343360
t.ct = c.ct
344361
return t
345362
}
@@ -372,7 +389,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
372389
sh = &sessionHandle{session: s}
373390

374391
// Begin transaction.
375-
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
392+
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), true), &sppb.BeginTransactionRequest{
376393
Session: sh.getID(),
377394
Options: &sppb.TransactionOptions{
378395
Mode: &sppb.TransactionOptions_ReadOnly_{
@@ -405,6 +422,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
405422
t.txReadOnly.txReadEnv = t
406423
t.txReadOnly.qo = c.qo
407424
t.txReadOnly.ro = c.ro
425+
t.txReadOnly.disableRouteToLeader = true
408426
t.ct = c.ct
409427
return t, nil
410428
}
@@ -434,6 +452,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
434452
t.txReadOnly.txReadEnv = t
435453
t.txReadOnly.qo = c.qo
436454
t.txReadOnly.ro = c.ro
455+
t.txReadOnly.disableRouteToLeader = true
437456
t.ct = c.ct
438457
return t
439458
}
@@ -527,6 +546,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
527546
t.txReadOnly.txReadEnv = t
528547
t.txReadOnly.qo = c.qo
529548
t.txReadOnly.ro = c.ro
549+
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
530550
t.txOpts = c.txo.merge(options)
531551
t.ct = c.ct
532552

@@ -607,7 +627,7 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
607627
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
608628
return resp.CommitTs, err
609629
}
610-
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag}
630+
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader}
611631
return t.applyAtLeastOnce(ctx, ms...)
612632
}
613633

spanner/internal/testutil/inmem_spanner_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (s *StatementResult) convertUpdateCountToResultSet(exact bool) *spannerpb.R
198198
return rs
199199
}
200200

201-
func (s *StatementResult) getResultSetWithTransactionSet(selector *spannerpb.TransactionSelector, tx []byte) *StatementResult {
201+
func (s StatementResult) getResultSetWithTransactionSet(selector *spannerpb.TransactionSelector, tx []byte) *StatementResult {
202202
res := &StatementResult{
203203
Type: s.Type,
204204
Err: s.Err,

spanner/pdml.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
8080
// Execute the PDML and retry if the transaction is aborted.
8181
executePdmlWithRetry := func(ctx context.Context) (int64, error) {
8282
for {
83-
count, err := executePdml(ctx, sh, req)
83+
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req)
8484
if err == nil {
8585
return count, nil
8686
}
@@ -105,7 +105,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
105105
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
106106
var md metadata.MD
107107
// Begin transaction.
108-
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
108+
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
109109
Session: sh.getID(),
110110
Options: &sppb.TransactionOptions{
111111
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
@@ -118,7 +118,7 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
118118
req.Transaction = &sppb.TransactionSelector{
119119
Selector: &sppb.TransactionSelector_Id{Id: res.Id},
120120
}
121-
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))
121+
resultSet, err := sh.getClient().ExecuteSql(ctx, req, gax.WithGRPCOptions(grpc.Header(&md)))
122122
if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil {
123123
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
124124
if err != nil {

spanner/session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (s *session) ping() error {
236236
defer span.End()
237237

238238
// s.getID is safe even when s is invalid.
239-
_, err := s.client.ExecuteSql(contextWithOutgoingMetadata(ctx, s.md), &sppb.ExecuteSqlRequest{
239+
_, err := s.client.ExecuteSql(contextWithOutgoingMetadata(ctx, s.md, true), &sppb.ExecuteSqlRequest{
240240
Session: s.getID(),
241241
Sql: "SELECT 1",
242242
})
@@ -352,7 +352,7 @@ func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
352352
func (s *session) delete(ctx context.Context) {
353353
// Ignore the error because even if we fail to explicitly destroy the
354354
// session, it will be eventually garbage collected by Cloud Spanner.
355-
err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})
355+
err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md, true), &sppb.DeleteSessionRequest{Name: s.getID()})
356356
// Do not log DeadlineExceeded errors when deleting sessions, as these do
357357
// not indicate anything the user can or should act upon.
358358
if err != nil && ErrCode(err) != codes.DeadlineExceeded {

spanner/sessionclient.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,9 @@ type sessionConsumer interface {
8585
// will ensure that the sessions that are created are evenly distributed over
8686
// all available channels.
8787
type sessionClient struct {
88-
mu sync.Mutex
89-
closed bool
88+
mu sync.Mutex
89+
closed bool
90+
disableRouteToLeader bool
9091

9192
connPool gtransport.ConnPool
9293
database string
@@ -101,18 +102,19 @@ type sessionClient struct {
101102
}
102103

103104
// newSessionClient creates a session client to use for a database.
104-
func newSessionClient(connPool gtransport.ConnPool, database, userAgent string, sessionLabels map[string]string, databaseRole string, md metadata.MD, logger *log.Logger, callOptions *vkit.CallOptions) *sessionClient {
105+
func newSessionClient(connPool gtransport.ConnPool, database, userAgent string, sessionLabels map[string]string, databaseRole string, disableRouteToLeader bool, md metadata.MD, logger *log.Logger, callOptions *vkit.CallOptions) *sessionClient {
105106
return &sessionClient{
106-
connPool: connPool,
107-
database: database,
108-
userAgent: userAgent,
109-
id: cidGen.nextID(database),
110-
sessionLabels: sessionLabels,
111-
databaseRole: databaseRole,
112-
md: md,
113-
batchTimeout: time.Minute,
114-
logger: logger,
115-
callOptions: callOptions,
107+
connPool: connPool,
108+
database: database,
109+
userAgent: userAgent,
110+
id: cidGen.nextID(database),
111+
sessionLabels: sessionLabels,
112+
databaseRole: databaseRole,
113+
disableRouteToLeader: disableRouteToLeader,
114+
md: md,
115+
batchTimeout: time.Minute,
116+
logger: logger,
117+
callOptions: callOptions,
116118
}
117119
}
118120

@@ -136,9 +138,9 @@ func (sc *sessionClient) createSession(ctx context.Context) (*session, error) {
136138
if err != nil {
137139
return nil, err
138140
}
139-
ctx = contextWithOutgoingMetadata(ctx, sc.md)
141+
140142
var md metadata.MD
141-
sid, err := client.CreateSession(ctx, &sppb.CreateSessionRequest{
143+
sid, err := client.CreateSession(contextWithOutgoingMetadata(ctx, sc.md, sc.disableRouteToLeader), &sppb.CreateSessionRequest{
142144
Database: sc.database,
143145
Session: &sppb.Session{Labels: sc.sessionLabels, CreatorRole: sc.databaseRole},
144146
}, gax.WithGRPCOptions(grpc.Header(&md)))
@@ -237,8 +239,6 @@ func (sc *sessionClient) batchCreateSessions(createSessionCount int32, distribut
237239
func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createCount int32, labels map[string]string, md metadata.MD, consumer sessionConsumer) {
238240
ctx, cancel := context.WithTimeout(context.Background(), sc.batchTimeout)
239241
defer cancel()
240-
ctx = contextWithOutgoingMetadata(ctx, sc.md)
241-
242242
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchCreateSessions")
243243
defer func() { trace.EndSpan(ctx, nil) }()
244244
trace.TracePrintf(ctx, nil, "Creating a batch of %d sessions", createCount)
@@ -259,7 +259,7 @@ func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createC
259259
break
260260
}
261261
var mdForGFELatency metadata.MD
262-
response, err := client.BatchCreateSessions(ctx, &sppb.BatchCreateSessionsRequest{
262+
response, err := client.BatchCreateSessions(contextWithOutgoingMetadata(ctx, sc.md, sc.disableRouteToLeader), &sppb.BatchCreateSessionsRequest{
263263
SessionCount: remainingCreateCount,
264264
Database: sc.database,
265265
SessionTemplate: &sppb.Session{Labels: labels, CreatorRole: sc.databaseRole},

0 commit comments

Comments
 (0)