Skip to content

Commit 4dcfb06

Browse files
committed
kgo: add LeaveGroupContext
This allows more control over timeouts when leaving a group or closing the client, and also gives more insight into group leave errors. Closes #556.
1 parent 5f5c721 commit 4dcfb06

File tree

2 files changed

+103
-43
lines changed

2 files changed

+103
-43
lines changed

pkg/kgo/client.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,10 @@ func (cl *Client) CloseAllowingRebalance() {
929929
cl.Close()
930930
}
931931

932-
// Close leaves any group and closes all connections and goroutines.
932+
// Close leaves any group and closes all connections and goroutines. This
933+
// function waits for the group to be left. If you want to force leave a group
934+
// immediately and ensure a speedy shutdown you can use LeaveGroupContext first
935+
// (and then Close will be immediate).
933936
//
934937
// If you are group consuming and have overridden the default
935938
// OnPartitionsRevoked, you must manually commit offsets before closing the
@@ -942,6 +945,10 @@ func (cl *Client) CloseAllowingRebalance() {
942945
// notification of revoked partitions. If you want to automatically allow
943946
// rebalancing, use CloseAllowingRebalance.
944947
func (cl *Client) Close() {
948+
cl.close(cl.ctx)
949+
}
950+
951+
func (cl *Client) close(ctx context.Context) (rerr error) {
945952
defer cl.cfg.hooks.each(func(h Hook) {
946953
if h, ok := h.(HookClientClosed); ok {
947954
h.OnClientClosed(cl)
@@ -951,7 +958,7 @@ func (cl *Client) Close() {
951958
c := &cl.consumer
952959
c.kill.Store(true)
953960
if c.g != nil {
954-
cl.LeaveGroup()
961+
rerr = cl.LeaveGroupContext(ctx)
955962
} else if c.d != nil {
956963
c.mu.Lock() // lock for assign
957964
c.assignPartitions(nil, assignInvalidateAll, nil, "") // we do not use a log message when not in a group
@@ -963,7 +970,7 @@ func (cl *Client) Close() {
963970
// loopFetch from starting. Assigning also waits for the prior session
964971
// to be complete, meaning loopFetch cannot be running.
965972

966-
sessCloseCtx, sessCloseCancel := context.WithTimeout(cl.ctx, time.Second)
973+
sessCloseCtx, sessCloseCancel := context.WithTimeout(ctx, time.Second)
967974
var wg sync.WaitGroup
968975
cl.allSinksAndSources(func(sns sinkAndSource) {
969976
if sns.source.session.id != 0 {
@@ -1015,6 +1022,8 @@ func (cl *Client) Close() {
10151022
closing.Close()
10161023
}
10171024
}
1025+
1026+
return rerr
10181027
}
10191028

10201029
// Request issues a request to Kafka, waiting for and returning the response.

pkg/kgo/consumer_group.go

Lines changed: 91 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -150,36 +150,82 @@ type groupConsumer struct {
150150
// We set this once to manage the group lifecycle once.
151151
managing bool
152152

153-
dying bool // set when closing, read in findNewAssignments
153+
dying bool // set when closing, read in findNewAssignments
154+
left chan struct{}
155+
leaveErr error // set before left is closed
156+
}
157+
158+
// LeaveGroup leaves a group. Close automatically leaves the group, so this is
159+
// only necessary to call if you plan to leave the group but continue to use
160+
// the client. If a rebalance is in progress, this function waits for the
161+
// rebalance to complete before the group can be left. This is necessary to
162+
// allow you to safely issue one final offset commit in OnPartitionsRevoked. If
163+
// you have overridden the default revoke, you must manually commit offsets
164+
// before leaving the group.
165+
//
166+
// If you have configured the group with an InstanceID, this does not leave the
167+
// group. With instance IDs, it is expected that clients will restart and
168+
// re-use the same instance ID. To leave a group using an instance ID, you must
169+
// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
170+
// scripts or kcl).
171+
//
172+
// It is recommended to use LeaveGroupContext to see if the leave was
173+
// successful.
174+
func (cl *Client) LeaveGroup() {
175+
cl.LeaveGroupContext(cl.ctx)
154176
}
155177

156-
// LeaveGroup leaves a group if in one. Calling the client's Close function
157-
// also leaves a group, so this is only necessary to call if you plan to leave
158-
// the group and continue using the client. Note that if a rebalance is in
159-
// progress, this function waits for the rebalance to complete before the group
160-
// can be left. This is necessary to allow you to safely issue one final offset
161-
// commit in OnPartitionsRevoked. If you have overridden the default revoke,
162-
// you must manually commit offsets before leaving the group.
178+
// LeaveGroup leaves a group. Close automatically leaves the group, so this is
179+
// only necessary to call if you plan to leave the group but continue to use
180+
// the client. If a rebalance is in progress, this function waits for the
181+
// rebalance to complete before the group can be left. This is necessary to
182+
// allow you to safely issue one final offset commit in OnPartitionsRevoked. If
183+
// you have overridden the default revoke, you must manually commit offsets
184+
// before leaving the group.
185+
//
186+
// The context can be used to avoid waiting for the client to leave the group.
187+
// Not waiting may result in your client being stuck in the group and the
188+
// partitions this client was consuming being stuck until the session timeout.
189+
// This function returns any leave group error or context cancel error. If the
190+
// context is nil, this immediately leaves the group and does not wait and does
191+
// not return an error.
163192
//
164193
// If you have configured the group with an InstanceID, this does not leave the
165194
// group. With instance IDs, it is expected that clients will restart and
166195
// re-use the same instance ID. To leave a group using an instance ID, you must
167196
// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
168197
// scripts or kcl).
169-
func (cl *Client) LeaveGroup() {
198+
func (cl *Client) LeaveGroupContext(ctx context.Context) error {
170199
c := &cl.consumer
171200
if c.g == nil {
172-
return
201+
return nil
202+
}
203+
var immediate bool
204+
if ctx == nil {
205+
var cancel func()
206+
ctx, cancel = context.WithCancel(context.Background())
207+
cancel()
208+
immediate = true
173209
}
174210

175-
c.waitAndAddRebalance()
176-
c.mu.Lock() // lock for assign
177-
c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
178-
wait := c.g.leave()
179-
c.mu.Unlock()
180-
c.unaddRebalance()
211+
go func() {
212+
c.waitAndAddRebalance()
213+
c.mu.Lock() // lock for assign
214+
c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
215+
c.g.leave(ctx)
216+
c.mu.Unlock()
217+
c.unaddRebalance()
218+
}()
181219

182-
wait() // wait after we unlock
220+
select {
221+
case <-ctx.Done():
222+
if immediate {
223+
return nil
224+
}
225+
return ctx.Err()
226+
case <-c.g.left:
227+
return c.g.leaveErr
228+
}
183229
}
184230

185231
// GroupMetadata returns the current group member ID and generation, or an
@@ -214,6 +260,8 @@ func (c *consumer) initGroup() {
214260
rejoinCh: make(chan string, 1),
215261
heartbeatForceCh: make(chan func(error)),
216262
using: make(map[string]int),
263+
264+
left: make(chan struct{}),
217265
}
218266
c.g = g
219267
if !g.cfg.setCommitCallback {
@@ -411,7 +459,7 @@ func (g *groupConsumer) manage() {
411459
}
412460
}
413461

414-
func (g *groupConsumer) leave() (wait func()) {
462+
func (g *groupConsumer) leave(ctx context.Context) {
415463
// If g.using is nonzero before this check, then a manage goroutine has
416464
// started. If not, it will never start because we set dying.
417465
g.mu.Lock()
@@ -421,43 +469,46 @@ func (g *groupConsumer) leave() (wait func()) {
421469
g.cancel()
422470
g.mu.Unlock()
423471

424-
done := make(chan struct{})
425-
426472
go func() {
427-
defer close(done)
428-
429473
if wasManaging {
430474
// We want to wait for the manage goroutine to be done
431475
// so that we call the user's on{Assign,RevokeLost}.
432476
<-g.manageDone
433477
}
434-
435478
if wasDead {
436479
// If we already called leave(), then we just wait for
437480
// the prior leave to finish and we avoid re-issuing a
438481
// LeaveGroup request.
439482
return
440483
}
441484

442-
if g.cfg.instanceID == nil {
443-
g.cfg.logger.Log(LogLevelInfo, "leaving group",
444-
"group", g.cfg.group,
445-
"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
446-
)
447-
// If we error when leaving, there is not much
448-
// we can do. We may as well just return.
449-
req := kmsg.NewPtrLeaveGroupRequest()
450-
req.Group = g.cfg.group
451-
req.MemberID = g.memberID
452-
member := kmsg.NewLeaveGroupRequestMember()
453-
member.MemberID = g.memberID
454-
member.Reason = kmsg.StringPtr("client leaving group per normal operation")
455-
req.Members = append(req.Members, member)
456-
req.RequestWith(g.cl.ctx, g.cl)
485+
defer close(g.left)
486+
487+
if g.cfg.instanceID != nil {
488+
return
457489
}
458-
}()
459490

460-
return func() { <-done }
491+
g.cfg.logger.Log(LogLevelInfo, "leaving group",
492+
"group", g.cfg.group,
493+
"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
494+
)
495+
// If we error when leaving, there is not much
496+
// we can do. We may as well just return.
497+
req := kmsg.NewPtrLeaveGroupRequest()
498+
req.Group = g.cfg.group
499+
req.MemberID = g.memberID
500+
member := kmsg.NewLeaveGroupRequestMember()
501+
member.MemberID = g.memberID
502+
member.Reason = kmsg.StringPtr("client leaving group per normal operation")
503+
req.Members = append(req.Members, member)
504+
505+
resp, err := req.RequestWith(ctx, g.cl)
506+
if err != nil {
507+
g.leaveErr = err
508+
return
509+
}
510+
g.leaveErr = kerr.ErrorForCode(resp.ErrorCode)
511+
}()
461512
}
462513

463514
// returns the difference of g.nowAssigned and g.lastAssigned.

0 commit comments

Comments
 (0)