11package sarama
22
3+ import "encoding/base64"
4+
5+ type Uuid [16 ]byte
6+
7+ func (u Uuid ) String () string {
8+ return base64 .URLEncoding .WithPadding (base64 .NoPadding ).EncodeToString (u [:])
9+ }
10+
11+ var NullUUID = []byte {0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 }
12+
313type MetadataRequest struct {
414 // Version defines the protocol version to use for encode and decode
515 Version int16
616 // Topics contains the topics to fetch metadata for.
717 Topics []string
818 // AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so.
9- AllowAutoTopicCreation bool
19+ AllowAutoTopicCreation bool
20+ IncludeClusterAuthorizedOperations bool // version 8 and up
21+ IncludeTopicAuthorizedOperations bool // version 8 and up
1022}
1123
1224func NewMetadataRequest (version KafkaVersion , topics []string ) * MetadataRequest {
1325 m := & MetadataRequest {Topics : topics }
14- if version .IsAtLeast (V2_1_0_0 ) {
26+ if version .IsAtLeast (V2_8_0_0 ) {
27+ m .Version = 10
28+ } else if version .IsAtLeast (V2_4_0_0 ) {
29+ m .Version = 9
30+ } else if version .IsAtLeast (V2_4_0_0 ) {
31+ m .Version = 8
32+ } else if version .IsAtLeast (V2_1_0_0 ) {
1533 m .Version = 7
1634 } else if version .IsAtLeast (V2_0_0_0 ) {
1735 m .Version = 6
@@ -28,46 +46,124 @@ func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest
2846}
2947
3048func (r * MetadataRequest ) encode (pe packetEncoder ) (err error ) {
31- if r .Version < 0 || r .Version > 7 {
49+ if r .Version < 0 || r .Version > 10 {
3250 return PacketEncodingError {"invalid or unsupported MetadataRequest version field" }
3351 }
3452 if r .Version == 0 || len (r .Topics ) > 0 {
35- err := pe .putArrayLength (len (r .Topics ))
36- if err != nil {
37- return err
38- }
39-
40- for i := range r .Topics {
41- err = pe .putString (r .Topics [i ])
53+ if r .Version < 9 {
54+ err := pe .putArrayLength (len (r .Topics ))
4255 if err != nil {
4356 return err
4457 }
58+
59+ for i := range r .Topics {
60+ err = pe .putString (r .Topics [i ])
61+ if err != nil {
62+ return err
63+ }
64+ }
65+ } else if r .Version == 9 {
66+ pe .putCompactArrayLength (len (r .Topics ))
67+ for _ , topicName := range r .Topics {
68+ if err := pe .putCompactString (topicName ); err != nil {
69+ return err
70+ }
71+ pe .putEmptyTaggedFieldArray ()
72+ }
73+ } else { // r.Version = 10
74+ pe .putCompactArrayLength (len (r .Topics ))
75+ for _ , topicName := range r .Topics {
76+ if err := pe .putRawBytes (NullUUID ); err != nil {
77+ return err
78+ }
79+ // Avoid implicit memory aliasing in for loop
80+ tn := topicName
81+ if err := pe .putNullableCompactString (& tn ); err != nil {
82+ return err
83+ }
84+ pe .putEmptyTaggedFieldArray ()
85+ }
4586 }
4687 } else {
47- pe .putInt32 (- 1 )
88+ if r .Version < 9 {
89+ pe .putInt32 (- 1 )
90+ } else {
91+ pe .putCompactArrayLength (- 1 )
92+ }
4893 }
4994
50- if r .Version >= 4 {
95+ if r .Version > 3 {
5196 pe .putBool (r .AllowAutoTopicCreation )
5297 }
53-
98+ if r .Version > 7 {
99+ pe .putBool (r .IncludeClusterAuthorizedOperations )
100+ pe .putBool (r .IncludeTopicAuthorizedOperations )
101+ }
102+ if r .Version > 8 {
103+ pe .putEmptyTaggedFieldArray ()
104+ }
54105 return nil
55106}
56107
57108func (r * MetadataRequest ) decode (pd packetDecoder , version int16 ) (err error ) {
58109 r .Version = version
59- size , err := pd .getInt32 ()
60- if err != nil {
61- return err
62- }
63- if size > 0 {
64- r .Topics = make ([]string , size )
110+ if r .Version < 9 {
111+ size , err := pd .getInt32 ()
112+ if err != nil {
113+ return err
114+ }
115+ if size > 0 {
116+ r .Topics = make ([]string , size )
117+ for i := range r .Topics {
118+ topic , err := pd .getString ()
119+ if err != nil {
120+ return err
121+ }
122+ r .Topics [i ] = topic
123+ }
124+ }
125+ } else if r .Version == 9 {
126+ size , err := pd .getCompactArrayLength ()
127+ if err != nil {
128+ return err
129+ }
130+ if size > 0 {
131+ r .Topics = make ([]string , size )
132+ }
65133 for i := range r .Topics {
66- topic , err := pd .getString ()
134+ topic , err := pd .getCompactString ()
67135 if err != nil {
68136 return err
69137 }
70138 r .Topics [i ] = topic
139+ if _ , err := pd .getEmptyTaggedFieldArray (); err != nil {
140+ return err
141+ }
142+ }
143+ } else { // version 10+
144+ size , err := pd .getCompactArrayLength ()
145+ if err != nil {
146+ return err
147+ }
148+
149+ if size > 0 {
150+ r .Topics = make ([]string , size )
151+ }
152+ for i := range r .Topics {
153+ if _ , err = pd .getRawBytes (16 ); err != nil { // skip UUID
154+ return err
155+ }
156+ topic , err := pd .getCompactNullableString ()
157+ if err != nil {
158+ return err
159+ }
160+ if topic != nil {
161+ r .Topics [i ] = * topic
162+ }
163+
164+ if _ , err := pd .getEmptyTaggedFieldArray (); err != nil {
165+ return err
166+ }
71167 }
72168 }
73169
@@ -77,6 +173,23 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
77173 }
78174 }
79175
176+ if r .Version > 7 {
177+ includeClusterAuthz , err := pd .getBool ()
178+ if err != nil {
179+ return err
180+ }
181+ r .IncludeClusterAuthorizedOperations = includeClusterAuthz
182+ includeTopicAuthz , err := pd .getBool ()
183+ if err != nil {
184+ return err
185+ }
186+ r .IncludeTopicAuthorizedOperations = includeTopicAuthz
187+ }
188+ if r .Version > 8 {
189+ if _ , err := pd .getEmptyTaggedFieldArray (); err != nil {
190+ return err
191+ }
192+ }
80193 return nil
81194}
82195
@@ -89,15 +202,24 @@ func (r *MetadataRequest) version() int16 {
89202}
90203
91204func (r * MetadataRequest ) headerVersion () int16 {
205+ if r .Version >= 9 {
206+ return 2
207+ }
92208 return 1
93209}
94210
95211func (r * MetadataRequest ) isValidVersion () bool {
96- return r .Version >= 0 && r .Version <= 7
212+ return r .Version >= 0 && r .Version <= 10
97213}
98214
99215func (r * MetadataRequest ) requiredVersion () KafkaVersion {
100216 switch r .Version {
217+ case 10 :
218+ return V2_8_0_0
219+ case 9 :
220+ return V2_4_0_0
221+ case 8 :
222+ return V2_3_0_0
101223 case 7 :
102224 return V2_1_0_0
103225 case 6 :
@@ -113,6 +235,6 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
113235 case 0 :
114236 return V0_8_2_0
115237 default :
116- return V2_1_0_0
238+ return V2_8_0_0
117239 }
118240}
0 commit comments