-
-
Notifications
You must be signed in to change notification settings - Fork 52
Hang in AsyncQueue.join() #715
Copy link
Copy link
Closed
Description
Due to the way call_soon_threadsafe() works, calling SyncQueue.task_done() has one side effect: a race is possible between queue._finished.set() and queue._finished.clear(). As a result, queue._finished (asyncio.Event instance) can be set even when there were more put() calls than task_done() calls, causing an infinite loop with a complete blocking of the event loop and high CPU load:
queue = janus.Queue()
await queue.async_q.put(1) # queue._finished.clear()
assert queue.async_q.unfinished_tasks == 1
queue.sync_q.task_done() # call_soon_threadsafe(queue._finished.set)
assert queue.async_q.unfinished_tasks == 0
await queue.async_q.put(2) # queue._finished.clear()
assert queue.async_q.unfinished_tasks == 1
async def func():
print(1) # 1
await asyncio.sleep(0)
print(2) # will never be printed!
asyncio.create_task(func())
await queue.async_q.join() # infinite loop!A possible solution would be to replace asyncio.Event with asyncio.Condition.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels