Skip to content

Commit d72d87a

Browse files
committed
Revert #5883
1 parent 2d3fddc commit d72d87a

File tree

2 files changed

+5
-48
lines changed

2 files changed

+5
-48
lines changed

distributed/tests/test_worker.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3306,37 +3306,3 @@ async def test_Worker__to_dict(c, s, a):
33063306
}
33073307
assert d["tasks"]["x"]["key"] == "x"
33083308
assert d["data"] == ["x"]
3309-
3310-
3311-
@gen_cluster(nthreads=[])
3312-
async def test_do_not_block_event_loop_during_shutdown(s):
3313-
loop = asyncio.get_running_loop()
3314-
called_handler = threading.Event()
3315-
block_handler = threading.Event()
3316-
3317-
w = await Worker(s.address)
3318-
executor = w.executors["default"]
3319-
3320-
# The block wait must be smaller than the test timeout and smaller than the
3321-
# default value for timeout in `Worker.close``
3322-
async def block():
3323-
def fn():
3324-
called_handler.set()
3325-
assert block_handler.wait(20)
3326-
3327-
await loop.run_in_executor(executor, fn)
3328-
3329-
async def set_future():
3330-
while True:
3331-
try:
3332-
await loop.run_in_executor(executor, sleep, 0.1)
3333-
except RuntimeError: # executor has started shutting down
3334-
block_handler.set()
3335-
return
3336-
3337-
async def close():
3338-
called_handler.wait()
3339-
# executor_wait is True by default but we want to be explicit here
3340-
await w.close(executor_wait=True)
3341-
3342-
await asyncio.gather(block(), close(), set_future())

distributed/worker.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
from distributed.comm import connect, get_address_host
5050
from distributed.comm.addressing import address_from_user_args, parse_address
5151
from distributed.comm.utils import OFFLOAD_THRESHOLD
52-
from distributed.compatibility import to_thread
5352
from distributed.core import (
5453
CommClosedError,
5554
Status,
@@ -1478,19 +1477,11 @@ async def close(
14781477
for executor in self.executors.values():
14791478
if executor is utils._offload_executor:
14801479
continue # Never shutdown the offload executor
1481-
1482-
def _close():
1483-
if isinstance(executor, ThreadPoolExecutor):
1484-
executor._work_queue.queue.clear()
1485-
executor.shutdown(wait=executor_wait, timeout=timeout)
1486-
else:
1487-
executor.shutdown(wait=executor_wait)
1488-
1489-
# Waiting for the shutdown can block the event loop causing
1490-
# weird deadlocks particularly if the task that is executing in
1491-
# the thread is waiting for a server reply, e.g. when using
1492-
# worker clients, semaphores, etc.
1493-
await to_thread(_close)
1480+
if isinstance(executor, ThreadPoolExecutor):
1481+
executor._work_queue.queue.clear()
1482+
executor.shutdown(wait=executor_wait, timeout=timeout)
1483+
else:
1484+
executor.shutdown(wait=executor_wait)
14941485

14951486
self.stop()
14961487
await self.rpc.close()

0 commit comments

Comments
 (0)