Skip to content

Commit 1f696ca

Browse files
committed
kgo: avoid a consumer logic race where the consumer stops consuming
1 parent c1bb2be commit 1f696ca

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

pkg/kgo/source.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,23 @@ func (s *source) loopFetch() {
484484

485485
if session == noConsumerSession {
486486
s.fetchState.hardFinish()
487+
// It is possible that we were triggered to consume while we
488+
// had no consumer session, and then *after* loopFetch loaded
489+
// noConsumerSession, the session was saved and triggered to
490+
// consume again. If this function is slow the first time
491+
// around, it could still be running and about to hardFinish.
492+
// The second trigger will do nothing, and then we hardFinish
493+
// and block a new session from actually starting consuming.
494+
//
495+
// To guard against this, after we hard finish, we load the
496+
// session again: if it is *not* noConsumerSession, we trigger
497+
// attempting to consume again. Worst case, the trigger is
498+
// useless and it will exit below when it builds an empty
499+
// request.
500+
sessionNow := consumer.loadSession()
501+
if session != sessionNow {
502+
s.maybeConsume()
503+
}
487504
return
488505
}
489506

0 commit comments

Comments
 (0)