-
-
Notifications
You must be signed in to change notification settings - Fork 750
Open
Labels
flaky testIntermittent failures on CI.Intermittent failures on CI.
Description
Details
____________________________ test_wait_for_outgoing ____________________________
3406
3407 def test_func():
3408 result = None
3409 workers = []
3410 with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
3411
3412 async def coro():
3413 with dask.config.set(config):
3414 s = False
3415 for i in range(5):
3416 try:
3417 s, ws = await start_cluster(
3418 nthreads,
3419 scheduler,
3420 loop,
3421 security=security,
3422 Worker=Worker,
3423 scheduler_kwargs=scheduler_kwargs,
3424 worker_kwargs=worker_kwargs,
3425 )
3426 except Exception as e:
3427 logger.error(
3428 "Failed to start gen_cluster, retrying",
3429 exc_info=True,
3430 )
3431 await asyncio.sleep(1)
3432 else:
3433 workers[:] = ws
3434 args = [s] + workers
3435 break
3436 if s is False:
3437 raise Exception("Could not start cluster")
3438 if client:
3439 c = await Client(
3440 s.address,
3441 loop=loop,
3442 security=security,
3443 asynchronous=True,
3444 **client_kwargs,
3445 )
3446 args = [c] + args
3447 try:
3448 future = func(*args)
3449 if timeout:
3450 future = asyncio.wait_for(future, timeout)
3451 result = await future
3452 if s.validate:
3453 s.validate_state()
3454 finally:
3455 if client and c.status not in ("closing", "closed"):
3456 await c._close(fast=s.status == Status.closed)
3457 await end_cluster(s, workers)
3458 await asyncio.wait_for(cleanup_global_workers(), 1)
3459
3460 try:
3461 c = await default_client()
3462 except ValueError:
3463 pass
3464 else:
3465 await c._close(fast=True)
3466
3467 def get_unclosed():
3468 return [c for c in Comm._instances if not c.closed()] + [
3469 c
3470 for c in _global_clients.values()
3471 if c.status != "closed"
3472 ]
3473
3474 try:
3475 start = time()
3476 while time() < start + 5:
3477 gc.collect()
3478 if not get_unclosed():
3479 break
3480 await asyncio.sleep(0.05)
3481 else:
3482 if allow_unclosed:
3483 print(f"Unclosed Comms: {get_unclosed()}")
3484 else:
3485 raise RuntimeError("Unclosed Comms", get_unclosed())
3486 finally:
3487 Comm._instances.clear()
3488 _global_clients.clear()
3489
3490 return result
3491
3492 result = loop.run_sync(
3493> coro, timeout=timeout * 2 if timeout else timeout
3494 )
3495
3496distributed/utils_test.py:954:
3497_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
3498../../../miniconda/envs/dask-distributed/lib/python3.7/site-packages/tornado/ioloop.py:530: in run_sync
3499 return future_cell[0].result()
3500distributed/utils_test.py:912: in coro
3501 result = await future
3502../../../miniconda/envs/dask-distributed/lib/python3.7/asyncio/tasks.py:442: in wait_for
3503 return fut.result()
3504distributed/tests/test_worker.py:1316: in test_wait_for_outgoing
3505 future = await c.scatter(x, workers=a.address)
3506distributed/client.py:2077: in _scatter
3507 nthreads, data2, report=False, rpc=self.rpc
3508distributed/utils_comm.py:149: in scatter_to_workers
3509 for address, v in d.items()
3510distributed/utils.py:229: in All
3511 result = await tasks.next()
3512distributed/core.py:852: in send_recv_from_rpc
3513 result = await send_recv(comm=comm, op=key, **kwargs)
3514distributed/core.py:651: in send_recv
3515 raise exc.with_traceback(tb)
3516distributed/core.py:422: in handle_comm
3517 msg = await comm.read()
3518distributed/comm/tcp.py:216: in read
3519 allow_offload=self.allow_offload,
3520distributed/comm/utils.py:78: in from_frames
3521 res = await offload(_from_frames)
3522distributed/utils.py:1455: in offload
3523 return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
3524../../../miniconda/envs/dask-distributed/lib/python3.7/asyncio/base_events.py:755: in run_in_executor
3525 executor.submit(func, *args), loop=self)
3526_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
3527
3528> raise RuntimeError('cannot schedule new futures after shutdown')
3529E RuntimeError: cannot schedule new futures after shutdown
3530
3531../../../miniconda/envs/dask-distributed/lib/python3.7/concurrent/futures/thread.py:163: RuntimeError
Metadata
Metadata
Assignees
Labels
flaky testIntermittent failures on CI.Intermittent failures on CI.