@@ -353,6 +353,7 @@ type testClient struct {
353353 c * bigtable.Client // c stores the Bigtable client under test
354354 appProfileID string // appProfileID is currently unused
355355 perOperationTimeout * duration.Duration // perOperationTimeout sets a custom timeout for methods calls on this client
356+ isOpen bool // isOpen indicates whether this client is open for new requests
356357}
357358
358359// credentialsBundle implements credentials.Bundle interface
@@ -493,7 +494,19 @@ type goTestProxyServer struct {
493494 pb.UnimplementedCloudBigtableV2TestProxyServer
494495 clientsLock sync.RWMutex // clientsLock prevents simultaneous mutation of the clientIDs map
495496 clientIDs map [string ]* testClient // clientIDs has all of the bigtable.Client objects under test
497+ }
496498
499+ // client retrieves a testClient from the clientIDs map. You must lock clientsLock before calling
500+ // this method.
501+ func (s * goTestProxyServer ) client (clientID string ) (* testClient , bool ) {
502+ client , ok := s .clientIDs [clientID ]
503+ if ! ok {
504+ return nil , false
505+ }
506+ if ! client .isOpen {
507+ return nil , false
508+ }
509+ return client , true
497510}
498511
499512// CreateClient responds to the CreateClient RPC. This method adds a new client
@@ -536,11 +549,29 @@ func (s *goTestProxyServer) CreateClient(ctx context.Context, req *pb.CreateClie
536549 c : c ,
537550 appProfileID : req .AppProfileId ,
538551 perOperationTimeout : req .PerOperationTimeout ,
552+ isOpen : true ,
539553 }
540554
541555 return & pb.CreateClientResponse {}, nil
542556}
543557
558+ // CloseClient responds to the CloseClient RPC. This method closes an existing
559+ // client, making it inaccessible to new requests.
560+ func (s * goTestProxyServer ) CloseClient (ctx context.Context , req * pb.CloseClientRequest ) (* pb.CloseClientResponse , error ) {
561+ clientID := req .ClientId
562+ s .clientsLock .Lock ()
563+ defer s .clientsLock .Unlock ()
564+
565+ btc , exists := s .client (clientID )
566+ if ! exists {
567+ return nil , stat .Error (codes .InvalidArgument ,
568+ fmt .Sprintf ("%s: ClientID does not exist" , logLabel ))
569+ }
570+ btc .isOpen = false
571+
572+ return & pb.CloseClientResponse {}, nil
573+ }
574+
544575// RemoveClient responds to the RemoveClient RPC. This method removes an
545576// existing client from the goTestProxyServer
546577func (s * goTestProxyServer ) RemoveClient (ctx context.Context , req * pb.RemoveClientRequest ) (* pb.RemoveClientResponse , error ) {
@@ -549,13 +580,15 @@ func (s *goTestProxyServer) RemoveClient(ctx context.Context, req *pb.RemoveClie
549580 s .clientsLock .Lock ()
550581 defer s .clientsLock .Unlock ()
551582
583+ // RemoveClient can ignore whether the client accepts new requests
552584 btc , exists := s .clientIDs [clientID ]
553585 if ! exists {
554586 return nil , stat .Error (codes .InvalidArgument ,
555587 fmt .Sprintf ("%s: ClientID does not exist" , logLabel ))
556588 }
557589
558590 // this closes every ClientConn in the pool.
591+ btc .isOpen = false
559592 btc .c .Close ()
560593 delete (s .clientIDs , clientID )
561594
@@ -566,7 +599,7 @@ func (s *goTestProxyServer) RemoveClient(ctx context.Context, req *pb.RemoveClie
566599// data for a single row in the Table.
567600func (s * goTestProxyServer ) ReadRow (ctx context.Context , req * pb.ReadRowRequest ) (* pb.RowResult , error ) {
568601 s .clientsLock .RLock ()
569- btc , exists := s .clientIDs [ req .ClientId ]
602+ btc , exists := s .client ( req .ClientId )
570603 s .clientsLock .RUnlock ()
571604
572605 if ! exists {
@@ -612,7 +645,7 @@ func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest)
612645// data for a set of rows, a range of rows, or the entire table.
613646func (s * goTestProxyServer ) ReadRows (ctx context.Context , req * pb.ReadRowsRequest ) (* pb.RowsResult , error ) {
614647 s .clientsLock .RLock ()
615- btc , exists := s .clientIDs [ req .ClientId ]
648+ btc , exists := s .client ( req .ClientId )
616649 s .clientsLock .RUnlock ()
617650
618651 if ! exists {
@@ -685,7 +718,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques
685718// changes (or deletions) to a single row in a table.
686719func (s * goTestProxyServer ) MutateRow (ctx context.Context , req * pb.MutateRowRequest ) (* pb.MutateRowResult , error ) {
687720 s .clientsLock .RLock ()
688- btc , exists := s .clientIDs [ req .ClientId ]
721+ btc , exists := s .client ( req .ClientId )
689722 s .clientsLock .RUnlock ()
690723
691724 if ! exists {
@@ -723,7 +756,7 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
723756// series of changes or deletions to multiple rows in a single call.
724757func (s * goTestProxyServer ) BulkMutateRows (ctx context.Context , req * pb.MutateRowsRequest ) (* pb.MutateRowsResult , error ) {
725758 s .clientsLock .RLock ()
726- btc , exists := s .clientIDs [ req .ClientId ]
759+ btc , exists := s .client ( req .ClientId )
727760 s .clientsLock .RUnlock ()
728761
729762 if ! exists {
@@ -791,7 +824,7 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo
791824// one mutation if a condition is true and another mutation if it is false.
792825func (s * goTestProxyServer ) CheckAndMutateRow (ctx context.Context , req * pb.CheckAndMutateRowRequest ) (* pb.CheckAndMutateRowResult , error ) {
793826 s .clientsLock .RLock ()
794- btc , exists := s .clientIDs [ req .ClientId ]
827+ btc , exists := s .client ( req .ClientId )
795828 s .clientsLock .RUnlock ()
796829
797830 if ! exists {
@@ -848,7 +881,7 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
848881// of the keys available in a table.
849882func (s * goTestProxyServer ) SampleRowKeys (ctx context.Context , req * pb.SampleRowKeysRequest ) (* pb.SampleRowKeysResult , error ) {
850883 s .clientsLock .RLock ()
851- btc , exists := s .clientIDs [ req .ClientId ]
884+ btc , exists := s .client ( req .ClientId )
852885 s .clientsLock .RUnlock ()
853886
854887 if ! exists {
@@ -894,7 +927,7 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow
894927// applies a non-idempotent change to a row.
895928func (s * goTestProxyServer ) ReadModifyWriteRow (ctx context.Context , req * pb.ReadModifyWriteRowRequest ) (* pb.RowResult , error ) {
896929 s .clientsLock .RLock ()
897- btc , exists := s .clientIDs [ req .ClientId ]
930+ btc , exists := s .client ( req .ClientId )
898931 s .clientsLock .RUnlock ()
899932
900933 if ! exists {
0 commit comments