Add UAMI OAuth changes #2189
Pull request overview
This pull request adds support for Azure Instance Metadata Service (IMDS) OAuth authentication (User Assigned Managed Identity) to the Kafka Schema Registry clients. It refactors the OAuth bearer field provider architecture to support multiple authentication methods through a builder pattern, consolidating common OAuth logic into a new shared module.
Changes:
- Introduces `OAUTHBEARER_AZURE_IMDS` as a new authentication method for Azure IMDS-based credential-less authentication
- Refactors OAuth bearer field provider classes into an abstract builder pattern with separate sync and async implementations
- Moves common OAuth logic to a new `_oauthbearer.py` module to reduce code duplication
- Updates tests to cover the new Azure IMDS authentication flow and refactored architecture
- Updates examples to demonstrate Azure IMDS OAuth configuration
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| tests/schema_registry/_sync/test_config.py | Reformatted dict definitions and added comprehensive tests for Azure IMDS OAuth configuration validation |
| tests/schema_registry/_sync/test_bearer_field_provider.py | Updated tests to use new token_object attribute and added tests for Azure IMDS client |
| tests/schema_registry/_async/test_config.py | Added Azure IMDS OAuth tests and reformatted existing tests (contains duplicate function) |
| tests/schema_registry/_async/test_bearer_field_provider.py | Updated async bearer field provider tests for refactored architecture |
| src/confluent_kafka/schema_registry/common/schema_registry_client.py | Removed bearer field provider classes that were moved to separate module |
| src/confluent_kafka/schema_registry/common/_oauthbearer.py | New module containing abstract base classes and builders for OAuth bearer authentication |
| src/confluent_kafka/schema_registry/_sync/schema_registry_client.py | Refactored to use new builder pattern, added Azure IMDS client implementation |
| src/confluent_kafka/schema_registry/_async/schema_registry_client.py | Refactored async client with new builder pattern and Azure IMDS support |
| examples/oauth_schema_registry.py | Added Azure IMDS OAuth configuration example |
| examples/oauth_oidc_ccloud_azure_imds_producer.py | Enhanced example to include Schema Registry integration with Azure IMDS |
Comments suppressed due to low confidence (1)
tests/schema_registry/_async/test_config.py:345
- Duplicate test function definition. The function `test_custom_bearer_config` is defined twice (lines 333-338 and 341-345) with identical implementations. The second definition will shadow the first, causing the first test to never run. One of these duplicate definitions should be removed.

    def test_custom_bearer_config():
        conf = {'url': TEST_URL, 'bearer.auth.credentials.source': 'CUSTOM'}
        with pytest.raises(ValueError, match='Missing required custom OAuth configuration properties:'):
            AsyncSchemaRegistryClient(conf)
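The shadowing behaviour described above can be reproduced in isolation. This is a minimal sketch with a hypothetical function name (`test_example`), not code from the PR: a second `def` with the same name simply rebinds the name, so only the later body is reachable.

```python
# Hypothetical module body: two functions sharing one name.
def test_example():
    return "first"


def test_example():  # noqa: F811 - rebinds the name, replacing the first definition
    return "second"


# Only the second definition is still bound to the name.
result = test_example()
```

A test collector that looks functions up by name in the module namespace would therefore see one function here, not two.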
| class _AsyncCustomOAuthClient(_AsyncBearerFieldProvider): | ||
| def __init__(self, custom_function: Callable[[Dict], Awaitable[Dict]], custom_config: dict): | ||
| class _AsyncCustomOAuthClient(_BearerFieldProvider): | ||
| def __init__(self, custom_function: Callable[[Dict], Dict], custom_config: dict): |
Incorrect type annotation for async custom function. The custom_function parameter is typed as Callable[[Dict], Dict], but on line 95 the code awaits the result with return await self.custom_function(self.custom_config). The type should be Callable[[Dict], Awaitable[Dict]] to match the async usage.
| def __init__(self, custom_function: Callable[[Dict], Dict], custom_config: dict): | |
| def __init__(self, custom_function: Callable[[Dict], Awaitable[Dict]], custom_config: dict): |
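To illustrate why the `Awaitable[Dict]` return type matters, here is a minimal sketch. The class `AsyncCustomProvider`, the function `fetch_fields`, and the `"token"` key are hypothetical stand-ins, not the PR's actual code; the point is only that a callable whose result is awaited should be annotated as returning an awaitable.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class AsyncCustomProvider:
    """Hypothetical stand-in for an async custom OAuth client."""

    def __init__(self, custom_function: Callable[[Dict], Awaitable[Dict]], custom_config: Dict):
        self.custom_function = custom_function
        self.custom_config = custom_config

    async def get_bearer_fields(self) -> Dict:
        # The result is awaited, so custom_function must return an awaitable.
        return await self.custom_function(self.custom_config)


async def fetch_fields(config: Dict) -> Dict:
    # Hypothetical user-supplied coroutine function.
    return {"token": "token-for-" + config["tenant"]}


provider = AsyncCustomProvider(fetch_fields, {"tenant": "demo"})
fields = asyncio.run(provider.get_bearer_fields())
```

With the narrower `Callable[[Dict], Dict]` annotation, a static type checker would be expected to flag either the `await` or the coroutine function passed in.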
| self.token_object = await self.client.get(self.token_endpoint, headers=[ | ||
| ('Metadata', 'true') | ||
| ]).json() |
Missing parentheses in async await expression. The expression await self.client.get(...).json() is parsed as await (self.client.get(...).json()) due to operator precedence, which attempts to call .json() on a coroutine before awaiting it. This will raise an AttributeError. The code should be (await self.client.get(...)).json() with explicit parentheses to await the coroutine first, then call .json() on the resulting Response object.
| self.token_object = await self.client.get(self.token_endpoint, headers=[ | |
| ('Metadata', 'true') | |
| ]).json() | |
| self.token_object = (await self.client.get( | |
| self.token_endpoint, | |
| headers=[('Metadata', 'true')], | |
| )).json() |
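The precedence issue can be demonstrated without a real HTTP client. The sketch below uses hypothetical `FakeAsyncClient` and `FakeResponse` classes in place of the actual client (which is an assumption; the real response type may differ) and compares the two spellings.

```python
import asyncio
import warnings

# The failing variant leaves a coroutine un-awaited; silence that warning here.
warnings.simplefilter("ignore", RuntimeWarning)


class FakeResponse:
    def json(self):
        return {"access_token": "abc"}


class FakeAsyncClient:
    async def get(self, url, headers=None):
        return FakeResponse()


async def without_parentheses(client):
    # Parsed as: await (client.get(...).json()), so .json is looked up on a coroutine.
    return await client.get("http://example.invalid").json()


async def with_parentheses(client):
    # Await the coroutine first, then call .json() on the response.
    return (await client.get("http://example.invalid")).json()


async def main():
    client = FakeAsyncClient()
    try:
        await without_parentheses(client)
        error_name = None
    except AttributeError as exc:
        error_name = type(exc).__name__
    return error_name, await with_parentheses(client)


error_name, token_object = asyncio.run(main())
```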
| import certifi | ||
| import httpx | ||
| from authlib.integrations.httpx_client import OAuth2Client | ||
| from typing import List, Dict, Optional, Union, Any, Callable |
Duplicate import statement. The typing module imports are duplicated on lines 26 and 32. Line 32 imports List, Dict, Optional, Union, Any, Callable which are already imported on line 26. Remove the duplicate import on line 32.
| from typing import List, Dict, Optional, Union, Any, Callable |
| # This example uses Azure IMDS for credential-less authentication | ||
| # to Kafka on Confluent Cloud | ||
| # This example use Azure IMDS for credential-less authentication |
Grammatical error in comment. The comment "This example use Azure IMDS" should be "This example uses Azure IMDS" (uses, not use).
| # This example use Azure IMDS for credential-less authentication | |
| # This example uses Azure IMDS for credential-less authentication |
| @@ -146,6 +142,110 @@ def generate_access_token(self) -> None: | |||
| time.sleep(full_jitter(self.retries_wait_ms, self.retries_max_wait_ms, i) / 1000) | |||
Incorrect method call in abstract class. On line 135 within generate_access_token, the code calls self.client.fetch_token(url=self.token_endpoint, grant_type='client_credentials'), but self.client and self.token_endpoint are not defined in _AbstractOAuthClient. This should call the abstract method self.fetch_token() instead, which subclasses properly implement.
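The suggested fix amounts to a template-method arrangement: the base class calls only an abstract hook, and each subclass supplies the transport. The following is a simplified sketch under that assumption; the class names are hypothetical and the retry/backoff loop from the PR is omitted.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class AbstractOAuthClient(ABC):
    """Hypothetical simplification of an abstract OAuth client."""

    def __init__(self) -> None:
        self.token_object: Optional[Dict] = None

    @abstractmethod
    def fetch_token(self) -> Dict:
        """Return the raw token payload for one authentication method."""

    def generate_access_token(self) -> None:
        # Depends only on the abstract hook, not on subclass-specific
        # attributes such as a client or token endpoint.
        self.token_object = self.fetch_token()


class FakeImdsClient(AbstractOAuthClient):
    """Hypothetical subclass returning a canned payload instead of calling IMDS."""

    def fetch_token(self) -> Dict:
        return {"access_token": "imds-token", "expires_in": 3600}


client = FakeImdsClient()
client.generate_access_token()
```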
| required_properties = ['bearer.auth.client.id', 'bearer.auth.client.secret', | ||
| 'bearer.auth.scope', | ||
| 'bearer.auth.issuer.endpoint.url'] |
Incorrect required_properties for static token builder. The _StaticOAuthBearerFieldProviderBuilder class defines required_properties that include OAuth client credentials (bearer.auth.client.id, bearer.auth.client.secret, bearer.auth.scope, bearer.auth.issuer.endpoint.url), but these properties are not relevant for a static token authentication flow. These properties should be removed, as static token authentication only requires the token itself (which is validated separately in the _validate method).
| required_properties = ['bearer.auth.client.id', 'bearer.auth.client.secret', | |
| 'bearer.auth.scope', | |
| 'bearer.auth.issuer.endpoint.url'] | |
| required_properties = [] |
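The effect of a wrong `required_properties` list can be sketched with a toy builder hierarchy. Everything below is hypothetical (class names, the `validate` method, the error message wording, and the assumption that the static token lives under `bearer.auth.token`); it only illustrates why a static-token builder should not inherit the client-credentials requirements.

```python
from typing import Dict, List


class FieldProviderBuilder:
    """Hypothetical base builder that checks for required config keys."""

    required_properties: List[str] = []

    def __init__(self, conf: Dict):
        self.conf = conf

    def validate(self) -> None:
        missing = [p for p in self.required_properties if p not in self.conf]
        if missing:
            raise ValueError("Missing required OAuth configuration properties: " + ", ".join(missing))


class StaticTokenBuilder(FieldProviderBuilder):
    # Static tokens need no client credentials, so nothing is required here.
    required_properties: List[str] = []

    def validate(self) -> None:
        super().validate()
        # Assumed key name for the static token; checked separately.
        if "bearer.auth.token" not in self.conf:
            raise ValueError("Missing bearer.auth.token")


class MisconfiguredStaticTokenBuilder(StaticTokenBuilder):
    # Reproduces the reviewed mistake: client-credentials keys on a static flow.
    required_properties = ["bearer.auth.client.id", "bearer.auth.client.secret",
                           "bearer.auth.scope", "bearer.auth.issuer.endpoint.url"]


conf = {"bearer.auth.token": "static-token"}
StaticTokenBuilder(conf).validate()  # passes

try:
    MisconfiguredStaticTokenBuilder(conf).validate()
    rejected = False
except ValueError:
    rejected = True
```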
| schema_registry_conf = schema_registry_config(args) | ||
| schema_registry_client = SchemaRegistryClient(schema_registry_conf) | ||
|
|
||
| string_serializer = StringSerializer('utf_8') |
Variable string_serializer is initialized twice. It's first initialized on line 137, and then reinitialized on line 165 with the same value. Remove the duplicate initialization on line 165.
| string_serializer = StringSerializer('utf_8') |
| Args: | ||
| err (KafkaError): The error that occurred, or None on success. | ||
| err (KafkaError): The error that occurred on None on success. |
Grammatical error in docstring. The text "The error that occurred on None on success" should be "The error that occurred, or None on success" ("or" instead of "on", with a comma after "occurred").
| err (KafkaError): The error that occurred on None on success. | |
| err (KafkaError): The error that occurred, or None on success. |
| parser.add_argument('-b', dest="bootstrap_servers", required=True, help="Bootstrap broker(s) (host[:port])") | ||
| parser.add_argument('-t', dest="topic", default="example_producer_oauth", help="Topic name") | ||
| parser.add_argument('--query', dest="query", required=True, help="Query parameters for Azure IMDS token endpoint") | ||
| parser = argparse.ArgumentParser(description="OAUTH example with client credentials grant") |
Inconsistent argument description. The argument parser description on line 191 says "OAUTH example with client credentials grant", but this example is about Azure IMDS authentication (which doesn't use client credentials). The description should be updated to accurately reflect that this is an Azure IMDS OAuth example.
| parser = argparse.ArgumentParser(description="OAUTH example with client credentials grant") | |
| parser = argparse.ArgumentParser(description="Azure IMDS OAuth example with credential-less authentication") |
| self.favorite_number = favorite_number | ||
| self.favorite_color = favorite_color | ||
| # address should not be serialized, see user_to_dict() | ||
| self._address = address |
A bit confused by the address field. Is it related to the UAMI oauth feature?
I don't think so. I think the example was just written to show creating an object and serializing only certain fields before passing them to Kafka.
Got it. I saw the `address (str): User's address; confidential` comment. I am okay with this, as it just mimics a common serialization use case.
| # This example uses Azure IMDS for credential-less authentication | ||
| # to Kafka on Confluent Cloud | ||
| # through to Schema Registry on Confluent Cloud |
nit: remove "through"
Naxin Fang (fangnx) left a comment:
LGTM, other than typos in comments
What
Checklist
References
JIRA:
https://confluentinc.atlassian.net/browse/DGS-23492
Test & Review
Open questions / Follow-ups