-
-
Notifications
You must be signed in to change notification settings - Fork 750
Open
Labels
flaky test: Intermittent failures on CI.
Description
Details:
2021-02-08T21:33:58.6228723Z ______________________________ test_idle_timeout ______________________________
2021-02-08T21:33:58.6229193Z
2021-02-08T21:33:58.6229763Z def test_func():
2021-02-08T21:33:58.6230524Z result = None
2021-02-08T21:33:58.6231092Z workers = []
2021-02-08T21:33:58.6231819Z with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2021-02-08T21:33:58.6232442Z
2021-02-08T21:33:58.6232952Z async def coro():
2021-02-08T21:33:58.6233671Z with dask.config.set(config):
2021-02-08T21:33:58.6278267Z s = False
2021-02-08T21:33:58.6278895Z for i in range(5):
2021-02-08T21:33:58.6279521Z try:
2021-02-08T21:33:58.6280066Z s, ws = await start_cluster(
2021-02-08T21:33:58.6280690Z nthreads,
2021-02-08T21:33:58.6281251Z scheduler,
2021-02-08T21:33:58.6281838Z loop,
2021-02-08T21:33:58.6282375Z security=security,
2021-02-08T21:33:58.6283003Z Worker=Worker,
2021-02-08T21:33:58.6283786Z scheduler_kwargs=scheduler_kwargs,
2021-02-08T21:33:58.6284524Z worker_kwargs=worker_kwargs,
2021-02-08T21:33:58.6285057Z )
2021-02-08T21:33:58.6285649Z except Exception as e:
2021-02-08T21:33:58.6286225Z logger.error(
2021-02-08T21:33:58.6287545Z "Failed to start gen_cluster, retrying",
2021-02-08T21:33:58.6288199Z exc_info=True,
2021-02-08T21:33:58.6288731Z )
2021-02-08T21:33:58.6289384Z await asyncio.sleep(1)
2021-02-08T21:33:58.6289897Z else:
2021-02-08T21:33:58.6290325Z workers[:] = ws
2021-02-08T21:33:58.6290808Z args = [s] + workers
2021-02-08T21:33:58.6291322Z break
2021-02-08T21:33:58.6291862Z if s is False:
2021-02-08T21:33:58.6292484Z raise Exception("Could not start cluster")
2021-02-08T21:33:58.6293207Z if client:
2021-02-08T21:33:58.6294100Z c = await Client(
2021-02-08T21:33:58.6294588Z s.address,
2021-02-08T21:33:58.6295005Z loop=loop,
2021-02-08T21:33:58.6295466Z security=security,
2021-02-08T21:33:58.6296157Z asynchronous=True,
2021-02-08T21:33:58.6296725Z **client_kwargs,
2021-02-08T21:33:58.6297253Z )
2021-02-08T21:33:58.6297723Z args = [c] + args
2021-02-08T21:33:58.6298668Z try:
2021-02-08T21:33:58.6299164Z future = func(*args)
2021-02-08T21:33:58.6299742Z if timeout:
2021-02-08T21:33:58.6300376Z future = asyncio.wait_for(future, timeout)
2021-02-08T21:33:58.6301080Z result = await future
2021-02-08T21:33:58.6301626Z if s.validate:
2021-02-08T21:33:58.6302228Z s.validate_state()
2021-02-08T21:33:58.6302746Z finally:
2021-02-08T21:33:58.6304747Z if client and c.status not in ("closing", "closed"):
2021-02-08T21:33:58.6305455Z await c._close(fast=s.status == Status.closed)
2021-02-08T21:33:58.6306113Z await end_cluster(s, workers)
2021-02-08T21:33:58.6306746Z await asyncio.wait_for(cleanup_global_workers(), 1)
2021-02-08T21:33:58.6307296Z
2021-02-08T21:33:58.6307881Z try:
2021-02-08T21:33:58.6308395Z c = await default_client()
2021-02-08T21:33:58.6308901Z except ValueError:
2021-02-08T21:33:58.6309389Z pass
2021-02-08T21:33:58.6309765Z else:
2021-02-08T21:33:58.6310206Z await c._close(fast=True)
2021-02-08T21:33:58.6310645Z
2021-02-08T21:33:58.6311027Z def get_unclosed():
2021-02-08T21:33:58.6311645Z return [c for c in Comm._instances if not c.closed()] + [
2021-02-08T21:33:58.6312153Z c
2021-02-08T21:33:58.6312682Z for c in _global_clients.values()
2021-02-08T21:33:58.6313330Z if c.status != "closed"
2021-02-08T21:33:58.6313748Z ]
2021-02-08T21:33:58.6314115Z
2021-02-08T21:33:58.6314442Z try:
2021-02-08T21:33:58.6314874Z start = time()
2021-02-08T21:33:58.6315321Z while time() < start + 5:
2021-02-08T21:33:58.6315817Z gc.collect()
2021-02-08T21:33:58.6316280Z if not get_unclosed():
2021-02-08T21:33:58.6316711Z break
2021-02-08T21:33:58.6317222Z await asyncio.sleep(0.05)
2021-02-08T21:33:58.6317681Z else:
2021-02-08T21:33:58.6318153Z if allow_unclosed:
2021-02-08T21:33:58.6318710Z print(f"Unclosed Comms: {get_unclosed()}")
2021-02-08T21:33:58.6319263Z else:
2021-02-08T21:33:58.6320929Z raise RuntimeError("Unclosed Comms", get_unclosed())
2021-02-08T21:33:58.6321600Z finally:
2021-02-08T21:33:58.6322085Z Comm._instances.clear()
2021-02-08T21:33:58.6322643Z _global_clients.clear()
2021-02-08T21:33:58.6323224Z
2021-02-08T21:33:58.6323609Z return result
2021-02-08T21:33:58.6324021Z
2021-02-08T21:33:58.6324414Z result = loop.run_sync(
2021-02-08T21:33:58.6324995Z > coro, timeout=timeout * 2 if timeout else timeout
2021-02-08T21:33:58.6325529Z )
2021-02-08T21:33:58.6325869Z
2021-02-08T21:33:58.6326289Z distributed\utils_test.py:954:
2021-02-08T21:33:58.6326814Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2021-02-08T21:33:58.6327064Z
2021-02-08T21:33:58.6328098Z self = <tornado.platform.asyncio.AsyncIOLoop object at 0x00000194C8190608>
2021-02-08T21:33:58.6329351Z func = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x00000194C8214B88>
2021-02-08T21:33:58.6330159Z timeout = 20
2021-02-08T21:33:58.6330549Z
2021-02-08T21:33:58.6331158Z def run_sync(self, func, timeout=None):
2021-02-08T21:33:58.6332007Z """Starts the `IOLoop`, runs the given function, and stops the loop.
2021-02-08T21:33:58.6332846Z
2021-02-08T21:33:59.5394622Z The function must return either an awaitable object or
2021-02-08T21:33:59.5405824Z ``None``. If the function returns an awaitable object, the
2021-02-08T21:33:59.5406838Z `IOLoop` will run until the awaitable is resolved (and
2021-02-08T21:33:59.5407735Z `run_sync()` will return the awaitable's result). If it raises
2021-02-08T21:33:59.5408810Z an exception, the `IOLoop` will stop and the exception will be
2021-02-08T21:33:59.5409684Z re-raised to the caller.
2021-02-08T21:33:59.5410221Z
2021-02-08T21:33:59.5411125Z The keyword-only argument ``timeout`` may be used to set
2021-02-08T21:33:59.5412010Z a maximum duration for the function. If the timeout expires,
2021-02-08T21:33:59.5412974Z a `tornado.util.TimeoutError` is raised.
2021-02-08T21:33:59.5413662Z
2021-02-08T21:33:59.5415602Z This method is useful to allow asynchronous calls in a
2021-02-08T21:33:59.5416534Z ``main()`` function::
2021-02-08T21:33:59.5417109Z
2021-02-08T21:33:59.5417614Z async def main():
2021-02-08T21:33:59.5418302Z # do stuff...
2021-02-08T21:33:59.5419219Z
2021-02-08T21:33:59.5419872Z if __name__ == '__main__':
2021-02-08T21:33:59.5420571Z IOLoop.current().run_sync(main)
2021-02-08T21:33:59.5421220Z
2021-02-08T21:33:59.5421799Z .. versionchanged:: 4.3
2021-02-08T21:33:59.5422558Z Returning a non-``None``, non-awaitable value is now an error.
2021-02-08T21:33:59.5423258Z
2021-02-08T21:33:59.5423768Z .. versionchanged:: 5.0
2021-02-08T21:33:59.5425166Z If a timeout occurs, the ``func`` coroutine will be cancelled.
2021-02-08T21:33:59.5425804Z
2021-02-08T21:33:59.5426425Z """
2021-02-08T21:33:59.5426891Z future_cell = [None]
2021-02-08T21:33:59.5427574Z
2021-02-08T21:33:59.5428039Z def run():
2021-02-08T21:33:59.5429861Z try:
2021-02-08T21:33:59.5430281Z result = func()
2021-02-08T21:33:59.5430773Z if result is not None:
2021-02-08T21:33:59.5431342Z from tornado.gen import convert_yielded
2021-02-08T21:33:59.5432007Z result = convert_yielded(result)
2021-02-08T21:33:59.5432535Z except Exception:
2021-02-08T21:33:59.5433081Z future_cell[0] = Future()
2021-02-08T21:33:59.5433650Z future_set_exc_info(future_cell[0], sys.exc_info())
2021-02-08T21:33:59.5434154Z else:
2021-02-08T21:33:59.5434645Z if is_future(result):
2021-02-08T21:33:59.5435118Z future_cell[0] = result
2021-02-08T21:33:59.5435587Z else:
2021-02-08T21:33:59.5436016Z future_cell[0] = Future()
2021-02-08T21:33:59.5436586Z future_cell[0].set_result(result)
2021-02-08T21:33:59.5437256Z self.add_future(future_cell[0], lambda future: self.stop())
2021-02-08T21:33:59.5437906Z self.add_callback(run)
2021-02-08T21:33:59.5438727Z if timeout is not None:
2021-02-08T21:33:59.5439882Z def timeout_callback():
2021-02-08T21:33:59.5440548Z # If we can cancel the future, do so and wait on it. If not,
2021-02-08T21:33:59.5441252Z # Just stop the loop and return with the task still pending.
2021-02-08T21:33:59.5441993Z # (If we neither cancel nor wait for the task, a warning
2021-02-08T21:33:59.5442541Z # will be logged).
2021-02-08T21:33:59.5443054Z if not future_cell[0].cancel():
2021-02-08T21:33:59.5443514Z self.stop()
2021-02-08T21:33:59.5444226Z timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
2021-02-08T21:33:59.5444872Z self.start()
2021-02-08T21:33:59.5445305Z if timeout is not None:
2021-02-08T21:33:59.5446133Z self.remove_timeout(timeout_handle)
2021-02-08T21:33:59.5446789Z if future_cell[0].cancelled() or not future_cell[0].done():
2021-02-08T21:33:59.5447598Z > raise TimeoutError('Operation timed out after %s seconds' % timeout)
2021-02-08T21:33:59.5448521Z E tornado.util.TimeoutError: Operation timed out after 20 seconds
2021-02-08T21:33:59.5449180Z
2021-02-08T21:33:59.5449928Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:575: TimeoutError
Metadata
Metadata
Assignees
Labels
flaky test: Intermittent failures on CI.