Skip to content

Commit 32ac27f

Browse files
committed
kgo: ensure assignPartitions is locked when pausing topics/partitions
Also adds a runtime panic if this is ever unlocked, since this is not the first time this bug has occurred and it is very hard to test against -- while a runtime panic is easy to catch in tests.
1 parent 6079141 commit 32ac27f

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

pkg/kgo/consumer.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,11 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
562562

563563
c.pausedMu.Lock()
564564
defer c.pausedMu.Unlock()
565-
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
565+
defer func() {
566+
c.mu.Lock()
567+
defer c.mu.Unlock()
568+
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
569+
}()
566570

567571
paused := c.clonePaused()
568572
paused.addTopics(topics...)
@@ -590,7 +594,11 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
590594

591595
c.pausedMu.Lock()
592596
defer c.pausedMu.Unlock()
593-
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
597+
defer func() {
598+
c.mu.Lock()
599+
defer c.mu.Unlock()
600+
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
601+
}()
594602

595603
paused := c.clonePaused()
596604
paused.addPartitions(topicPartitions)
@@ -922,6 +930,10 @@ func (f fmtAssignment) String() string {
922930
// assignPartitions, called under the consumer's mu, is used to set new
923931
// cursors or add to the existing cursors.
924932
func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how assignHow, tps *topicsPartitions, why string) {
933+
if c.mu.TryLock() {
934+
panic("assignPartitions called without holding the consumer lock, this is a bug in franz-go, please open an issue at github.com/twmb/franz-go")
935+
}
936+
925937
// The internal code can avoid giving an assign reason in cases where
926938
// the caller logs itself immediately before assigning. We only log if
927939
// there is a reason.

0 commit comments

Comments
 (0)