@@ -150,36 +150,82 @@ type groupConsumer struct {
150150 // We set this once to manage the group lifecycle once.
151151 managing bool
152152
153- dying bool // set when closing, read in findNewAssignments
153+ dying bool // set when closing, read in findNewAssignments
154+ left chan struct {}
155+ leaveErr error // set before left is closed
156+ }
157+
158+ // LeaveGroup leaves a group. Close automatically leaves the group, so this is
159+ // only necessary to call if you plan to leave the group but continue to use
160+ // the client. If a rebalance is in progress, this function waits for the
161+ // rebalance to complete before the group can be left. This is necessary to
162+ // allow you to safely issue one final offset commit in OnPartitionsRevoked. If
163+ // you have overridden the default revoke, you must manually commit offsets
164+ // before leaving the group.
165+ //
166+ // If you have configured the group with an InstanceID, this does not leave the
167+ // group. With instance IDs, it is expected that clients will restart and
168+ // re-use the same instance ID. To leave a group using an instance ID, you must
169+ // manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
170+ // scripts or kcl).
171+ //
172+ // It is recommended to use LeaveGroupContext to see if the leave was
173+ // successful.
174+ func (cl * Client ) LeaveGroup () {
175+ cl .LeaveGroupContext (cl .ctx )
154176}
155177
156- // LeaveGroup leaves a group if in one. Calling the client's Close function
157- // also leaves a group, so this is only necessary to call if you plan to leave
158- // the group and continue using the client. Note that if a rebalance is in
159- // progress, this function waits for the rebalance to complete before the group
160- // can be left. This is necessary to allow you to safely issue one final offset
161- // commit in OnPartitionsRevoked. If you have overridden the default revoke,
162- // you must manually commit offsets before leaving the group.
178+ // LeaveGroup leaves a group. Close automatically leaves the group, so this is
179+ // only necessary to call if you plan to leave the group but continue to use
180+ // the client. If a rebalance is in progress, this function waits for the
181+ // rebalance to complete before the group can be left. This is necessary to
182+ // allow you to safely issue one final offset commit in OnPartitionsRevoked. If
183+ // you have overridden the default revoke, you must manually commit offsets
184+ // before leaving the group.
185+ //
186+ // The context can be used to avoid waiting for the client to leave the group.
187+ // Not waiting may result in your client being stuck in the group and the
188+ // partitions this client was consuming being stuck until the session timeout.
189+ // This function returns any leave group error or context cancel error. If the
190+ // context is nil, this immediately leaves the group and does not wait and does
191+ // not return an error.
163192//
164193// If you have configured the group with an InstanceID, this does not leave the
165194// group. With instance IDs, it is expected that clients will restart and
166195// re-use the same instance ID. To leave a group using an instance ID, you must
167196// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
168197// scripts or kcl).
169- func (cl * Client ) LeaveGroup () {
198+ func (cl * Client ) LeaveGroupContext ( ctx context. Context ) error {
170199 c := & cl .consumer
171200 if c .g == nil {
172- return
201+ return nil
202+ }
203+ var immediate bool
204+ if ctx == nil {
205+ var cancel func ()
206+ ctx , cancel = context .WithCancel (context .Background ())
207+ cancel ()
208+ immediate = true
173209 }
174210
175- c .waitAndAddRebalance ()
176- c .mu .Lock () // lock for assign
177- c .assignPartitions (nil , assignInvalidateAll , nil , "invalidating all assignments in LeaveGroup" )
178- wait := c .g .leave ()
179- c .mu .Unlock ()
180- c .unaddRebalance ()
211+ go func () {
212+ c .waitAndAddRebalance ()
213+ c .mu .Lock () // lock for assign
214+ c .assignPartitions (nil , assignInvalidateAll , nil , "invalidating all assignments in LeaveGroup" )
215+ c .g .leave (ctx )
216+ c .mu .Unlock ()
217+ c .unaddRebalance ()
218+ }()
181219
182- wait () // wait after we unlock
220+ select {
221+ case <- ctx .Done ():
222+ if immediate {
223+ return nil
224+ }
225+ return ctx .Err ()
226+ case <- c .g .left :
227+ return c .g .leaveErr
228+ }
183229}
184230
185231// GroupMetadata returns the current group member ID and generation, or an
@@ -214,6 +260,8 @@ func (c *consumer) initGroup() {
214260 rejoinCh : make (chan string , 1 ),
215261 heartbeatForceCh : make (chan func (error )),
216262 using : make (map [string ]int ),
263+
264+ left : make (chan struct {}),
217265 }
218266 c .g = g
219267 if ! g .cfg .setCommitCallback {
@@ -411,7 +459,7 @@ func (g *groupConsumer) manage() {
411459 }
412460}
413461
414- func (g * groupConsumer ) leave () ( wait func () ) {
462+ func (g * groupConsumer ) leave (ctx context. Context ) {
415463 // If g.using is nonzero before this check, then a manage goroutine has
416464 // started. If not, it will never start because we set dying.
417465 g .mu .Lock ()
@@ -421,43 +469,46 @@ func (g *groupConsumer) leave() (wait func()) {
421469 g .cancel ()
422470 g .mu .Unlock ()
423471
424- done := make (chan struct {})
425-
426472 go func () {
427- defer close (done )
428-
429473 if wasManaging {
430474 // We want to wait for the manage goroutine to be done
431475 // so that we call the user's on{Assign,RevokeLost}.
432476 <- g .manageDone
433477 }
434-
435478 if wasDead {
436479 // If we already called leave(), then we just wait for
437480 // the prior leave to finish and we avoid re-issuing a
438481 // LeaveGroup request.
439482 return
440483 }
441484
442- if g .cfg .instanceID == nil {
443- g .cfg .logger .Log (LogLevelInfo , "leaving group" ,
444- "group" , g .cfg .group ,
445- "member_id" , g .memberID , // lock not needed now since nothing can change it (manageDone)
446- )
447- // If we error when leaving, there is not much
448- // we can do. We may as well just return.
449- req := kmsg .NewPtrLeaveGroupRequest ()
450- req .Group = g .cfg .group
451- req .MemberID = g .memberID
452- member := kmsg .NewLeaveGroupRequestMember ()
453- member .MemberID = g .memberID
454- member .Reason = kmsg .StringPtr ("client leaving group per normal operation" )
455- req .Members = append (req .Members , member )
456- req .RequestWith (g .cl .ctx , g .cl )
485+ defer close (g .left )
486+
487+ if g .cfg .instanceID != nil {
488+ return
457489 }
458- }()
459490
460- return func () { <- done }
491+ g .cfg .logger .Log (LogLevelInfo , "leaving group" ,
492+ "group" , g .cfg .group ,
493+ "member_id" , g .memberID , // lock not needed now since nothing can change it (manageDone)
494+ )
495+ // If we error when leaving, there is not much
496+ // we can do. We may as well just return.
497+ req := kmsg .NewPtrLeaveGroupRequest ()
498+ req .Group = g .cfg .group
499+ req .MemberID = g .memberID
500+ member := kmsg .NewLeaveGroupRequestMember ()
501+ member .MemberID = g .memberID
502+ member .Reason = kmsg .StringPtr ("client leaving group per normal operation" )
503+ req .Members = append (req .Members , member )
504+
505+ resp , err := req .RequestWith (ctx , g .cl )
506+ if err != nil {
507+ g .leaveErr = err
508+ return
509+ }
510+ g .leaveErr = kerr .ErrorForCode (resp .ErrorCode )
511+ }()
461512}
462513
463514// returns the difference of g.nowAssigned and g.lastAssigned.
0 commit comments