Skip to content

Commit dc5283e

Browse files
committed
kgo: re-fix #493, supporting other buggy clients, and add a test
1 parent 32ac27f commit dc5283e

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

pkg/kgo/group_balancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func NewConsumerBalancer(balance ConsumerBalancerBalance, members []kmsg.JoinGro
216216
// workaround. See #493.
217217
if bytes.HasPrefix(memberMeta, []byte{0, 1}) {
218218
memberMeta[0] = 0
219-
memberMeta[0] = 0
219+
memberMeta[1] = 0
220220
if err = meta.ReadFrom(memberMeta); err != nil {
221221
return nil, fmt.Errorf("unable to read member metadata: %v", err)
222222
}

pkg/kgo/group_balancer_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,18 @@ func Test_stickyAdjustCooperative(t *testing.T) {
9898
t.Errorf("got plan != exp\ngot: %#v\nexp: %#v\n", inPlan, expPlan)
9999
}
100100
}
101+
102+
func TestNewConsumerBalancerIssue493(t *testing.T) {
103+
m := kmsg.NewConsumerMemberMetadata()
104+
m.Version = 0
105+
m.Topics = []string{"foo"}
106+
protoMeta := m.AppendTo(nil)
107+
protoMeta[1] = 1
108+
member := kmsg.NewJoinGroupResponseMember()
109+
member.MemberID = "test"
110+
member.ProtocolMetadata = protoMeta
111+
_, err := NewConsumerBalancer(nil, []kmsg.JoinGroupResponseMember{member})
112+
if err != nil {
113+
t.Errorf("got unexpected error: %v", err)
114+
}
115+
}

0 commit comments

Comments
 (0)