Skip to content

Commit 1c971ae

Browse files
authored
feat(protocol): support CreateTopicRequest V4 (#3238)
Add support for using CreateTopicRequest V4 against Kafka 2.4 and newer clusters. Protocol-wise this is identical to the existing V3, but the version indicates that the server should accept `-1` as the partition and replicate-factor values to indicate "use the server side defaults" (KIP-464). Contributes-to: #3233 Signed-off-by: Dominic Evans <[email protected]>
1 parent 79f54aa commit 1c971ae

File tree

3 files changed

+29
-24
lines changed

3 files changed

+29
-24
lines changed

admin.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -251,22 +251,12 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
251251
topicDetails := make(map[string]*TopicDetail)
252252
topicDetails[topic] = detail
253253

254-
request := &CreateTopicsRequest{
255-
TopicDetails: topicDetails,
256-
ValidateOnly: validateOnly,
257-
Timeout: ca.conf.Admin.Timeout,
258-
}
259-
260-
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
261-
// Version 3 is the same as version 2 (brokers response before throttling)
262-
request.Version = 3
263-
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
264-
// Version 2 is the same as version 1 (response has ThrottleTime)
265-
request.Version = 2
266-
} else if ca.conf.Version.IsAtLeast(V0_10_2_0) {
267-
// Version 1 adds validateOnly.
268-
request.Version = 1
269-
}
254+
request := NewCreateTopicsRequest(
255+
ca.conf.Version,
256+
topicDetails,
257+
ca.conf.Admin.Timeout,
258+
validateOnly,
259+
)
270260

271261
return ca.retryOnError(isRetriableControllerError, func() error {
272262
b, err := ca.Controller()

create_topics_request.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,29 @@ func (c *CreateTopicsRequest) setVersion(v int16) {
2020
c.Version = v
2121
}
2222

23-
func NewCreateTopicsRequest(version KafkaVersion, topicDetails map[string]*TopicDetail, timeout time.Duration) *CreateTopicsRequest {
23+
func NewCreateTopicsRequest(
24+
version KafkaVersion,
25+
topicDetails map[string]*TopicDetail,
26+
timeout time.Duration,
27+
validateOnly bool,
28+
) *CreateTopicsRequest {
2429
r := &CreateTopicsRequest{
2530
TopicDetails: topicDetails,
2631
Timeout: timeout,
27-
}
28-
if version.IsAtLeast(V2_0_0_0) {
32+
ValidateOnly: validateOnly,
33+
}
34+
switch {
35+
case version.IsAtLeast(V2_4_0_0):
36+
// Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464)
37+
r.Version = 4
38+
case version.IsAtLeast(V2_0_0_0):
39+
// Version 3 is the same as version 2 (brokers response before throttling)
2940
r.Version = 3
30-
} else if version.IsAtLeast(V0_11_0_0) {
41+
case version.IsAtLeast(V0_11_0_0):
42+
// Version 2 is the same as version 1 (response has ThrottleTime)
3143
r.Version = 2
32-
} else if version.IsAtLeast(V0_10_2_0) {
44+
case version.IsAtLeast(V0_10_2_0):
45+
// Version 1 adds validateOnly.
3346
r.Version = 1
3447
}
3548
return r
@@ -102,16 +115,18 @@ func (c *CreateTopicsRequest) version() int16 {
102115
return c.Version
103116
}
104117

105-
func (r *CreateTopicsRequest) headerVersion() int16 {
118+
func (c *CreateTopicsRequest) headerVersion() int16 {
106119
return 1
107120
}
108121

109122
func (c *CreateTopicsRequest) isValidVersion() bool {
110-
return c.Version >= 0 && c.Version <= 3
123+
return c.Version >= 0 && c.Version <= 4
111124
}
112125

113126
func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
114127
switch c.Version {
128+
case 4:
129+
return V2_4_0_0
115130
case 3:
116131
return V2_0_0_0
117132
case 2:

functional_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
387387

388388
// now create the topics empty
389389
{
390-
request := NewCreateTopicsRequest(config.Version, testTopicDetails, time.Minute)
390+
request := NewCreateTopicsRequest(config.Version, testTopicDetails, time.Minute, false)
391391
createRes, err := controller.CreateTopics(request)
392392
if err != nil {
393393
return fmt.Errorf("failed to create test topics: %w", err)

0 commit comments

Comments
 (0)