@@ -810,11 +810,27 @@ type retriable struct {
810810 // that can fail / do not need to retry forever.
811811 limitRetries int
812812
813- // parseRetryErr, if non-nil, can parse a retriable error out of the
814- // response and return it. This error is *not* returned from the
815- // request if the req cannot be retried due to timeout or retry limits,
816- // but it *can* allow a retry if neither limit is hit yet.
817- parseRetryErr func (kmsg.Response ) error
813+ // parseRetryErr, if non-nil, can delete stale cached brokers. We do
814+ // *not* return the error from this function to the caller, but we do
815+ // use it to potentially retry. It is not necessary, but also not
816+ // harmful, to return the input error.
817+ parseRetryErr func (kmsg.Response , error ) error
818+ }
819+
// failDial tracks how many dial errors have been observed. The zero value is
// ready to use and starts with no recorded failures.
type failDial struct {
	// fails is the number of dial errors counted since the last reset.
	fails int8
}
821+
822+ // The controller and group/txn coordinators are cached. If dialing the broker
823+ // repeatedly fails, we need to forget our cache to force a re-load: the broker
824+ // may have completely died.
825+ func (d * failDial ) isRepeatedDialFail (err error ) bool {
826+ if isDialErr (err ) {
827+ d .fails ++
828+ if d .fails == 3 {
829+ d .fails = 0
830+ return true
831+ }
832+ }
833+ return false
818834}
819835
820836func (r * retriable ) Request (ctx context.Context , req kmsg.Request ) (kmsg.Response , error ) {
@@ -831,8 +847,8 @@ start:
831847 var retryErr error
832848 if err == nil {
833849 resp , err = r .last .waitResp (ctx , req )
834- if err == nil && r .parseRetryErr != nil {
835- retryErr = r .parseRetryErr (resp )
850+ if r .parseRetryErr != nil {
851+ retryErr = r .parseRetryErr (resp , err )
836852 }
837853 }
838854
@@ -1098,7 +1114,6 @@ func (cl *Client) controller(ctx context.Context) (*broker, error) {
10981114func (cl * Client ) forgetControllerID (id int32 ) {
10991115 cl .controllerIDMu .Lock ()
11001116 defer cl .controllerIDMu .Unlock ()
1101-
11021117 if cl .controllerID == id {
11031118 cl .controllerID = unknownControllerID
11041119 }
@@ -1288,18 +1303,21 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error)
12881303 case errors .Is (err , kerr .CoordinatorNotAvailable ),
12891304 errors .Is (err , kerr .CoordinatorLoadInProgress ),
12901305 errors .Is (err , kerr .NotCoordinator ):
1291-
1292- cl .coordinatorsMu .Lock ()
1293- delete (cl .coordinators , coordinatorKey {
1294- name : name ,
1295- typ : typ ,
1296- })
1297- cl .coordinatorsMu .Unlock ()
1306+ cl .deleteStaleCoordinator (name , typ )
12981307 return true
12991308 }
13001309 return false
13011310}
13021311
1312+ func (cl * Client ) deleteStaleCoordinator (name string , typ int8 ) {
1313+ cl .coordinatorsMu .Lock ()
1314+ defer cl .coordinatorsMu .Unlock ()
1315+ delete (cl .coordinators , coordinatorKey {
1316+ name : name ,
1317+ typ : typ ,
1318+ })
1319+ }
1320+
13031321type brokerOrErr struct {
13041322 b * broker
13051323 err error
@@ -1325,7 +1343,14 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response
13251343 cl .maybeDeleteMappedMetadata (topics ... )
13261344 }
13271345
1328- r .parseRetryErr = func (resp kmsg.Response ) error {
1346+ var d failDial
1347+ r .parseRetryErr = func (resp kmsg.Response , err error ) error {
1348+ if err != nil {
1349+ if d .isRepeatedDialFail (err ) {
1350+ cl .forgetControllerID (r .last .meta .NodeID )
1351+ }
1352+ return err
1353+ }
13291354 var code int16
13301355 switch t := resp .(type ) {
13311356 case * kmsg.CreateTopicsResponse :
@@ -1455,7 +1480,14 @@ func (cl *Client) handleReqWithCoordinator(
14551480 req kmsg.Request ,
14561481) (* broker , kmsg.Response , error ) {
14571482 r := cl .retriableBrokerFn (coordinator )
1458- r .parseRetryErr = func (resp kmsg.Response ) error {
1483+ var d failDial
1484+ r .parseRetryErr = func (resp kmsg.Response , err error ) error {
1485+ if err != nil {
1486+ if d .isRepeatedDialFail (err ) {
1487+ cl .deleteStaleCoordinator (name , typ )
1488+ }
1489+ return err
1490+ }
14591491 var code int16
14601492 switch t := resp .(type ) {
14611493 // TXN
0 commit comments