Skip to content

Commit c013050

Browse files
committed
kgo: avoid rare panic
Scenario is: * Metadata update is actively running and has stopped an active session, returning all topicPartitions that were actively in list/epoch. These list/epoch loads are stored in reloadOffsets. Metadata grabs the session change mutex. * Client.Close is now called, stores client.consumer.kill(true). The Close is blocked briefly because Close calls assignPartitions which tries to lock to stop the session. Close is now paused -- however, importantly, the consumer.kill atomic is set to true. * Metadata tries to start a new session. startNewSession returns noConsumerSession because consumer.kill is now true. * Metadata calls reloadOffsets.loadWithSession, which panics once the session tries to access the client variable c. This panic can only happen if all of the following are true: * Client.Close is being called * Metadata is updating * Metadata response is moving a partition from one broker to another * The timing is perfect The fix to this is to check in listOrEpoch if the consumerSession is noConsumerSession. If so, return early. Note that doOnMetadataUpdate, incWorker, and decWorker already checked noConsumerSession. The other methods do not need to check: * mapLoadsToBrokers is called in listOrEpochs on a valid session * handleListOrEpochResults is the same * desireFetch is only called in source after noConsumerSession is checked, and manageFetchConcurrency is called only in desireFetch Closes redpanda-data/redpanda#13791.
1 parent 6a961da commit c013050

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

pkg/kgo/consumer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,6 +1667,15 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
16671667
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) {
16681668
defer s.decWorker()
16691669

1670+
// It is possible for a metadata update to try to migrate partition
1671+
// loads if the update moves partitions between brokers. If we are
1672+
// closing the client, the consumer session could already be stopped,
1673+
// but this stops before the metadata goroutine is killed. So, if we
1674+
// are in this function but actually have no session, we return.
1675+
if s == noConsumerSession {
1676+
return
1677+
}
1678+
16701679
wait := true
16711680
if immediate {
16721681
s.c.cl.triggerUpdateMetadataNow(why)

0 commit comments

Comments
 (0)