@@ -1212,6 +1212,8 @@ async def _reconnect(self):
12121212 assert self .scheduler_comm .comm .closed ()
12131213
12141214 self .status = "connecting"
1215+ if self .scheduler_comm :
1216+ await self .scheduler_comm .close ()
12151217 self .scheduler_comm = None
12161218
12171219 for st in self .futures .values ():
@@ -1287,6 +1289,7 @@ async def _ensure_connected(self, timeout=None):
12871289 if msg [0 ].get ("warning" ):
12881290 warnings .warn (version_module .VersionMismatchWarning (msg [0 ]["warning" ]))
12891291
1292+ assert not self .scheduler_comm , self .scheduler_comm
12901293 bcomm = BatchedSend (interval = "10ms" , name = "Client" )
12911294 bcomm .start (comm )
12921295 self .scheduler_comm = bcomm
@@ -1514,13 +1517,11 @@ async def _close(self, fast=False):
15141517 if self .get == dask .config .get ("get" , None ):
15151518 del dask .config .config ["get" ]
15161519
1517- if (
1518- self .scheduler_comm
1519- and self .scheduler_comm .comm
1520- and not self .scheduler_comm .comm .closed ()
1521- ):
1520+ if self .scheduler_comm :
15221521 self ._send_to_scheduler ({"op" : "close-client" })
15231522 self ._send_to_scheduler ({"op" : "close-stream" })
1523+ await self .scheduler_comm .close ()
1524+ self .scheduler_comm = None
15241525
15251526 current_task = asyncio .current_task ()
15261527 handle_report_task = self ._handle_report_task
@@ -1533,9 +1534,6 @@ async def _close(self, fast=False):
15331534 with suppress (asyncio .CancelledError , TimeoutError ):
15341535 await asyncio .wait_for (asyncio .shield (handle_report_task ), 0.1 )
15351536
1536- if self .scheduler_comm :
1537- await self .scheduler_comm .close ()
1538-
15391537 for key in list (self .futures ):
15401538 self ._release_key (key = key )
15411539
0 commit comments