Skip to content

Commit 5978156

Browse files
committed
kversion: fix version detection for Kafka v2.7 through 3.4
Kafka v3.4 added envelope support for the Envelope request to the zk based broker, so I added that to kversion. This made Envelope a required request for version detection ever since it was introduced -- v2.7 for raft -- and pinned version detection to "at least 2.7" for all cluster between 2.7 and 3.4. We now ignore the envelope key when version guessing because ultimately other keys are differentiating enough. This allows version detection to now again correctly guess 2.7 through 3.4. Tested against zk 3.1, 3.2, 3.5, as well as kraft 3.5 manually, and adds a unit test for the 3.1 versions. Closes #536.
1 parent 83cb9fe commit 5978156

File tree

2 files changed

+84
-4
lines changed

2 files changed

+84
-4
lines changed

pkg/kversion/kversion.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func SkipKeys(keys ...int16) VersionGuessOpt {
138138
}
139139

140140
// TryRaftBroker changes from guessing the version for a classical ZooKeeper
141-
// based broker to guessing for a raft based broker (v2.8.0+).
141+
// based broker to guessing for a raft based broker (v2.8+).
142142
//
143143
// Note that with raft, there can be a TryRaftController attempt as well.
144144
func TryRaftBroker() VersionGuessOpt {
@@ -147,7 +147,7 @@ func TryRaftBroker() VersionGuessOpt {
147147

148148
// TryRaftController changes from guessing the version for a classical
149149
// ZooKeeper based broker to guessing for a raft based controller broker
150-
// (v2.8.0+).
150+
// (v2.8+).
151151
//
152152
// Note that with raft, there can be a TryRaftBroker attempt as well. Odds are
153153
// that if you are an end user speaking to a raft based Kafka cluster, you are
@@ -164,7 +164,7 @@ type guessCfg struct {
164164

165165
// VersionGuess attempts to guess which version of Kafka these versions belong
166166
// to. If an exact match can be determined, this returns a string in the format
167-
// v0.#.# or v#.# (depending on whether Kafka is pre-1.0.0 or post). For
167+
// v0.#.# or v#.# (depending on whether Kafka is pre-1.0 or post). For
168168
// example, v0.8.0 or v2.7.
169169
//
170170
// Patch numbers are not included in the guess as it is not possible to
@@ -253,7 +253,15 @@ func (g guess) String() string {
253253
func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
254254
cfg := guessCfg{
255255
listener: zkBroker,
256-
skipKeys: []int16{4, 5, 6, 7, 27},
256+
// Envelope was added in 2.7 for kraft and zkBroker in 3.4; we
257+
// need to skip it for 2.7 through 3.4 otherwise the version
258+
// detection fails. We can just skip it generally since there
259+
// are enough differentiating factors that accurately detecting
260+
// envelope doesn't matter.
261+
//
262+
// TODO: add introduced-version to differentiate some specific
263+
// keys.
264+
skipKeys: []int16{4, 5, 6, 7, 27, 58},
257265
}
258266
for _, opt := range opts {
259267
opt.apply(&cfg)

pkg/kversion/kversion_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,75 @@ func TestEqual(t *testing.T) {
119119
t.Errorf("unexpectedly not equal after backing v0.8.1 down to v0.8.0, opposite direction")
120120
}
121121
}
122+
123+
func TestVersionProbeKafka3_1(t *testing.T) {
124+
versions := map[int16]int16{
125+
0: 9, // Produce
126+
1: 13, // Fetch
127+
2: 7, // ListOffsets
128+
3: 12, // Metadata
129+
4: 5, // LeaderAndISR
130+
5: 3, // StopReplica
131+
6: 7, // UpdateMetadata
132+
7: 3, // ControlledShutdown
133+
8: 8, // OffsetCommit
134+
9: 8, // OffsetFetch
135+
10: 4, // FindCoordinator
136+
11: 7, // JoinGroup
137+
12: 4, // Heartbeat
138+
13: 4, // LeaveGroup
139+
14: 5, // SyncGroup
140+
15: 5, // DescribeGroups
141+
16: 4, // ListGroups
142+
17: 1, // SASLHandshake
143+
18: 3, // ApiVersions
144+
19: 7, // CreateTopics
145+
20: 6, // DeleteTopics
146+
21: 2, // DeleteRecords
147+
22: 4, // InitProducerID
148+
23: 4, // OffsetForLeaderEpoch
149+
24: 3, // AddPartitionsToTxn
150+
25: 3, // AddOffsetsToTxn
151+
26: 3, // EndTxn
152+
27: 1, // WriteTxnMarkers
153+
28: 3, // TxnOffsetCommit
154+
29: 2, // DescribeACLs
155+
30: 2, // CreateACLs
156+
31: 2, // DeleteACLs
157+
32: 4, // DescribeConfigs
158+
33: 2, // AlterConfigs
159+
34: 2, // AlterReplicaLogDirs
160+
35: 2, // DescribeLogDirs
161+
36: 2, // SASLAuthenticate
162+
37: 3, // CreatePartitions
163+
38: 2, // CreateDelegationToken
164+
39: 2, // RenewDelegationToken
165+
40: 2, // ExpireDelegationToken
166+
41: 2, // DescribeDelegationToken
167+
42: 2, // DeleteGroups
168+
43: 2, // ElectLeaders
169+
44: 1, // IncrementalAlterConfigs
170+
45: 0, // AlterPartitionAssignments
171+
46: 0, // ListPartitionReassignments
172+
47: 0, // OffsetDelete
173+
48: 1, // DescribeClientQuotas
174+
49: 1, // AlterClientQuotas
175+
50: 0, // DescribeUserSCRAMCredentials
176+
51: 0, // AlterUserSCRAMCredentials
177+
56: 0, // AlterPartition
178+
57: 0, // UpdateFeatures
179+
60: 0, // DescribeCluster
180+
61: 0, // DescribeProducers
181+
65: 0, // DescribeTransactions
182+
66: 0, // ListTransactions
183+
67: 0, // AllocateProducerIDs
184+
}
185+
186+
var vs Versions
187+
for k, v := range versions {
188+
vs.SetMaxKeyVersion(k, v)
189+
}
190+
if guess := vs.VersionGuess(); guess != "v3.1" {
191+
t.Errorf("unexpected version guess, got %s != exp %s", guess, "v3.1")
192+
}
193+
}

0 commit comments

Comments
 (0)