Skip to content

Add UAMI OAuth changes#2189

Merged
Justin Wang (Claimundefine) merged 11 commits intomasterfrom
uami_sr
Feb 20, 2026
Merged

Add UAMI OAuth changes#2189
Justin Wang (Claimundefine) merged 11 commits intomasterfrom
uami_sr

Conversation

@Claimundefine
Copy link
Copy Markdown
Contributor

@Claimundefine Justin Wang (Claimundefine) commented Feb 17, 2026

What

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

JIRA:

https://confluentinc.atlassian.net/browse/DGS-23492

Test & Review

Open questions / Follow-ups

Copilot AI review requested due to automatic review settings February 17, 2026 03:45
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request adds support for Azure Instance Metadata Service (IMDS) OAuth authentication (User Assigned Managed Identity) to the Kafka Schema Registry clients. It refactors the OAuth bearer field provider architecture to support multiple authentication methods through a builder pattern, consolidating common OAuth logic into a new shared module.

Changes:

  • Introduces OAUTHBEARER_AZURE_IMDS as a new authentication method for Azure IMDS-based credential-less authentication
  • Refactors OAuth bearer field provider classes into an abstract builder pattern with separate sync and async implementations
  • Moves common OAuth logic to a new _oauthbearer.py module to reduce code duplication
  • Updates tests to cover the new Azure IMDS authentication flow and refactored architecture
  • Updates examples to demonstrate Azure IMDS OAuth configuration

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
tests/schema_registry/_sync/test_config.py Reformatted dict definitions and added comprehensive tests for Azure IMDS OAuth configuration validation
tests/schema_registry/_sync/test_bearer_field_provider.py Updated tests to use new token_object attribute and added tests for Azure IMDS client
tests/schema_registry/_async/test_config.py Added Azure IMDS OAuth tests and reformatted existing tests (contains duplicate function)
tests/schema_registry/_async/test_bearer_field_provider.py Updated async bearer field provider tests for refactored architecture
src/confluent_kafka/schema_registry/common/schema_registry_client.py Removed bearer field provider classes that were moved to separate module
src/confluent_kafka/schema_registry/common/_oauthbearer.py New module containing abstract base classes and builders for OAuth bearer authentication
src/confluent_kafka/schema_registry/_sync/schema_registry_client.py Refactored to use new builder pattern, added Azure IMDS client implementation
src/confluent_kafka/schema_registry/_async/schema_registry_client.py Refactored async client with new builder pattern and Azure IMDS support
examples/oauth_schema_registry.py Added Azure IMDS OAuth configuration example
examples/oauth_oidc_ccloud_azure_imds_producer.py Enhanced example to include Schema Registry integration with Azure IMDS
Comments suppressed due to low confidence (1)

tests/schema_registry/_async/test_config.py:345

  • Duplicate test function definition. The function test_custom_bearer_config is defined twice (lines 333-338 and 341-345) with identical implementations. The second definition will shadow the first, causing the first test to never run. One of these duplicate definitions should be removed.
def test_custom_bearer_config():
    conf = {'url': TEST_URL, 'bearer.auth.credentials.source': 'CUSTOM'}

    with pytest.raises(ValueError, match='Missing required custom OAuth configuration properties:'):
        AsyncSchemaRegistryClient(conf)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

class _AsyncCustomOAuthClient(_AsyncBearerFieldProvider):
def __init__(self, custom_function: Callable[[Dict], Awaitable[Dict]], custom_config: dict):
class _AsyncCustomOAuthClient(_BearerFieldProvider):
def __init__(self, custom_function: Callable[[Dict], Dict], custom_config: dict):
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect type annotation for async custom function. The custom_function parameter is typed as Callable[[Dict], Dict], but on line 95 the code awaits the result with return await self.custom_function(self.custom_config). The type should be Callable[[Dict], Awaitable[Dict]] to match the async usage.

Suggested change
def __init__(self, custom_function: Callable[[Dict], Dict], custom_config: dict):
def __init__(self, custom_function: Callable[[Dict], Awaitable[Dict]], custom_config: dict):

Copilot uses AI. Check for mistakes.
Comment on lines +178 to +180
self.token_object = await self.client.get(self.token_endpoint, headers=[
('Metadata', 'true')
]).json()
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing parentheses in async await expression. The expression await self.client.get(...).json() is parsed as await (self.client.get(...).json()) due to operator precedence, which attempts to call .json() on a coroutine before awaiting it. This will raise an AttributeError. The code should be (await self.client.get(...)).json() with explicit parentheses to await the coroutine first, then call .json() on the resulting Response object.

Suggested change
self.token_object = await self.client.get(self.token_endpoint, headers=[
('Metadata', 'true')
]).json()
self.token_object = (await self.client.get(
self.token_endpoint,
headers=[('Metadata', 'true')],
)).json()

Copilot uses AI. Check for mistakes.
import certifi
import httpx
from authlib.integrations.httpx_client import OAuth2Client
from typing import List, Dict, Optional, Union, Any, Callable
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate import statement. The typing module imports are duplicated on lines 26 and 32. Line 32 imports List, Dict, Optional, Union, Any, Callable which are already imported on line 26. Remove the duplicate import on line 32.

Suggested change
from typing import List, Dict, Optional, Union, Any, Callable

Copilot uses AI. Check for mistakes.

# This example uses Azure IMDS for credential-less authentication
# to Kafka on Confluent Cloud
# This example use Azure IMDS for credential-less authentication
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammatical error in comment. The comment "This example use Azure IMDS" should be "This example uses Azure IMDS" (uses, not use).

Suggested change
# This example use Azure IMDS for credential-less authentication
# This example uses Azure IMDS for credential-less authentication

Copilot uses AI. Check for mistakes.
@@ -146,6 +142,110 @@ def generate_access_token(self) -> None:
time.sleep(full_jitter(self.retries_wait_ms, self.retries_max_wait_ms, i) / 1000)
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect method call in abstract class. On line 135 within generate_access_token, the code calls self.client.fetch_token(url=self.token_endpoint, grant_type='client_credentials'), but self.client and self.token_endpoint are not defined in _AbstractOAuthClient. This should call the abstract method self.fetch_token() instead, which subclasses properly implement.

Copilot uses AI. Check for mistakes.
Comment on lines +159 to +161
required_properties = ['bearer.auth.client.id', 'bearer.auth.client.secret',
'bearer.auth.scope',
'bearer.auth.issuer.endpoint.url']
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect required_properties for static token builder. The _StaticOAuthBearerFieldProviderBuilder class defines required_properties that include OAuth client credentials (bearer.auth.client.id, bearer.auth.client.secret, bearer.auth.scope, bearer.auth.issuer.endpoint.url), but these properties are not relevant for a static token authentication flow. These properties should be removed, as static token authentication only requires the token itself (which is validated separately in the _validate method).

Suggested change
required_properties = ['bearer.auth.client.id', 'bearer.auth.client.secret',
'bearer.auth.scope',
'bearer.auth.issuer.endpoint.url']
required_properties = []

Copilot uses AI. Check for mistakes.
schema_registry_conf = schema_registry_config(args)
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

string_serializer = StringSerializer('utf_8')
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable string_serializer is initialized twice. It's first initialized on line 137, and then reinitialized on line 165 with the same value. Remove the duplicate initialization on line 165.

Suggested change
string_serializer = StringSerializer('utf_8')

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this


Args:
err (KafkaError): The error that occurred, or None on success.
err (KafkaError): The error that occurred on None on success.
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammatical error in docstring. The text "The error that occurred on None on success" should be "The error that occurred, or None on success" (with a comma after "occurred").

Suggested change
err (KafkaError): The error that occurred on None on success.
err (KafkaError): The error that occurred, or None on success.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 typo

parser.add_argument('-b', dest="bootstrap_servers", required=True, help="Bootstrap broker(s) (host[:port])")
parser.add_argument('-t', dest="topic", default="example_producer_oauth", help="Topic name")
parser.add_argument('--query', dest="query", required=True, help="Query parameters for Azure IMDS token endpoint")
parser = argparse.ArgumentParser(description="OAUTH example with client credentials grant")
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent argument description. The argument parser description on line 191 says "OAUTH example with client credentials grant", but this example is about Azure IMDS authentication (which doesn't use client credentials). The description should be updated to accurately reflect that this is an Azure IMDS OAuth example.

Suggested change
parser = argparse.ArgumentParser(description="OAUTH example with client credentials grant")
parser = argparse.ArgumentParser(description="Azure IMDS OAuth example with credential-less authentication")

Copilot uses AI. Check for mistakes.

# This example uses Azure IMDS for credential-less authentication
# to Kafka on Confluent Cloud
# This example use Azure IMDS for credential-less authentication
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 nit typo

self.favorite_number = favorite_number
self.favorite_color = favorite_color
# address should not be serialized, see user_to_dict()
self._address = address
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit confused by the address field. Is it related to the UAMI oauth feature?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, I think the example was just written as an example of creating an object and only serializing certain fields to be passed into Kafka

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Saw the address(str): User's address; confidential comment. I am okay with this as it's just mimicking some common serialization use case

Comment thread src/confluent_kafka/schema_registry/common/_oauthbearer.py Outdated

# This example uses Azure IMDS for credential-less authentication
# to Kafka on Confluent Cloud
# through to Schema Registry on Confluent Cloud
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove through

Copy link
Copy Markdown
Member

@fangnx Naxin Fang (fangnx) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, other than typos in comments

@sonarqube-confluent
Copy link
Copy Markdown

@Claimundefine Justin Wang (Claimundefine) merged commit 612349e into master Feb 20, 2026
2 of 3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants