Skip to content

Commit 8bc506e

Browse files
authored
chore(spanner): add routing hints and cache updates for begin/commit (#14252)
## Summary This change extends key-aware routing to include mutation-based `BeginTransaction` and `Commit` request payloads, and teaches the response path to consume `CacheUpdate` from `Transaction` and `CommitResponse`. ## What changed - Populate `RoutingHint` on `BeginTransaction` when a `mutationKey` is present - Populate `RoutingHint` on `Commit` from the first mutation in the request - Keep existing transaction-affinity routing for `Commit`/`Rollback` - Apply `CacheUpdate` from: - `Transaction` - `CommitResponse` - Add focused `KeyAwareChannel` tests for the new request/response behavior
1 parent aa2f50e commit 8bc506e

10 files changed

+632
-27
lines changed

spanner/channel_finder.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,31 @@ func (f *channelFinder) findServerBeginTransaction(ctx context.Context, req *spp
100100
if req == nil || req.GetMutationKey() == nil {
101101
return nil
102102
}
103-
target := f.recipeCache.mutationToTargetRange(req.GetMutationKey())
104-
if target == nil {
103+
return f.routeMutation(ctx, req.GetMutationKey(), preferLeaderFromTransactionOptions(req.GetOptions()), ensureBeginTransactionRoutingHint(req))
104+
}
105+
106+
func (f *channelFinder) fillCommitRoutingHint(ctx context.Context, req *sppb.CommitRequest) channelEndpoint {
107+
if req == nil {
108+
return nil
109+
}
110+
mutation := selectMutationProtoForRouting(req.GetMutations())
111+
if mutation == nil {
105112
return nil
106113
}
107-
hint := &sppb.RoutingHint{Key: append([]byte(nil), target.start...)}
108-
if len(target.limit) > 0 {
109-
hint.LimitKey = append([]byte(nil), target.limit...)
114+
return f.routeMutation(ctx, mutation, true, ensureCommitRoutingHint(req))
115+
}
116+
117+
func (f *channelFinder) routeMutation(ctx context.Context, mutation *sppb.Mutation, preferLeader bool, hint *sppb.RoutingHint) channelEndpoint {
118+
if mutation == nil || hint == nil {
119+
return nil
120+
}
121+
f.recipeCache.applySchemaGeneration(hint)
122+
target := f.recipeCache.mutationToTargetRange(mutation)
123+
if target == nil {
124+
return nil
110125
}
111-
return f.fillRoutingHint(ctx, preferLeaderFromTransactionOptions(req.GetOptions()), rangeModeCoveringSplit, &sppb.DirectedReadOptions{}, hint)
126+
f.recipeCache.applyTargetRange(hint, target)
127+
return f.fillRoutingHint(ctx, preferLeader, rangeModeCoveringSplit, &sppb.DirectedReadOptions{}, hint)
112128
}
113129

114130
func (f *channelFinder) fillRoutingHint(ctx context.Context, preferLeader bool, mode rangeMode, directedReadOptions *sppb.DirectedReadOptions, hint *sppb.RoutingHint) channelEndpoint {

spanner/client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,9 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
643643
var locationRouter *locationRouter
644644
if isExperimentalLocationAPIEnabled() {
645645
sc.baseClientOpts = endpointClientOpts
646+
if conn := pool.Conn(); conn != nil {
647+
sc.endpointAuthority = normalizeAuthorityTarget(conn.Target())
648+
}
646649
epCache := newEndpointClientCache(sc.createEndpointClient)
647650
locationRouter = newLocationRouter(epCache)
648651
}
@@ -694,6 +697,16 @@ Multiplexed session enabled: true
694697
return c, nil
695698
}
696699

700+
func normalizeAuthorityTarget(target string) string {
701+
if idx := strings.Index(target, ":///"); idx >= 0 {
702+
return strings.TrimSuffix(target[idx+4:], "/")
703+
}
704+
if idx := strings.Index(target, "://"); idx >= 0 {
705+
return strings.TrimSuffix(target[idx+3:], "/")
706+
}
707+
return strings.TrimSuffix(target, "/")
708+
}
709+
697710
// NewMultiEndpointClient is the same as NewMultiEndpointClientWithConfig with
698711
// the default client configuration.
699712
//

spanner/endpoint_authority_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
Copyright 2026 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package spanner
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
24+
"cloud.google.com/go/spanner/apiv1/spannerpb"
25+
"cloud.google.com/go/spanner/internal/testutil"
26+
"google.golang.org/grpc"
27+
"google.golang.org/grpc/metadata"
28+
)
29+
30+
type authorityInterceptor struct {
31+
unaryHeaders metadata.MD
32+
}
33+
34+
func (ai *authorityInterceptor) interceptUnary(
35+
ctx context.Context,
36+
req any,
37+
info *grpc.UnaryServerInfo,
38+
handler grpc.UnaryHandler,
39+
) (any, error) {
40+
md, ok := metadata.FromIncomingContext(ctx)
41+
if !ok {
42+
return nil, fmt.Errorf("missing metadata in unary request")
43+
}
44+
ai.unaryHeaders = md
45+
return handler(ctx, req)
46+
}
47+
48+
func TestCreateEndpointClientPreservesDefaultAuthority(t *testing.T) {
49+
interceptor := &authorityInterceptor{}
50+
server, clientOpts, teardown := testutil.NewMockedSpannerInMemTestServer(
51+
t, grpc.UnaryInterceptor(interceptor.interceptUnary),
52+
)
53+
defer teardown()
54+
55+
database := "projects/p/instances/i/databases/d"
56+
sc := newSessionClient(
57+
nil,
58+
database,
59+
"",
60+
nil,
61+
"",
62+
false,
63+
metadata.Pairs(resourcePrefixHeader, database),
64+
0,
65+
nil,
66+
nil,
67+
)
68+
sc.baseClientOpts = clientOpts
69+
sc.endpointAuthority = "spanner.spanner-ns:15000"
70+
sc.metricsTracerFactory = &builtinMetricsTracerFactory{}
71+
72+
client, err := sc.createEndpointClient(context.Background(), server.ServerAddress)
73+
if err != nil {
74+
t.Fatalf("createEndpointClient() failed: %v", err)
75+
}
76+
defer client.Close()
77+
78+
_, err = client.CreateSession(context.Background(), &spannerpb.CreateSessionRequest{
79+
Database: database,
80+
Session: &spannerpb.Session{},
81+
})
82+
if err != nil {
83+
t.Fatalf("CreateSession() failed: %v", err)
84+
}
85+
86+
if got := interceptor.unaryHeaders.Get(":authority"); len(got) != 1 || got[0] != sc.endpointAuthority {
87+
t.Fatalf("authority mismatch\ngot: %v\nwant: [%s]", got, sc.endpointAuthority)
88+
}
89+
}
90+
91+
func TestNormalizeAuthorityTarget(t *testing.T) {
92+
for _, tc := range []struct {
93+
name string
94+
target string
95+
want string
96+
}{
97+
{name: "plain", target: "spanner.googleapis.com:443", want: "spanner.googleapis.com:443"},
98+
{name: "dns", target: "dns:///spanner.googleapis.com:443", want: "spanner.googleapis.com:443"},
99+
{name: "passthrough", target: "passthrough:///10.0.0.1:15000", want: "10.0.0.1:15000"},
100+
{name: "google-c2p", target: "google-c2p:///spanner.googleapis.com", want: "spanner.googleapis.com"},
101+
{name: "https", target: "https://spanner.googleapis.com:443", want: "spanner.googleapis.com:443"},
102+
} {
103+
t.Run(tc.name, func(t *testing.T) {
104+
if got := normalizeAuthorityTarget(tc.target); got != tc.want {
105+
t.Fatalf("normalizeAuthorityTarget(%q) = %q, want %q", tc.target, got, tc.want)
106+
}
107+
})
108+
}
109+
}

spanner/key_recipe_cache.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,7 @@ func (c *keyRecipeCache) computeReadKeys(req *sppb.ReadRequest) {
191191
if target == nil {
192192
return
193193
}
194-
hint.Key = append(hint.Key[:0], target.start...)
195-
hint.LimitKey = hint.LimitKey[:0]
196-
if len(target.limit) > 0 {
197-
hint.LimitKey = append(hint.LimitKey, target.limit...)
198-
}
194+
c.applyTargetRange(hint, target)
199195
}
200196

201197
func (c *keyRecipeCache) computeQueryKeys(req *sppb.ExecuteSqlRequest) {
@@ -229,11 +225,7 @@ func (c *keyRecipeCache) computeQueryKeys(req *sppb.ExecuteSqlRequest) {
229225
if target == nil {
230226
return
231227
}
232-
hint.Key = append(hint.Key[:0], target.start...)
233-
hint.LimitKey = hint.LimitKey[:0]
234-
if len(target.limit) > 0 {
235-
hint.LimitKey = append(hint.LimitKey, target.limit...)
236-
}
228+
c.applyTargetRange(hint, target)
237229
}
238230

239231
func (c *keyRecipeCache) mutationToTargetRange(mutation *sppb.Mutation) *targetRange {
@@ -253,6 +245,28 @@ func (c *keyRecipeCache) mutationToTargetRange(mutation *sppb.Mutation) *targetR
253245
return recipe.mutationToTargetRange(mutation)
254246
}
255247

248+
func (c *keyRecipeCache) applySchemaGeneration(hint *sppb.RoutingHint) {
249+
if hint == nil {
250+
return
251+
}
252+
c.mu.Lock()
253+
if len(c.schemaGeneration) > 0 {
254+
hint.SchemaGeneration = append([]byte(nil), c.schemaGeneration...)
255+
}
256+
c.mu.Unlock()
257+
}
258+
259+
func (c *keyRecipeCache) applyTargetRange(hint *sppb.RoutingHint, target *targetRange) {
260+
if hint == nil || target == nil {
261+
return
262+
}
263+
hint.Key = append(hint.Key[:0], target.start...)
264+
hint.LimitKey = hint.LimitKey[:0]
265+
if len(target.limit) > 0 {
266+
hint.LimitKey = append(hint.LimitKey, target.limit...)
267+
}
268+
}
269+
256270
func (c *keyRecipeCache) clear() {
257271
c.mu.Lock()
258272
defer c.mu.Unlock()
@@ -356,6 +370,20 @@ func ensureExecuteSQLRoutingHint(req *sppb.ExecuteSqlRequest) *sppb.RoutingHint
356370
return req.RoutingHint
357371
}
358372

373+
func ensureBeginTransactionRoutingHint(req *sppb.BeginTransactionRequest) *sppb.RoutingHint {
374+
if req.RoutingHint == nil {
375+
req.RoutingHint = &sppb.RoutingHint{}
376+
}
377+
return req.RoutingHint
378+
}
379+
380+
func ensureCommitRoutingHint(req *sppb.CommitRequest) *sppb.RoutingHint {
381+
if req.RoutingHint == nil {
382+
req.RoutingHint = &sppb.RoutingHint{}
383+
}
384+
return req.RoutingHint
385+
}
386+
359387
func hashString(h hash.Hash64, s string) {
360388
hashBytes(h, []byte(s))
361389
}

spanner/location_aware_client.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ func (c *locationAwareSpannerClient) BeginTransaction(ctx context.Context, req *
229229
if err != nil {
230230
return nil, err
231231
}
232+
c.router.observeTransaction(resp)
232233
if len(resp.GetId()) > 0 {
233234
if isReadOnly, readOnlyStrong := readOnlyBeginFromTransactionOptions(req.GetOptions()); isReadOnly {
234235
c.router.trackReadOnlyTransaction(string(resp.GetId()), readOnlyStrong)
@@ -242,8 +243,15 @@ func (c *locationAwareSpannerClient) BeginTransaction(ctx context.Context, req *
242243
// --- Affinity RPCs ---
243244

244245
func (c *locationAwareSpannerClient) Commit(ctx context.Context, req *spannerpb.CommitRequest, opts ...gax.CallOption) (*spannerpb.CommitResponse, error) {
245-
client := c.affinityClient(req.GetTransactionId())
246+
ep := c.router.prepareCommitRequest(ctx, req)
247+
if txID := req.GetTransactionId(); len(txID) > 0 {
248+
if affinityEndpoint := c.router.getTransactionAffinity(string(txID)); affinityEndpoint != nil {
249+
ep = affinityEndpoint
250+
}
251+
}
252+
client := c.clientForEndpoint(ep)
246253
resp, err := client.Commit(ctx, req, opts...)
254+
c.router.observeCommitResponse(resp)
247255
c.router.clearTransactionAffinity(string(req.GetTransactionId()))
248256
return resp, err
249257
}

0 commit comments

Comments
 (0)