feat(broker): negotiate MetadataRequest version#3158
feat(broker): negotiate MetadataRequest version#3158
Conversation
puellanivis
left a comment
There was a problem hiding this comment.
Really, only coming up with lint, which is a good sign. 🙂
9dc76a8 to
5b46375
Compare
puellanivis
left a comment
There was a problem hiding this comment.
I have run out of anything to comment on. :)
|
@puellanivis thanks for the review! I'd love to run the CI on it to see if there's anything I've missed. |
a9aa93e to
8208821
Compare
dnwe
left a comment
There was a problem hiding this comment.
@trapped thanks for spotting the incompatibility with redpanda and coming up with this PR to start to adhere to the apiversions responses
I know you added it to avoid changing the existing NewMetadataRequest signature, but I was wondering if we could avoid the need for NewNegotiatedMetadataRequest and the changes in client.go (which take the broker lock) if we instead introduced a new func to the protocolBody interface something like restrictApiVersion(minVersion, maxVersion int16) error which gives an error if the existing version is less than minVersion and silently rolls down the version if it exceeds maxVersion. Then we replace your "validate the request is using a supported API version" in sendInternal to call rb.restrictProtocolVersions(apiVersion.MinVersion, apiVersions.MaxVersion) if there is a value in b.brokerAPIVersions?
The implementation in metadata_request.go would be something like:
func (m *MetadataRequest) restrictApiVersion(minVersion, maxVersion int16) error {
if m.Version < minVersion {
return fmt.Errorf("%w: unsupported API version %d for %T, supported versions are %d-%d",
ErrUnsupportedVersion, m.Version, m, minVersion, maxVersion)
}
m.Version = max(m.Version, maxVersion)
return nil
}Which I think would still give the behaviour that Version in sarama.Config can be used by a user to pin to "no more than was supported by this KafkaVersion" whilst also adhering to "no more than is supported by the ApiVersionsResponse maxVersion values either"
|
Hey @dnwe! Of course, I'm happy to make wider changes 🙂 Yours seems like a really nice idea. It should be easy to then add the method to every other request, at which point we'd essentially have this "version negotiation" for everything I guess. I'll come up with the changes ASAP! |
|
@trapped 😅 well it was just an initial idea, so don't discard what you have here, but if you have time and motivation then feel free to give it a go on a separate branch and see how it might look The useful property is that it can all be done as "internal" api changes so we don't have to worry about anyone that might be using the protocol types externally, which gives us more freedom to experiment and redesign as we go along |
|
@dnwe yeah, makes sense 🙂 I'll share a link to another draft PR here soon. |
|
@trapped thanks! One thing I forgot to take account of in my original snippet was that maxVersion supported by the remote broker can and will exceed the max version of the protocol that we've actually implemented the encoding/decoding for within Sarama. I'd also incorrectly used e.g., for FetchRequest we've only currently implemented up to V11 but Kafka 2.7 and newer will advertise V12+ – so we need to include that in the sum. So for MetadataRequest probably something like this: func (m *MetadataRequest) restrictApiVersion(minVersion, maxVersion int16) error {
maxEncodedVersion := min(10, maxVersion) // see existing isValidVersion() bound check for max supported by Sarama
if m.Version < minVersion {
return fmt.Errorf("%w: unsupported API version %d for %T, supported versions are %d-%d",
ErrUnsupportedVersion, m.Version, m, minVersion, maxEncodedVersion)
}
m.Version = min(m.Version, maxEncodedVersion)
return nil
} |
The latest
github.com/IBM/saramaversion fails to connect to Redpanda brokers (latest version too):Unsupported version 10 for metadata APIkafka: client has run out of available brokers to talk to: EOFSarama logs
``` 2025/05/13 18:26:29 Initializing new client 2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:29 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:29 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:29 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:29 Closed connection to broker localhost:9092 2025/05/13 18:26:29 client/metadata no available broker to send metadata request to 2025/05/13 18:26:29 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:29 Error while sending ApiVersionsRequest to broker localhost:9092: kafka: broker not connected 2025/05/13 18:26:30 client/metadata retrying after 250ms... (2 attempts remaining) 2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:30 Closed connection to broker localhost:9092 2025/05/13 18:26:30 client/metadata no available broker to send metadata request to 2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:30 client/metadata retrying after 250ms... (1 attempts remaining) 2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:30 Closed connection to broker localhost:9092 2025/05/13 18:26:30 client/metadata no available broker to send metadata request to 2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:30 client/metadata retrying after 250ms... (0 attempts remaining) 2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific. 2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092 2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered) 2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF 2025/05/13 18:26:30 Closed connection to broker localhost:9092 2025/05/13 18:26:30 client/metadata no available broker to send metadata request to 2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers 2025/05/13 18:26:30 Closing Client ```The culprit being simply that:
config.Version, even though it sends an ApiVersionsRequestThis PR adds a
brokerAPIVersionsfield to theBrokertype, storing theApiVersionsResponsevalues ifconf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest; request versions are then validated before sending requests.Additionally, adds a
NewNegotiatedMetadataRequestconstructor that behaves likeNewMetadataRequest, but considers the broker's advertised min/max API versions; this is then used in the client when refreshing metadata.These changes allow connecting to Redpanda while maintaining
config.Version = sarama.MaxVersion; they are not a complete implementation of version selection. Hopefully though this can be a starting point.