Skip to content

Commit 76ca69a

Browse files
prestonadnwe
authored andcommitted
feat(proto): support for Metadata V6-V10
In particular, this adds support for topic UUIDs Signed-off-by: Adrian Preston <[email protected]>
1 parent 10dd922 commit 76ca69a

File tree

9 files changed

+761
-68
lines changed

9 files changed

+761
-68
lines changed

async_producer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1780,7 +1780,7 @@ func TestTxnProduceBumpEpoch(t *testing.T) {
17801780
config.ApiVersionsRequest = false
17811781

17821782
metadataLeader := new(MetadataResponse)
1783-
metadataLeader.Version = 7
1783+
metadataLeader.Version = 9
17841784
metadataLeader.ControllerID = broker.brokerID
17851785
metadataLeader.AddBroker(broker.Addr(), broker.BrokerID())
17861786
metadataLeader.AddTopic("test-topic", ErrNoError)

broker.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ func (b *Broker) Rack() string {
371371
// GetMetadata send a metadata request and returns a metadata response or error
372372
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
373373
response := new(MetadataResponse)
374+
response.Version = request.Version // Required to ensure use of the correct response header version
374375

375376
err := b.sendAndReceive(request, response)
376377
if err != nil {
@@ -1072,7 +1073,12 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
10721073
return err
10731074
}
10741075

1075-
host, err := pd.getString()
1076+
var host string
1077+
if version < 9 {
1078+
host, err = pd.getString()
1079+
} else {
1080+
host, err = pd.getCompactString()
1081+
}
10761082
if err != nil {
10771083
return err
10781084
}
@@ -1082,18 +1088,27 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
10821088
return err
10831089
}
10841090

1085-
if version >= 1 {
1091+
if version >= 1 && version < 9 {
10861092
b.rack, err = pd.getNullableString()
1087-
if err != nil {
1088-
return err
1089-
}
1093+
} else if version >= 9 {
1094+
b.rack, err = pd.getCompactNullableString()
1095+
}
1096+
if err != nil {
1097+
return err
10901098
}
10911099

10921100
b.addr = net.JoinHostPort(host, fmt.Sprint(port))
10931101
if _, _, err := net.SplitHostPort(b.addr); err != nil {
10941102
return err
10951103
}
10961104

1105+
if version >= 9 {
1106+
_, err := pd.getEmptyTaggedFieldArray()
1107+
if err != nil {
1108+
return err
1109+
}
1110+
}
1111+
10971112
return nil
10981113
}
10991114

@@ -1110,20 +1125,32 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
11101125

11111126
pe.putInt32(b.id)
11121127

1113-
err = pe.putString(host)
1128+
if version < 9 {
1129+
err = pe.putString(host)
1130+
} else {
1131+
err = pe.putCompactString(host)
1132+
}
11141133
if err != nil {
11151134
return err
11161135
}
11171136

11181137
pe.putInt32(int32(port))
11191138

11201139
if version >= 1 {
1121-
err = pe.putNullableString(b.rack)
1140+
if version < 9 {
1141+
err = pe.putNullableString(b.rack)
1142+
} else {
1143+
err = pe.putNullableCompactString(b.rack)
1144+
}
11221145
if err != nil {
11231146
return err
11241147
}
11251148
}
11261149

1150+
if version >= 9 {
1151+
pe.putEmptyTaggedFieldArray()
1152+
}
1153+
11271154
return nil
11281155
}
11291156

consumer_group_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,16 @@ func TestConsume_RaceTest(t *testing.T) {
134134
Offset: offsetStart,
135135
LeaderEpoch: 0,
136136
Metadata: "",
137-
Err: ErrNoError})
137+
Err: ErrNoError,
138+
})
138139

139140
offsetResponse := &OffsetResponse{
140141
Version: 1,
141142
}
142143
offsetResponse.AddTopicPartition(topic, 0, offsetStart)
143144

144145
metadataResponse := new(MetadataResponse)
146+
metadataResponse.Version = 10
145147
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
146148
metadataResponse.AddTopic("mismatched-topic", ErrUnknownTopicOrPartition)
147149

metadata_request.go

Lines changed: 144 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,35 @@
11
package sarama
22

3+
import "encoding/base64"
4+
5+
type Uuid [16]byte
6+
7+
func (u Uuid) String() string {
8+
return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(u[:])
9+
}
10+
11+
var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
12+
313
type MetadataRequest struct {
414
// Version defines the protocol version to use for encode and decode
515
Version int16
616
// Topics contains the topics to fetch metadata for.
717
Topics []string
818
// AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so.
9-
AllowAutoTopicCreation bool
19+
AllowAutoTopicCreation bool
20+
IncludeClusterAuthorizedOperations bool // version 8 and up
21+
IncludeTopicAuthorizedOperations bool // version 8 and up
1022
}
1123

1224
func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest {
1325
m := &MetadataRequest{Topics: topics}
14-
if version.IsAtLeast(V2_1_0_0) {
26+
if version.IsAtLeast(V2_8_0_0) {
27+
m.Version = 10
28+
} else if version.IsAtLeast(V2_4_0_0) {
29+
m.Version = 9
30+
} else if version.IsAtLeast(V2_4_0_0) {
31+
m.Version = 8
32+
} else if version.IsAtLeast(V2_1_0_0) {
1533
m.Version = 7
1634
} else if version.IsAtLeast(V2_0_0_0) {
1735
m.Version = 6
@@ -28,46 +46,124 @@ func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest
2846
}
2947

3048
func (r *MetadataRequest) encode(pe packetEncoder) (err error) {
31-
if r.Version < 0 || r.Version > 7 {
49+
if r.Version < 0 || r.Version > 10 {
3250
return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
3351
}
3452
if r.Version == 0 || len(r.Topics) > 0 {
35-
err := pe.putArrayLength(len(r.Topics))
36-
if err != nil {
37-
return err
38-
}
39-
40-
for i := range r.Topics {
41-
err = pe.putString(r.Topics[i])
53+
if r.Version < 9 {
54+
err := pe.putArrayLength(len(r.Topics))
4255
if err != nil {
4356
return err
4457
}
58+
59+
for i := range r.Topics {
60+
err = pe.putString(r.Topics[i])
61+
if err != nil {
62+
return err
63+
}
64+
}
65+
} else if r.Version == 9 {
66+
pe.putCompactArrayLength(len(r.Topics))
67+
for _, topicName := range r.Topics {
68+
if err := pe.putCompactString(topicName); err != nil {
69+
return err
70+
}
71+
pe.putEmptyTaggedFieldArray()
72+
}
73+
} else { // r.Version = 10
74+
pe.putCompactArrayLength(len(r.Topics))
75+
for _, topicName := range r.Topics {
76+
if err := pe.putRawBytes(NullUUID); err != nil {
77+
return err
78+
}
79+
// Avoid implicit memory aliasing in for loop
80+
tn := topicName
81+
if err := pe.putNullableCompactString(&tn); err != nil {
82+
return err
83+
}
84+
pe.putEmptyTaggedFieldArray()
85+
}
4586
}
4687
} else {
47-
pe.putInt32(-1)
88+
if r.Version < 9 {
89+
pe.putInt32(-1)
90+
} else {
91+
pe.putCompactArrayLength(-1)
92+
}
4893
}
4994

50-
if r.Version >= 4 {
95+
if r.Version > 3 {
5196
pe.putBool(r.AllowAutoTopicCreation)
5297
}
53-
98+
if r.Version > 7 {
99+
pe.putBool(r.IncludeClusterAuthorizedOperations)
100+
pe.putBool(r.IncludeTopicAuthorizedOperations)
101+
}
102+
if r.Version > 8 {
103+
pe.putEmptyTaggedFieldArray()
104+
}
54105
return nil
55106
}
56107

57108
func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
58109
r.Version = version
59-
size, err := pd.getInt32()
60-
if err != nil {
61-
return err
62-
}
63-
if size > 0 {
64-
r.Topics = make([]string, size)
110+
if r.Version < 9 {
111+
size, err := pd.getInt32()
112+
if err != nil {
113+
return err
114+
}
115+
if size > 0 {
116+
r.Topics = make([]string, size)
117+
for i := range r.Topics {
118+
topic, err := pd.getString()
119+
if err != nil {
120+
return err
121+
}
122+
r.Topics[i] = topic
123+
}
124+
}
125+
} else if r.Version == 9 {
126+
size, err := pd.getCompactArrayLength()
127+
if err != nil {
128+
return err
129+
}
130+
if size > 0 {
131+
r.Topics = make([]string, size)
132+
}
65133
for i := range r.Topics {
66-
topic, err := pd.getString()
134+
topic, err := pd.getCompactString()
67135
if err != nil {
68136
return err
69137
}
70138
r.Topics[i] = topic
139+
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
140+
return err
141+
}
142+
}
143+
} else { // version 10+
144+
size, err := pd.getCompactArrayLength()
145+
if err != nil {
146+
return err
147+
}
148+
149+
if size > 0 {
150+
r.Topics = make([]string, size)
151+
}
152+
for i := range r.Topics {
153+
if _, err = pd.getRawBytes(16); err != nil { // skip UUID
154+
return err
155+
}
156+
topic, err := pd.getCompactNullableString()
157+
if err != nil {
158+
return err
159+
}
160+
if topic != nil {
161+
r.Topics[i] = *topic
162+
}
163+
164+
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
165+
return err
166+
}
71167
}
72168
}
73169

@@ -77,6 +173,23 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
77173
}
78174
}
79175

176+
if r.Version > 7 {
177+
includeClusterAuthz, err := pd.getBool()
178+
if err != nil {
179+
return err
180+
}
181+
r.IncludeClusterAuthorizedOperations = includeClusterAuthz
182+
includeTopicAuthz, err := pd.getBool()
183+
if err != nil {
184+
return err
185+
}
186+
r.IncludeTopicAuthorizedOperations = includeTopicAuthz
187+
}
188+
if r.Version > 8 {
189+
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
190+
return err
191+
}
192+
}
80193
return nil
81194
}
82195

@@ -89,15 +202,24 @@ func (r *MetadataRequest) version() int16 {
89202
}
90203

91204
func (r *MetadataRequest) headerVersion() int16 {
205+
if r.Version >= 9 {
206+
return 2
207+
}
92208
return 1
93209
}
94210

95211
func (r *MetadataRequest) isValidVersion() bool {
96-
return r.Version >= 0 && r.Version <= 7
212+
return r.Version >= 0 && r.Version <= 10
97213
}
98214

99215
func (r *MetadataRequest) requiredVersion() KafkaVersion {
100216
switch r.Version {
217+
case 10:
218+
return V2_8_0_0
219+
case 9:
220+
return V2_4_0_0
221+
case 8:
222+
return V2_3_0_0
101223
case 7:
102224
return V2_1_0_0
103225
case 6:
@@ -113,6 +235,6 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
113235
case 0:
114236
return V0_8_2_0
115237
default:
116-
return V2_1_0_0
238+
return V2_8_0_0
117239
}
118240
}

0 commit comments

Comments
 (0)