File tree Expand file tree Collapse file tree 1 file changed +17
-0
lines changed
Expand file tree Collapse file tree 1 file changed +17
-0
lines changed Original file line number Diff line number Diff line change @@ -484,6 +484,23 @@ func (s *source) loopFetch() {
484484
485485 if session == noConsumerSession {
486486 s .fetchState .hardFinish ()
487+ // It is possible that we were triggered to consume while we
488+ // had no consumer session, and then *after* loopFetch loaded
489+ // noConsumerSession, the session was saved and triggered to
490+ // consume again. If this function is slow the first time
491+ // around, it could still be running and about to hardFinish.
492+ // The second trigger will do nothing, and then we hardFinish
493+ // and block a new session from actually starting consuming.
494+ //
495+ // To guard against this, after we hard finish, we load the
496+ // session again: if it is *not* noConsumerSession, we trigger
497+ // attempting to consume again. Worst case, the trigger is
498+ // useless and it will exit below when it builds an empty
499+ // request.
500+ sessionNow := consumer .loadSession ()
501+ if session != sessionNow {
502+ s .maybeConsume ()
503+ }
487504 return
488505 }
489506
You can’t perform that action at this time.
0 commit comments