Closed
Labels: bug (Something is broken), tests (Unit tests and/or continuous integration)
Description
distributed/distributed/worker.py
Lines 1517 to 1521 in 1cbee7f
if isinstance(executor, ThreadPoolExecutor):
    executor._work_queue.queue.clear()
    executor.shutdown(wait=executor_wait, timeout=timeout)
else:
    executor.shutdown(wait=executor_wait)
shutdown is a blocking call that generally calls join on the thread/process. If the executor takes a long time to shut down (say there's a function running that's itself blocked on something), this can block the worker event loop for 30s, or whatever the timeout is.
This is particularly important for our test suite, because the worker event loop is actually the only event loop.
I discovered this while trying to run a test something like this:
event = distributed.Event()
f = client.submit(event.wait, workers=[a.address])
t = asyncio.create_task(a.close())
await asyncio.sleep(1)
await event.set() # this hangs!
# because the whole event loop is blocked waiting for the ThreadPoolExecutor to shut down,
# which is waiting for the event to be set... which is waiting for the event loop to be free

This probably won't affect real-life clusters that much, but would be good to clean up.
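One possible way to avoid the deadlock described above is to run the blocking shutdown call off the event loop. The following is only a minimal sketch using the standard library, not the fix adopted in distributed: it uses `concurrent.futures.ThreadPoolExecutor` (whose `shutdown` has no `timeout` argument, unlike the one in the snippet above), a `threading.Event` standing in for `distributed.Event`, and a hypothetical helper named `close_without_blocking`.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def close_without_blocking(executor: ThreadPoolExecutor) -> None:
    # Hypothetical helper: run the blocking shutdown() in a separate thread
    # so the event loop stays free to process other coroutines meanwhile.
    await asyncio.to_thread(executor.shutdown, wait=True)


async def demo() -> bool:
    executor = ThreadPoolExecutor(max_workers=1)
    gate = threading.Event()  # stand-in for distributed.Event

    # A task that blocks inside the executor until the gate is set.
    task_future = executor.submit(gate.wait)

    # Start closing while the task is still blocked.
    closing = asyncio.create_task(close_without_blocking(executor))

    # The loop is still responsive, so this sleep returns and we can
    # unblock the task; a direct executor.shutdown(wait=True) call on the
    # loop thread would never let us reach the next line.
    await asyncio.sleep(0.1)
    gate.set()

    await asyncio.wait_for(closing, timeout=5)
    return task_future.result()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off is that shutdown now occupies a thread from the loop's default executor for as long as the join takes, so any timeout handling would still need to be applied around the awaited call.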
cc @graingert