QueueIterator raises StopAsyncIteration when iterator/channel is closed.#615
Conversation
a5180c0 to
15ca5ee
Compare
9ea9416 to
f12ecfb
Compare
a2e3647 to
5652d2d
Compare
5652d2d to
71fc125
Compare
|
@mosquito ping |
|
@Darsstar this request contains a lot of changes that could potentially breaks backward compatibility. I'm still thinking about how to test it so that I understand I need to make a separate major release, or make do with a minor one. |
I assume you are refering to:
9.4.0 already dropped Python 3.7. The changes taking advantage of 3.8, all contained in a single commit, were made in a way it should be backward compatible. It probably should be a new major version due to the protected properties. Although, I think those could be rewritten as |
709ecc8 to
8155901
Compare
|
I rebased, which added the 3.12 tests, and now |
mosquito
left a comment
There was a problem hiding this comment.
Thank you for not giving up on this improvement, I'm sorry that I don't have much time to devote to it right now. But I'm open to discussion.
|
Small update: I am currently under the impression this PR isn't at fault. I branched from I don't have a strong lead, but I started looking at |
8155901 to
6ec53f1
Compare
All good, we appreciate your diligent work in supporting this project. |
|
Unsurprisingly fixing this requires more code and therefor increases the runtime. @gglluukk's benchmark (thanks!) show about 10% runtime degredation: |
It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak. |
I don't see optimisation potential without porting the library to C/C++/Rust. (While keeping the implementation correct.) |
Hopefully PyO3 gets better support for async/await, while maintaining interoperability with asyncio. There is a lot of active work on this right now. Once that happens, using Rust will be feasible. In the meantime, losing 10% in performance is worthwhile to make sure I dont get OOM kills. |
|
You should do a performance test with cProfile for example. The last time I did this, it marshall in pamqp the slowest one. |
9abe45f to
6c744ba
Compare
|
I guess we should just finish to this already. Will you fix the code style? |
db55f50 to
ea449ba
Compare
Done |
|
I finally profiled it... Python file I used to profileimport asyncio
import cProfile
from aio_pika import Message
from aio_pika.robust_connection import connect_robust
NUMBER_OF_MESSAGES = 1_000_000
# NUMBER_OF_MESSAGES = 1_000
async def main() -> None:
profiler = cProfile.Profile(
timer=asyncio.get_event_loop().time,
)
async with await connect_robust(host='localhost', port=5672, login='guest', password='guest') as conn:
async with await conn.channel() as channel:
exchange = channel.default_exchange
queue = await channel.declare_queue('queue')
pending = set()
for i in range(NUMBER_OF_MESSAGES):
pending.add(
asyncio.create_task(
exchange.publish(
message=Message(
body=f"message {i}".encode(encoding='utf-8'),
),
routing_key=queue.name,
mandatory=True,
)
)
)
await asyncio.wait(pending)
profiler.enable()
count = 0
async with queue.iterator() as queue_iter:
async for message in queue_iter:
await message.ack()
count += 1
if count == NUMBER_OF_MESSAGES:
await queue_iter.close()
break
profiler.disable()
profiler.print_stats()
profiler.dump_stats('D:/PR.stats')
if __name__ == '__main__':
asyncio.run(main())Regarding I also made |
ac79041 to
e39a628
Compare
e39a628 to
c2bf331
Compare
The first bug is `Channel` passing `Optional[BaseException]` to `self.close()` while `RobustChannel` passed `asyncio.Future` The second is registering a `CallbackCollection` instance as a callback for a different `CallbackCollection`. (Which was not supported before)
c2bf331 to
1acd87e
Compare
|
gentle ping with this PR, guys. any ETA on the merge? looking forward to see this released |
|
@mosquito think you will have time before the end of the year? (It's been a month since the last ping, and the end of the year is about a months away if I factor in holidays. No more significance than that.) If a face to face review has your preference that seems doable. My work is one metro stop away from the Nebius offices. I don't mind paying for a drink. |
I am definitely willing to chip into the beer fund if that means this gets merged!!! |
|
Thanks guys, sorry for taking so long to respond. These changes have already been published in version 9.5.0 |
See #358
Currently QueueIterator never throws a StopAsyncIterator exception. Not when the channel is closed, and not after QueueIterator's close method is called.
Which implies that starting an
async for message in queue.iterator():loop will keep running "forever" even if no new message will ever arrive. ("forever" because the asyncio task can be canceled, etc.)This PR fixes that in a backwards compatible way. Some tests are refactored to rely on this implicitly, and new ones that explicitly test QueueIterator.anext() throws certain exceptions have been added as well.