Skip to content

Commit d49d5ca

Browse files
committed
Maybe fix leaking task from client
1 parent d2898fc commit d49d5ca

File tree

1 file changed

+6
-8
lines changed

1 file changed

+6
-8
lines changed

distributed/client.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,8 @@ async def _reconnect(self):
12121212
assert self.scheduler_comm.comm.closed()
12131213

12141214
self.status = "connecting"
1215+
if self.scheduler_comm:
1216+
await self.scheduler_comm.close()
12151217
self.scheduler_comm = None
12161218

12171219
for st in self.futures.values():
@@ -1287,6 +1289,7 @@ async def _ensure_connected(self, timeout=None):
12871289
if msg[0].get("warning"):
12881290
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
12891291

1292+
assert not self.scheduler_comm, self.scheduler_comm
12901293
bcomm = BatchedSend(interval="10ms", name="Client")
12911294
bcomm.start(comm)
12921295
self.scheduler_comm = bcomm
@@ -1514,13 +1517,11 @@ async def _close(self, fast=False):
15141517
if self.get == dask.config.get("get", None):
15151518
del dask.config.config["get"]
15161519

1517-
if (
1518-
self.scheduler_comm
1519-
and self.scheduler_comm.comm
1520-
and not self.scheduler_comm.comm.closed()
1521-
):
1520+
if self.scheduler_comm:
15221521
self._send_to_scheduler({"op": "close-client"})
15231522
self._send_to_scheduler({"op": "close-stream"})
1523+
await self.scheduler_comm.close()
1524+
self.scheduler_comm = None
15241525

15251526
current_task = asyncio.current_task()
15261527
handle_report_task = self._handle_report_task
@@ -1533,9 +1534,6 @@ async def _close(self, fast=False):
15331534
with suppress(asyncio.CancelledError, TimeoutError):
15341535
await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1)
15351536

1536-
if self.scheduler_comm:
1537-
await self.scheduler_comm.close()
1538-
15391537
for key in list(self.futures):
15401538
self._release_key(key=key)
15411539

0 commit comments

Comments
 (0)