Skip to content

Commit e2e80bf

Browse files
committed
kgo: clear controller/coordinator caches on failed dials
If we are repeatedly unable to dial the controller or coordinator, it is possible that the broker is just gone. We need to clear our internal cache so that the next request reloads it and picks up a new controller / coordinator. We only clear the cache after 3 failed dials, so that temporary dial errors are tolerated. Closes #239.
1 parent b845981 commit e2e80bf

File tree

1 file changed

+49
-17
lines changed

1 file changed

+49
-17
lines changed

pkg/kgo/client.go

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -810,11 +810,27 @@ type retriable struct {
810810
// that can fail / do not need to retry forever.
811811
limitRetries int
812812

813-
// parseRetryErr, if non-nil, can parse a retriable error out of the
814-
// response and return it. This error is *not* returned from the
815-
// request if the req cannot be retried due to timeout or retry limits,
816-
// but it *can* allow a retry if neither limit is hit yet.
817-
parseRetryErr func(kmsg.Response) error
813+
// parseRetryErr, if non-nil, can delete stale cached brokers. We do
814+
// *not* return the error from this function to the caller, but we do
815+
// use it to potentially retry. It is not necessary, but also not
816+
// harmful, to return the input error.
817+
parseRetryErr func(kmsg.Response, error) error
818+
}
819+
820+
type failDial struct{ fails int8 }
821+
822+
// The controller and group/txn coordinators are cached. If dialing the broker
823+
// repeatedly fails, we need to forget our cache to force a re-load: the broker
824+
// may have completely died.
825+
func (d *failDial) isRepeatedDialFail(err error) bool {
826+
if isDialErr(err) {
827+
d.fails++
828+
if d.fails == 3 {
829+
d.fails = 0
830+
return true
831+
}
832+
}
833+
return false
818834
}
819835

820836
func (r *retriable) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
@@ -831,8 +847,8 @@ start:
831847
var retryErr error
832848
if err == nil {
833849
resp, err = r.last.waitResp(ctx, req)
834-
if err == nil && r.parseRetryErr != nil {
835-
retryErr = r.parseRetryErr(resp)
850+
if r.parseRetryErr != nil {
851+
retryErr = r.parseRetryErr(resp, err)
836852
}
837853
}
838854

@@ -1098,7 +1114,6 @@ func (cl *Client) controller(ctx context.Context) (*broker, error) {
10981114
func (cl *Client) forgetControllerID(id int32) {
10991115
cl.controllerIDMu.Lock()
11001116
defer cl.controllerIDMu.Unlock()
1101-
11021117
if cl.controllerID == id {
11031118
cl.controllerID = unknownControllerID
11041119
}
@@ -1288,18 +1303,21 @@ func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error)
12881303
case errors.Is(err, kerr.CoordinatorNotAvailable),
12891304
errors.Is(err, kerr.CoordinatorLoadInProgress),
12901305
errors.Is(err, kerr.NotCoordinator):
1291-
1292-
cl.coordinatorsMu.Lock()
1293-
delete(cl.coordinators, coordinatorKey{
1294-
name: name,
1295-
typ: typ,
1296-
})
1297-
cl.coordinatorsMu.Unlock()
1306+
cl.deleteStaleCoordinator(name, typ)
12981307
return true
12991308
}
13001309
return false
13011310
}
13021311

1312+
func (cl *Client) deleteStaleCoordinator(name string, typ int8) {
1313+
cl.coordinatorsMu.Lock()
1314+
defer cl.coordinatorsMu.Unlock()
1315+
delete(cl.coordinators, coordinatorKey{
1316+
name: name,
1317+
typ: typ,
1318+
})
1319+
}
1320+
13031321
type brokerOrErr struct {
13041322
b *broker
13051323
err error
@@ -1325,7 +1343,14 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response
13251343
cl.maybeDeleteMappedMetadata(topics...)
13261344
}
13271345

1328-
r.parseRetryErr = func(resp kmsg.Response) error {
1346+
var d failDial
1347+
r.parseRetryErr = func(resp kmsg.Response, err error) error {
1348+
if err != nil {
1349+
if d.isRepeatedDialFail(err) {
1350+
cl.forgetControllerID(r.last.meta.NodeID)
1351+
}
1352+
return err
1353+
}
13291354
var code int16
13301355
switch t := resp.(type) {
13311356
case *kmsg.CreateTopicsResponse:
@@ -1455,7 +1480,14 @@ func (cl *Client) handleReqWithCoordinator(
14551480
req kmsg.Request,
14561481
) (*broker, kmsg.Response, error) {
14571482
r := cl.retriableBrokerFn(coordinator)
1458-
r.parseRetryErr = func(resp kmsg.Response) error {
1483+
var d failDial
1484+
r.parseRetryErr = func(resp kmsg.Response, err error) error {
1485+
if err != nil {
1486+
if d.isRepeatedDialFail(err) {
1487+
cl.deleteStaleCoordinator(name, typ)
1488+
}
1489+
return err
1490+
}
14591491
var code int16
14601492
switch t := resp.(type) {
14611493
// TXN

0 commit comments

Comments
 (0)