feat(broker): negotiate MetadataRequest version#3158

Closed
trapped wants to merge 0 commits into IBM:main from trapped:main

Contributor

@trapped trapped commented May 14, 2025

The latest github.com/IBM/sarama version fails to connect to Redpanda brokers (also on their latest version):

  • Redpanda rejects the request with Unsupported version 10 for metadata API
  • sarama fails with kafka: client has run out of available brokers to talk to: EOF
Sarama logs

```
2025/05/13 18:26:29 Initializing new client
2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:29 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:29 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:29 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:29 Closed connection to broker localhost:9092
2025/05/13 18:26:29 client/metadata no available broker to send metadata request to
2025/05/13 18:26:29 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:29 Error while sending ApiVersionsRequest to broker localhost:9092: kafka: broker not connected
2025/05/13 18:26:30 client/metadata retrying after 250ms... (2 attempts remaining)
2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:30 Closed connection to broker localhost:9092
2025/05/13 18:26:30 client/metadata no available broker to send metadata request to
2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:30 client/metadata retrying after 250ms... (1 attempts remaining)
2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:30 Closed connection to broker localhost:9092
2025/05/13 18:26:30 client/metadata no available broker to send metadata request to
2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:30 client/metadata retrying after 250ms... (0 attempts remaining)
2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:30 Closed connection to broker localhost:9092
2025/05/13 18:26:30 client/metadata no available broker to send metadata request to
2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:30 Closing Client
```

The cause is simply that:

  1. Sarama chooses request versions solely according to the provided config.Version, even though it sends an ApiVersionsRequest
  2. MetadataRequest uses versions 8-9-10 for Kafka versions >= v2.4.0.0
  3. Redpanda only supports versions 0 to 8, and it correctly advertises it

This PR adds a brokerAPIVersions field to the Broker type, storing the ApiVersionsResponse values if conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest; request versions are then validated before sending requests.

It also adds a NewNegotiatedMetadataRequest constructor that behaves like NewMetadataRequest, but takes the broker's advertised min/max API versions into account; the client then uses it when refreshing metadata.
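
As a rough illustration of the approach described above, the sketch below stores per-API version ranges and picks a version both sides understand. The names `versionRange`, `brokerVersions`, and `negotiate` are hypothetical and are not Sarama's actual types or this PR's code. Only the Metadata API key (3) and the 0 to 8 range reported for Redpanda come from the Kafka protocol and the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// versionRange is a hypothetical stand-in for one entry of an
// ApiVersionsResponse: the inclusive range a broker advertises for one API.
type versionRange struct {
	Min, Max int16
}

// brokerVersions maps a Kafka API key to the range the broker advertised.
type brokerVersions map[int16]versionRange

const apiKeyMetadata int16 = 3 // Metadata API key in the Kafka protocol

var errUnsupported = errors.New("no mutually supported version")

// negotiate returns the highest version that does not exceed the client's
// preferred version and lies inside the broker's advertised range.
// If the broker advertised nothing for this key, the preferred version is
// returned unchanged, which matches the behaviour without negotiation.
func negotiate(bv brokerVersions, key, preferred int16) (int16, error) {
	r, ok := bv[key]
	if !ok {
		return preferred, nil
	}
	if preferred < r.Min {
		return 0, fmt.Errorf("%w: want at most %d, broker supports %d-%d",
			errUnsupported, preferred, r.Min, r.Max)
	}
	if preferred > r.Max {
		return r.Max, nil
	}
	return preferred, nil
}

func main() {
	// Ranges as described in this PR: the broker advertises Metadata v0-v8,
	// while the client would otherwise send v10.
	bv := brokerVersions{apiKeyMetadata: {Min: 0, Max: 8}}
	v, err := negotiate(bv, apiKeyMetadata, 10)
	fmt.Println(v, err) // 8 <nil>
}
```

Returning the preferred version when no range is stored keeps the sketch backwards compatible with configurations that do not send an ApiVersionsRequest.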

These changes allow connecting to Redpanda while maintaining config.Version = sarama.MaxVersion; they are not a complete implementation of version selection. Hopefully though this can be a starting point.

@trapped trapped marked this pull request as ready for review May 14, 2025 08:30
Collaborator

@puellanivis puellanivis left a comment

Really, only coming up with lint, which is a good sign. 🙂

@trapped trapped force-pushed the main branch 2 times, most recently from 9dc76a8 to 5b46375 Compare May 16, 2025 15:10
Collaborator

@puellanivis puellanivis left a comment

I have run out of anything to comment on. :)

@trapped
Contributor Author

trapped commented May 19, 2025

@puellanivis thanks for the review! I'd love to run the CI on it to see if there's anything I've missed.

@dnwe dnwe force-pushed the main branch 2 times, most recently from a9aa93e to 8208821 Compare May 19, 2025 14:06
Collaborator

@dnwe dnwe left a comment

@trapped thanks for spotting the incompatibility with Redpanda and coming up with this PR to start adhering to the ApiVersions responses.

I know you added it to avoid changing the existing NewMetadataRequest signature, but I was wondering if we could avoid the need for NewNegotiatedMetadataRequest and the changes in client.go (which take the broker lock). We could instead introduce a new func on the protocolBody interface, something like restrictApiVersion(minVersion, maxVersion int16) error, which returns an error if the existing version is less than minVersion and silently rolls the version down if it exceeds maxVersion. Then we could replace your "validate the request is using a supported API version" check in sendInternal with a call to rb.restrictApiVersion(apiVersions.MinVersion, apiVersions.MaxVersion) if there is a value in b.brokerAPIVersions.

The implementation in metadata_request.go would be something like:

func (m *MetadataRequest) restrictApiVersion(minVersion, maxVersion int16) error {
	if m.Version < minVersion {
		return fmt.Errorf("%w: unsupported API version %d for %T, supported versions are %d-%d",
			ErrUnsupportedVersion, m.Version, m, minVersion, maxVersion)
	}
	m.Version = max(m.Version, maxVersion)
	return nil
}

Which I think would still give the behaviour that Version in sarama.Config can be used by a user to pin to "no more than was supported by this KafkaVersion" whilst also adhering to "no more than is supported by the ApiVersionsResponse maxVersion values either"
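
A minimal sketch of how a send path could apply such a method, assuming simplified stand-ins for Sarama's internals. The names `body`, `apiRange`, `prepare`, and `fakeMetadataRequest` are hypothetical; the real protocolBody interface and sendInternal are not reproduced here. The method in this sketch caps the version at the broker's maximum, which is the roll-down behaviour described in the comment.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnsupportedVersion = errors.New("unsupported version")

// apiRange is a hypothetical copy of one advertised min/max pair.
type apiRange struct{ min, max int16 }

// body is a simplified, hypothetical version of an internal request interface.
type body interface {
	key() int16
	version() int16
	restrictApiVersion(minVersion, maxVersion int16) error
}

// fakeMetadataRequest stands in for a real request type.
type fakeMetadataRequest struct{ v int16 }

func (m *fakeMetadataRequest) key() int16     { return 3 } // Metadata API key
func (m *fakeMetadataRequest) version() int16 { return m.v }

// restrictApiVersion fails below the broker minimum and rolls down above
// the broker maximum.
func (m *fakeMetadataRequest) restrictApiVersion(minVersion, maxVersion int16) error {
	if m.v < minVersion {
		return fmt.Errorf("%w: %d is below broker minimum %d",
			errUnsupportedVersion, m.v, minVersion)
	}
	if m.v > maxVersion {
		m.v = maxVersion // silently roll down
	}
	return nil
}

// prepare mimics a check a send path could run just before encoding.
// When advertised is nil or lacks the key (no ApiVersionsResponse stored),
// the request is left untouched.
func prepare(b body, advertised map[int16]apiRange) error {
	r, ok := advertised[b.key()]
	if !ok {
		return nil
	}
	return b.restrictApiVersion(r.min, r.max)
}

func main() {
	req := &fakeMetadataRequest{v: 10}
	err := prepare(req, map[int16]apiRange{3: {min: 0, max: 8}})
	fmt.Println(req.version(), err) // 8 <nil>
}
```

Because the restriction lives on the request type and is invoked from one place, callers that build requests would not need to know about the broker's advertised ranges.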

@trapped
Contributor Author

trapped commented May 19, 2025

Hey @dnwe! Of course, I'm happy to make wider changes 🙂

Yours seems like a really nice idea. It should be easy to then add the method to every other request, at which point we'd essentially have this "version negotiation" for everything I guess.

I'll come up with the changes ASAP!

@dnwe
Collaborator

dnwe commented May 19, 2025

@trapped 😅 well it was just an initial idea, so don't discard what you have here, but if you have time and motivation then feel free to give it a go on a separate branch and see how it might look

The useful property is that it can all be done as "internal" api changes so we don't have to worry about anyone that might be using the protocol types externally, which gives us more freedom to experiment and redesign as we go along

@trapped
Contributor Author

trapped commented May 19, 2025

@dnwe yeah, makes sense 🙂 I'll share a link to another draft PR here soon.

@trapped
Contributor Author

trapped commented May 20, 2025

@dnwe for now I've opened a draft and copy-pasted your snippet practically verbatim to all request/response types: #3170

@dnwe
Collaborator

dnwe commented May 20, 2025

@trapped thanks! One thing I forgot to take account of in my original snippet was that the maxVersion supported by the remote broker can and will exceed the max version of the protocol that we've actually implemented the encoding/decoding for within Sarama. I'd also incorrectly used max where we probably wanted min semantics: we probably want the old Version field from sarama.Config to pin the maximum versions that are used, and people could set Version to sarama.MaxVersion to opt in to relying on the negotiated API versions instead.

e.g., for FetchRequest we've only currently implemented up to V11 but Kafka 2.7 and newer will advertise V12+ – so we need to include that in the sum.

So for MetadataRequest probably something like this:

func (m *MetadataRequest) restrictApiVersion(minVersion, maxVersion int16) error {
	maxEncodedVersion := min(10, maxVersion) // see existing isValidVersion() bound check for max supported by Sarama
	if m.Version < minVersion {
		return fmt.Errorf("%w: unsupported API version %d for %T, supported versions are %d-%d",
			ErrUnsupportedVersion, m.Version, m, minVersion, maxEncodedVersion)
	}
	m.Version = min(m.Version, maxEncodedVersion)
	return nil
}
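
To make the three bounds in the snippet above concrete, here is a small self-contained sketch written as a free function. `clampVersion` and its parameter names are hypothetical, and the ranges are illustrative values taken from the discussion (Metadata v0 to v8 on the broker side, Fetch implemented up to v11 with the broker advertising v12).

```go
package main

import (
	"errors"
	"fmt"
)

var errUnsupportedVersion = errors.New("unsupported version")

// clampVersion is a hypothetical free-function form of the method above.
//   current:        version chosen from the configured Kafka version
//   implementedMax: highest version the client can encode and decode
//   brokerMin/Max:  range advertised by the broker
func clampVersion(current, implementedMax, brokerMin, brokerMax int16) (int16, error) {
	// The usable upper bound is the lower of what the broker advertises
	// and what the client has implemented.
	upper := brokerMax
	if implementedMax < upper {
		upper = implementedMax
	}
	if current < brokerMin {
		return 0, fmt.Errorf("%w: version %d, supported versions are %d-%d",
			errUnsupportedVersion, current, brokerMin, upper)
	}
	if current > upper {
		return upper, nil
	}
	return current, nil
}

func main() {
	// Metadata: client wants v10, broker advertises 0-8, so v8 is used.
	fmt.Println(clampVersion(10, 10, 0, 8))
	// Fetch: client implements up to v11, broker advertises up to v12, so v11 is used.
	fmt.Println(clampVersion(11, 11, 0, 12))
	// Pinned low by configuration: stays at v4 although both sides go higher.
	fmt.Println(clampVersion(4, 10, 0, 8))
}
```

The third call shows the pinning property mentioned above: a lower configured version is never raised, only capped.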

@trapped
Contributor Author

trapped commented May 27, 2025

Hey @dnwe, sorry for the delay 🙂 I've just pushed a commit to #3170 applying the change you requested to both Request and Response types - I'm not super familiar with the Sarama codebase/logic yet so thanks for clarifying what behavior it should implement!


3 participants