@@ -30,9 +30,11 @@ import (
3030 "github.com/googleapis/gax-go/v2"
3131 "go.opentelemetry.io/otel/metric/noop"
3232 "google.golang.org/api/iterator"
33+ "google.golang.org/genproto/googleapis/rpc/errdetails"
3334 "google.golang.org/grpc/codes"
3435 "google.golang.org/grpc/status"
3536 "google.golang.org/protobuf/proto"
37+ "google.golang.org/protobuf/types/known/durationpb"
3638 proto3 "google.golang.org/protobuf/types/known/structpb"
3739 structpb "google.golang.org/protobuf/types/known/structpb"
3840)
@@ -1552,6 +1554,127 @@ func TestGrpcReconnect(t *testing.T) {
15521554 }
15531555}
15541556
1557+ func TestRetryResourceExhaustedWithoutRetryInfo (t * testing.T ) {
1558+ restore := setMaxBytesBetweenResumeTokens ()
1559+ defer restore ()
1560+
1561+ server , c , teardown := setupMockedTestServer (t )
1562+ defer teardown ()
1563+ mc , err := c .sc .nextClient ()
1564+ if err != nil {
1565+ t .Fatalf ("failed to create a grpc client" )
1566+ }
1567+
1568+ session , err := createSession (mc )
1569+ if err != nil {
1570+ t .Fatalf ("failed to create a session" )
1571+ }
1572+
1573+ // Simulate an ResourceExhausted error to interrupt the stream of PartialResultSet
1574+ // in order to test the grpc retrying mechanism.
1575+ server .TestSpanner .AddPartialResultSetError (
1576+ SelectSingerIDAlbumIDAlbumTitleFromAlbums ,
1577+ PartialResultSetExecutionTime {
1578+ ResumeToken : EncodeResumeToken (2 ),
1579+ Err : status .Errorf (codes .ResourceExhausted , "server is unavailable" ),
1580+ },
1581+ )
1582+
1583+ // The retry is counted from the second call.
1584+ r := - 1
1585+ // Establish a stream to mock cloud spanner server.
1586+ iter := stream (context .Background (), nil , c .metricsTracerFactory ,
1587+ func (ct context.Context , resumeToken []byte , opts ... gax.CallOption ) (streamingReceiver , error ) {
1588+ r ++
1589+ return mc .ExecuteStreamingSql (ct , & sppb.ExecuteSqlRequest {
1590+ Session : session .Name ,
1591+ Sql : SelectSingerIDAlbumIDAlbumTitleFromAlbums ,
1592+ ResumeToken : resumeToken ,
1593+ }, opts ... )
1594+
1595+ },
1596+ nil ,
1597+ func (error ) {}, mc .(* grpcSpannerClient ))
1598+ defer iter .Stop ()
1599+ for {
1600+ _ , err := iter .Next ()
1601+ if err == iterator .Done {
1602+ err = nil
1603+ break
1604+ }
1605+ if err != nil {
1606+ break
1607+ }
1608+ }
1609+ if r != 0 {
1610+ t .Errorf ("retry count = %v, want 0" , r )
1611+ }
1612+ }
1613+
1614+ // Verify that streaming query get retried upon ResourceExhausted real gRPC server
1615+ // transport failures.
1616+ func TestRetryResourceExhaustedWithRetryInfo (t * testing.T ) {
1617+ restore := setMaxBytesBetweenResumeTokens ()
1618+ defer restore ()
1619+
1620+ server , c , teardown := setupMockedTestServer (t )
1621+ defer teardown ()
1622+ mc , err := c .sc .nextClient ()
1623+ if err != nil {
1624+ t .Fatalf ("failed to create a grpc client" )
1625+ }
1626+
1627+ session , err := createSession (mc )
1628+ if err != nil {
1629+ t .Fatalf ("failed to create a session" )
1630+ }
1631+
1632+ // Simulate an ResourceExhausted error to interrupt the stream of PartialResultSet
1633+ // in order to test the grpc retrying mechanism.
1634+ st := status .New (codes .ResourceExhausted , "server is unavailable" )
1635+ retry := & errdetails.RetryInfo {
1636+ RetryDelay : durationpb .New (time .Nanosecond ),
1637+ }
1638+ st , _ = st .WithDetails (retry )
1639+ server .TestSpanner .AddPartialResultSetError (
1640+ SelectSingerIDAlbumIDAlbumTitleFromAlbums ,
1641+ PartialResultSetExecutionTime {
1642+ ResumeToken : EncodeResumeToken (2 ),
1643+ Err : st .Err (),
1644+ },
1645+ )
1646+
1647+ // The retry is counted from the second call.
1648+ r := - 1
1649+ // Establish a stream to mock cloud spanner server.
1650+ iter := stream (context .Background (), nil , c .metricsTracerFactory ,
1651+ func (ct context.Context , resumeToken []byte , opts ... gax.CallOption ) (streamingReceiver , error ) {
1652+ r ++
1653+ return mc .ExecuteStreamingSql (ct , & sppb.ExecuteSqlRequest {
1654+ Session : session .Name ,
1655+ Sql : SelectSingerIDAlbumIDAlbumTitleFromAlbums ,
1656+ ResumeToken : resumeToken ,
1657+ }, opts ... )
1658+
1659+ },
1660+ nil ,
1661+ func (error ) {}, mc .(* grpcSpannerClient ))
1662+ defer iter .Stop ()
1663+ for {
1664+ _ , err := iter .Next ()
1665+ if err == iterator .Done {
1666+ err = nil
1667+ break
1668+ }
1669+ if err != nil {
1670+ break
1671+ }
1672+ }
1673+ if r != 1 {
1674+ t .Errorf ("retry count = %v, want 1" , r )
1675+ }
1676+ }
1677+
15551678// Test cancel/timeout for client operations.
15561679func TestCancelTimeout (t * testing.T ) {
15571680 restore := setMaxBytesBetweenResumeTokens ()
0 commit comments