Skip to content

Commit 8c785fa

Browse files
committed
kgo: fix race between client closing and purging
Purging topics can actually restart consuming. This can race with killing fetch sessions while closing. Purging internally calls assignPartitions with the consumer topics. In general, we need to prevent anything resuming consuming no matter what, so we add a bool field to the client that forcibly prevents consuming.
1 parent ee392e1 commit 8c785fa

File tree

4 files changed

+20
-12
lines changed

4 files changed

+20
-12
lines changed

pkg/kgo/client.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -947,18 +947,19 @@ func (cl *Client) Close() {
947947
})
948948

949949
c := &cl.consumer
950+
c.kill.Store(true)
950951
if c.g != nil {
951952
cl.LeaveGroup()
952953
} else if c.d != nil {
953-
c.mu.Lock() // lock for assign
954-
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "") // we do not use a log message when not in a group
954+
c.mu.Lock() // lock for assign
955+
c.assignPartitions(nil, assignInvalidateAll, nil, "") // we do not use a log message when not in a group
955956
c.mu.Unlock()
956957
}
957958

958959
// After the above, consumers cannot consume anymore. LeaveGroup
959-
// internally assigns noTopicsPartitions, which uses noConsumerSession,
960-
// which prevents loopFetch from starting. Assigning also waits for the
961-
// prior session to be complete, meaning loopFetch cannot be running.
960+
// internally assigns nil, which uses noConsumerSession, which prevents
961+
// loopFetch from starting. Assigning also waits for the prior session
962+
// to be complete, meaning loopFetch cannot be running.
962963

963964
sessCloseCtx, sessCloseCancel := context.WithTimeout(cl.ctx, time.Second)
964965
var wg sync.WaitGroup

pkg/kgo/consumer.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ type consumer struct {
196196
sessionChangeMu sync.Mutex
197197

198198
session atomic.Value // *consumerSession
199+
kill atomic.Bool
199200

200201
usingCursors usedCursors
201202

@@ -927,8 +928,13 @@ func (f fmtAssignment) String() string {
927928
return sb.String()
928929
}
929930

930-
// assignPartitions, called under the consumer's mu, is used to set new
931-
// cursors or add to the existing cursors.
931+
// assignPartitions, called under the consumer's mu, is used to set new cursors
932+
// or add to the existing cursors.
933+
//
934+
// We do not need to pass tps when we are bumping the session or when we are
935+
// invalidating all. In all other cases, we want the tps -- the logic below does
936+
// not fully differentiate needing to start a new session vs. just reusing the
937+
// old (third if case below).
932938
func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how assignHow, tps *topicsPartitions, why string) {
933939
if c.mu.TryLock() {
934940
c.mu.Unlock()
@@ -1058,7 +1064,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
10581064

10591065
// This assignment could contain nothing (for the purposes of
10601066
// invalidating active fetches), so we only do this if needed.
1061-
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching || how == assignBumpSession {
1067+
if len(assignments) == 0 || how != assignWithoutInvalidating {
10621068
return
10631069
}
10641070

@@ -1565,7 +1571,7 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) {
15651571
session := c.loadSession()
15661572

15671573
if session == noConsumerSession {
1568-
return listOrEpochLoads{}, noTopicsPartitions // we had no session
1574+
return listOrEpochLoads{}, nil // we had no session
15691575
}
15701576

15711577
// Before storing noConsumerSession, cancel our old. This pairs
@@ -1621,6 +1627,9 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) {
16211627
// 1 worker allows for initialization work to prevent the session from being
16221628
// immediately stopped.
16231629
func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
1630+
if c.kill.Load() {
1631+
tps = nil
1632+
}
16241633
session := c.newConsumerSession(tps)
16251634
c.session.Store(session)
16261635

pkg/kgo/consumer_group.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (cl *Client) LeaveGroup() {
174174

175175
c.waitAndAddRebalance()
176176
c.mu.Lock() // lock for assign
177-
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in LeaveGroup")
177+
c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
178178
wait := c.g.leave()
179179
c.mu.Unlock()
180180
c.unaddRebalance()

pkg/kgo/topics_and_partitions.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,6 @@ type topicPartitions struct {
250250

251251
func (t *topicPartitions) load() *topicPartitionsData { return t.v.Load().(*topicPartitionsData) }
252252

253-
var noTopicsPartitions = newTopicsPartitions()
254-
255253
func newTopicsPartitions() *topicsPartitions {
256254
var t topicsPartitions
257255
t.v.Store(make(topicsPartitionsData))

0 commit comments

Comments
 (0)