Skip to content

Commit f9b6e88

Browse files
fix(spanner): Fix blind retry for ResourceExhausted (#12523)
* fix(spanner): Fix blind retry for ResourceExhausted * Fix indentation issue in retry.go * Fix comments
1 parent baabc30 commit f9b6e88

File tree

2 files changed

+133
-2
lines changed

2 files changed

+133
-2
lines changed

spanner/read_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ import (
3030
"github.com/googleapis/gax-go/v2"
3131
"go.opentelemetry.io/otel/metric/noop"
3232
"google.golang.org/api/iterator"
33+
"google.golang.org/genproto/googleapis/rpc/errdetails"
3334
"google.golang.org/grpc/codes"
3435
"google.golang.org/grpc/status"
3536
"google.golang.org/protobuf/proto"
37+
"google.golang.org/protobuf/types/known/durationpb"
3638
proto3 "google.golang.org/protobuf/types/known/structpb"
3739
structpb "google.golang.org/protobuf/types/known/structpb"
3840
)
@@ -1552,6 +1554,127 @@ func TestGrpcReconnect(t *testing.T) {
15521554
}
15531555
}
15541556

1557+
func TestRetryResourceExhaustedWithoutRetryInfo(t *testing.T) {
1558+
restore := setMaxBytesBetweenResumeTokens()
1559+
defer restore()
1560+
1561+
server, c, teardown := setupMockedTestServer(t)
1562+
defer teardown()
1563+
mc, err := c.sc.nextClient()
1564+
if err != nil {
1565+
t.Fatalf("failed to create a grpc client")
1566+
}
1567+
1568+
session, err := createSession(mc)
1569+
if err != nil {
1570+
t.Fatalf("failed to create a session")
1571+
}
1572+
1573+
// Simulate an ResourceExhausted error to interrupt the stream of PartialResultSet
1574+
// in order to test the grpc retrying mechanism.
1575+
server.TestSpanner.AddPartialResultSetError(
1576+
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
1577+
PartialResultSetExecutionTime{
1578+
ResumeToken: EncodeResumeToken(2),
1579+
Err: status.Errorf(codes.ResourceExhausted, "server is unavailable"),
1580+
},
1581+
)
1582+
1583+
// The retry is counted from the second call.
1584+
r := -1
1585+
// Establish a stream to mock cloud spanner server.
1586+
iter := stream(context.Background(), nil, c.metricsTracerFactory,
1587+
func(ct context.Context, resumeToken []byte, opts ...gax.CallOption) (streamingReceiver, error) {
1588+
r++
1589+
return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{
1590+
Session: session.Name,
1591+
Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums,
1592+
ResumeToken: resumeToken,
1593+
}, opts...)
1594+
1595+
},
1596+
nil,
1597+
func(error) {}, mc.(*grpcSpannerClient))
1598+
defer iter.Stop()
1599+
for {
1600+
_, err := iter.Next()
1601+
if err == iterator.Done {
1602+
err = nil
1603+
break
1604+
}
1605+
if err != nil {
1606+
break
1607+
}
1608+
}
1609+
if r != 0 {
1610+
t.Errorf("retry count = %v, want 0", r)
1611+
}
1612+
}
1613+
1614+
// Verify that streaming query get retried upon ResourceExhausted real gRPC server
1615+
// transport failures.
1616+
func TestRetryResourceExhaustedWithRetryInfo(t *testing.T) {
1617+
restore := setMaxBytesBetweenResumeTokens()
1618+
defer restore()
1619+
1620+
server, c, teardown := setupMockedTestServer(t)
1621+
defer teardown()
1622+
mc, err := c.sc.nextClient()
1623+
if err != nil {
1624+
t.Fatalf("failed to create a grpc client")
1625+
}
1626+
1627+
session, err := createSession(mc)
1628+
if err != nil {
1629+
t.Fatalf("failed to create a session")
1630+
}
1631+
1632+
// Simulate an ResourceExhausted error to interrupt the stream of PartialResultSet
1633+
// in order to test the grpc retrying mechanism.
1634+
st := status.New(codes.ResourceExhausted, "server is unavailable")
1635+
retry := &errdetails.RetryInfo{
1636+
RetryDelay: durationpb.New(time.Nanosecond),
1637+
}
1638+
st, _ = st.WithDetails(retry)
1639+
server.TestSpanner.AddPartialResultSetError(
1640+
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
1641+
PartialResultSetExecutionTime{
1642+
ResumeToken: EncodeResumeToken(2),
1643+
Err: st.Err(),
1644+
},
1645+
)
1646+
1647+
// The retry is counted from the second call.
1648+
r := -1
1649+
// Establish a stream to mock cloud spanner server.
1650+
iter := stream(context.Background(), nil, c.metricsTracerFactory,
1651+
func(ct context.Context, resumeToken []byte, opts ...gax.CallOption) (streamingReceiver, error) {
1652+
r++
1653+
return mc.ExecuteStreamingSql(ct, &sppb.ExecuteSqlRequest{
1654+
Session: session.Name,
1655+
Sql: SelectSingerIDAlbumIDAlbumTitleFromAlbums,
1656+
ResumeToken: resumeToken,
1657+
}, opts...)
1658+
1659+
},
1660+
nil,
1661+
func(error) {}, mc.(*grpcSpannerClient))
1662+
defer iter.Stop()
1663+
for {
1664+
_, err := iter.Next()
1665+
if err == iterator.Done {
1666+
err = nil
1667+
break
1668+
}
1669+
if err != nil {
1670+
break
1671+
}
1672+
}
1673+
if r != 1 {
1674+
t.Errorf("retry count = %v, want 1", r)
1675+
}
1676+
}
1677+
15551678
// Test cancel/timeout for client operations.
15561679
func TestCancelTimeout(t *testing.T) {
15571680
restore := setMaxBytesBetweenResumeTokens()

spanner/retry.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer {
5959
// Retry returns the retry delay returned by Cloud Spanner if that is present.
6060
// Otherwise it returns the retry delay calculated by the generic gax Retryer.
6161
func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
62-
if status.Code(err) == codes.Internal &&
62+
errCode := status.Code(err)
63+
if errCode == codes.Internal &&
6364
!strings.Contains(err.Error(), "stream terminated by RST_STREAM") &&
6465
// See b/25451313.
6566
!strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") &&
@@ -74,7 +75,14 @@ func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
7475
if !shouldRetry {
7576
return 0, false
7677
}
77-
if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
78+
79+
serverDelay, hasServerDelay := ExtractRetryDelay(err)
80+
// Retry ResourceExhausted error only if there's a server delay in the trailer
81+
if errCode == codes.ResourceExhausted && (!hasServerDelay || serverDelay <= 0) {
82+
return 0, false
83+
}
84+
85+
if hasServerDelay {
7886
delay = serverDelay
7987
}
8088
return delay, true

0 commit comments

Comments
 (0)