Skip to content

Resolve API versions on connection#1136

Merged
ods merged 6 commits intoaio-libs:masterfrom
vmaurin:solve_versions_on_connection
Nov 27, 2025
Merged

Resolve API versions on connection#1136
ods merged 6 commits intoaio-libs:masterfrom
vmaurin:solve_versions_on_connection

Conversation

@vmaurin
Copy link
Copy Markdown
Contributor

@vmaurin vmaurin commented Nov 18, 2025

As stated in the Kafka protocol documentation https://kafka.apache.org/protocol#api_versions the API versions should be solved for each single connection we open with a broker (the broker might be updated while we talk to it).

This commit aim to move all the versions logic into the protocol and the connection only instead of being spread in various classes.

It is introducing for that a new "Request" class that is acting as a builder for the Struct we are going to send into the wire. The Request class tries to collect all possible parameters, and then will react according the best available version in two possible ways:

  • failing, as the intent of the caller was clearly to use a new feature that is not available on the broker yet
  • best effort, as the caller can totally work with an older version of the API

This choice is made on per API basis.

From the caller perspective, if a different logic need to be ran according the API version, it can look up on the Response.API_VERSION

Doing such a change will help aiokafka supporting Kafka 4.0 as some API versions were removed as mentioned in #1085

It might not achieve the compatibility as we could lack some protocol definition, but it will definitively help.

Changes

Fixes #

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

@vmaurin vmaurin marked this pull request as draft November 18, 2025 16:59
Comment thread aiokafka/client.py Fixed
@vmaurin vmaurin force-pushed the solve_versions_on_connection branch 8 times, most recently from 22f0102 to 1033aa4 Compare November 19, 2025 11:29
Comment thread tests/test_consumer.py
self.assertEqual(rmsg1.timestamp, None)
self.assertEqual(rmsg1.timestamp_type, None)
self.assertNotEqual(rmsg1.timestamp, None)
self.assertTrue(rmsg1.timestamp >= s_time_ms)

Check notice

Code scanning / CodeQL

Imprecise assert Note test

assertTrue(a >= b) cannot provide an informative message. Using assertGreaterEqual(a, b) instead will give more informative messages.
@vmaurin vmaurin force-pushed the solve_versions_on_connection branch 7 times, most recently from 829c29b to 6a5edf6 Compare November 19, 2025 14:28
@codecov
Copy link
Copy Markdown

codecov Bot commented Nov 19, 2025

Codecov Report

❌ Patch coverage is 98.32134% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.85%. Comparing base (5ec91e7) to head (5e22d32).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
tests/test_conn.py 84.84% 2 Missing and 3 partials ⚠️
aiokafka/protocol/api.py 92.68% 2 Missing and 1 partial ⚠️
aiokafka/protocol/group.py 96.72% 1 Missing and 1 partial ⚠️
aiokafka/admin/client.py 92.85% 1 Missing ⚠️
aiokafka/cluster.py 0.00% 1 Missing ⚠️
aiokafka/producer/sender.py 90.90% 1 Missing ⚠️
aiokafka/protocol/message.py 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1136      +/-   ##
==========================================
- Coverage   95.79%   94.85%   -0.95%     
==========================================
  Files          88       88              
  Lines       15834    15671     -163     
  Branches     1425     1374      -51     
==========================================
- Hits        15168    14864     -304     
- Misses        411      563     +152     
+ Partials      255      244      -11     
Flag Coverage Δ
cext 94.81% <98.20%> (-0.79%) ⬇️
integration 94.73% <98.20%> (-0.79%) ⬇️
purepy 94.81% <98.20%> (-0.79%) ⬇️
unit 51.28% <79.25%> (+0.51%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@vmaurin vmaurin force-pushed the solve_versions_on_connection branch 7 times, most recently from c04dded to a62d7a6 Compare November 19, 2025 22:00
@vmaurin vmaurin changed the title Solve API version by connection Resolve API version by connection Nov 19, 2025
@vmaurin vmaurin changed the title Resolve API version by connection Resolve API versions by connection Nov 19, 2025
@vmaurin vmaurin changed the title Resolve API versions by connection Resolve API versions on connection Nov 19, 2025
@vmaurin vmaurin force-pushed the solve_versions_on_connection branch from a62d7a6 to 710a348 Compare November 19, 2025 22:21
@ods
Copy link
Copy Markdown
Collaborator

ods commented Nov 25, 2025

@vmaurin @ijuma Are there any other questions, or shall I proceed with the merge?

@ods
Copy link
Copy Markdown
Collaborator

ods commented Nov 25, 2025

@vmaurin I see one test became flaky with "BrokenPipeError: [Errno 32] Broken pipe" error after this change, while it succeeds for other your branch. Any idea why it may happen?

@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 25, 2025

@vmaurin @ijuma Are there any other questions, or shall I proceed with the merge?

I am still a bit worried by the failing test, but I just can't reproduce it locally. I am seen something weird with the writer drain #1138 but it is probably not related

@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 25, 2025

@vmaurin I see one test became flaky with "BrokenPipeError: [Errno 32] Broken pipe" error after this change, while it succeeds for other your branch. Any idea why it may happen?

I have a lead @ods . I fix a garbage collection issue of the connection being passed to log.debug that was preventing the proper collection. But as it is not held by the log, it is probably getting collected to early here. Let me see if I can find a fix

@vmaurin vmaurin force-pushed the solve_versions_on_connection branch 2 times, most recently from f2f76cf to e86cff6 Compare November 25, 2025 20:19
@ods
Copy link
Copy Markdown
Collaborator

ods commented Nov 26, 2025

I fix a garbage collection issue of the connection being passed to log.debug that was preventing the proper collection. But as it is not held by the log, it is probably getting collected to early here.

Could you elaborate please? How log.debug() can prevent collection?

@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 26, 2025

Could you elaborate please? How log.debug() can prevent collection?

This test was failing on CI while running on my local

async def test_conn_warn_unclosed(self):

After investigating, it appears it was related to the log level used for running the test, appearing only with level == DEBUG.

In the code, there are statement like these

log.debug("%s Request %d: %s", self, correlation_id, request)

that are passing self (here a connection) to the log layer. The log layer might hold a reference long enough then to prevent a garbage collection.

I solved the issue by using

        log.debug(
            "%s Request %d: %s", weakref.ref(self), correlation_id, request_struct
        )

Unfortunately, I tried a commit here to fix the test e86cff6 but it haven't made things better, I need to investigate further

@vmaurin vmaurin force-pushed the solve_versions_on_connection branch from e86cff6 to a9b30c7 Compare November 26, 2025 08:20
@ods
Copy link
Copy Markdown
Collaborator

ods commented Nov 26, 2025

Have you tried using repr(self) instead of weakref.ref(self)? I'd prefer to keep lazy evaluation, but here it's not expensive here and this class defines cleanup in __del__ (which we shouldn't rely on, but it's not a subject of this PR).

@ods
Copy link
Copy Markdown
Collaborator

ods commented Nov 26, 2025

BTW, using -s --log-level DEBUG didn't help me to reproduce this test failure locally.

@vmaurin vmaurin force-pushed the solve_versions_on_connection branch from a9b30c7 to 1913063 Compare November 26, 2025 09:20
@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 26, 2025

It is some kind of flaky behavior if the logging is happening and releasing the resource before or after the "del conn" in the integration test. I have updated the code to use repr and also add a conditional to avoid paying too much resource to compute the repr.

I still have the flaky test with the broken pipe to solve

@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 26, 2025

The main difference I am seeing at the moment, is that before, this test was not performing the api versions request/response roundtrip (so it was purely writing data on the socket)

Now we have:

  • API versions requests
  • API versions response
  • Produce without responses requests

@ods
Copy link
Copy Markdown
Collaborator

ods commented Nov 26, 2025

It is some kind of flaky behavior if the logging is happening and releasing the resource before or after the "del conn" in the integration test.

Do you see a way to make the problem reproducable locally, e.g. by inserting sleep somewhere?

@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 26, 2025

For me (and the CI) it was systematic. Maybe playing with the log handler configuration to have something slower here ? But at least this one is fixed, and it is probably a good thing to not leak references to the logger layer (we cannot know how fast is handled)

@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 26, 2025

For the current flaky tests, it has nothing to do with garbage collection. I have dumped the server logs and we do have an issue

INFO [KafkaApi-0] Closing connection due to error during produce request with correlation id 2 from client id aiokafka with ack=0
Topic and partition to exceptions: b'foo'-0 -> org.apache.kafka.common.errors.UnknownTopicOrPartitionException (kafka.server.KafkaApis)

So the server is closing the connection, resulting in the broken pipe. In my local, it is not fast enough to close the connection before the test end

@vmaurin vmaurin force-pushed the solve_versions_on_connection branch 2 times, most recently from 1e55a44 to 9996c9a Compare November 26, 2025 12:08
@vmaurin vmaurin force-pushed the solve_versions_on_connection branch from 9996c9a to 08bbfc9 Compare November 26, 2025 13:50
Comment thread tests/test_requests.py
def test_creating_invalid_request_classes():
with pytest.raises(TypeError):

class MissingFieldRequest(Request):

Check notice

Code scanning / CodeQL

Unused local variable Note test

Variable MissingFieldRequest is not used.
Comment thread tests/test_requests.py

with pytest.raises(TypeError):

class WrongInheritence(int, Request):

Check notice

Code scanning / CodeQL

Unused local variable Note test

Variable WrongInheritence is not used.
Comment thread aiokafka/conn.py Outdated
Comment thread aiokafka/conn.py Outdated
Comment thread aiokafka/conn.py Outdated
@ods
Copy link
Copy Markdown
Collaborator

ods commented Nov 27, 2025

Thanks! Great work. Let me know when you think we’re ready to merge.

@vmaurin
Copy link
Copy Markdown
Contributor Author

vmaurin commented Nov 27, 2025

Thanks! Great work. Let me know when you think we’re ready to merge.

I guess it is ready. I did a sanity check on my test project (so running with producer/consumer/transactional loop/admin on a cluster compose of 3 workers) and I didn't see errors

@vmaurin vmaurin force-pushed the solve_versions_on_connection branch from 54b7eaa to 5e22d32 Compare November 27, 2025 11:12
@ods ods merged commit 7d0bd25 into aio-libs:master Nov 27, 2025
25 of 26 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants