@@ -39,12 +39,12 @@ type Client struct {
3939 ctx context.Context
4040 ctxCancel func ()
4141
42- rng func () float64
42+ rng func (func ( * rand. Rand ))
4343
4444 brokersMu sync.RWMutex
4545 brokers []* broker // ordered by broker ID
4646 seeds atomic.Value // []*broker, seed brokers, also ordered by ID
47- anyBrokerIdx int32
47+ anyBrokerOrd [] int32 // shuffled brokers, for random ordering
4848 anySeedIdx int32
4949 stopBrokers bool // set to true on close to stop updateBrokers
5050
@@ -462,13 +462,13 @@ func NewClient(opts ...Opt) (*Client, error) {
462462 ctx : ctx ,
463463 ctxCancel : cancel ,
464464
465- rng : func () func () float64 {
465+ rng : func () func (func ( * rand. Rand )) {
466466 var mu sync.Mutex
467467 rng := rand .New (rand .NewSource (time .Now ().UnixNano ()))
468- return func () float64 {
468+ return func (fn func ( * rand. Rand )) {
469469 mu .Lock ()
470470 defer mu .Unlock ()
471- return rng . Float64 ( )
471+ fn ( rng )
472472 }
473473 }(),
474474
@@ -733,33 +733,45 @@ func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
733733 }
734734}
735735
736+ func (cl * Client ) reinitAnyBrokerOrd () {
737+ cl .anyBrokerOrd = append (cl .anyBrokerOrd [:0 ], make ([]int32 , len (cl .brokers ))... )
738+ for i := range cl .anyBrokerOrd {
739+ cl .anyBrokerOrd [i ] = int32 (i )
740+ }
741+ cl .rng (func (r * rand.Rand ) {
742+ r .Shuffle (len (cl .anyBrokerOrd ), func (i , j int ) {
743+ cl .anyBrokerOrd [i ], cl .anyBrokerOrd [j ] = cl .anyBrokerOrd [j ], cl .anyBrokerOrd [i ]
744+ })
745+ })
746+ }
747+
736748// broker returns a random broker from all brokers ever known.
737749func (cl * Client ) broker () * broker {
738- cl .brokersMu .Lock () // full lock needed for anyBrokerIdx below
750+ cl .brokersMu .Lock ()
739751 defer cl .brokersMu .Unlock ()
740752
741753 // Every time we loop through all discovered brokers, we issue one
742754 // request to the next seed. This ensures that if all discovered
743755 // brokers are down, we will *eventually* loop through seeds and
744756 // hopefully have a reachable seed.
745757 var b * broker
746- if len (cl .brokers ) > 0 && int (cl .anyBrokerIdx ) < len (cl .brokers ) {
747- cl .anyBrokerIdx %= int32 (len (cl .brokers ))
748- b = cl .brokers [cl .anyBrokerIdx ]
749- cl .anyBrokerIdx ++
750- } else {
751- seeds := cl .loadSeeds ()
752- cl .anySeedIdx %= int32 (len (seeds ))
753- b = seeds [cl .anySeedIdx ]
754- cl .anySeedIdx ++
755758
756- // If we have brokers, we ranged past discovered brokers.
757- // We now reset the anyBrokerIdx to begin ranging through
758- // discovered brokers again.
759- if len (cl .brokers ) > 0 {
760- cl .anyBrokerIdx = 0
761- }
759+ if len (cl .anyBrokerOrd ) > 0 {
760+ b = cl .brokers [cl .anyBrokerOrd [0 ]]
761+ cl .anyBrokerOrd = cl .anyBrokerOrd [1 :]
762+ return b
762763 }
764+
765+ seeds := cl .loadSeeds ()
766+ cl .anySeedIdx %= int32 (len (seeds ))
767+ b = seeds [cl .anySeedIdx ]
768+ cl .anySeedIdx ++
769+
770+ // If we have brokers, we ranged past discovered brokers.
771+ // We now reset the anyBrokerOrd to begin ranging through
772+ // discovered brokers again. If there are still no brokers,
773+ // this reinit will do nothing and we will keep looping seeds.
774+ cl .reinitAnyBrokerOrd ()
763775 return b
764776}
765777
@@ -946,6 +958,7 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
946958 }
947959
948960 cl .brokers = newBrokers
961+ cl .reinitAnyBrokerOrd ()
949962}
950963
951964// CloseAllowingRebalance allows rebalances, leaves any group, and closes all
0 commit comments