-
-
Notifications
You must be signed in to change notification settings - Fork 750
Closed
Labels
flaky test — Intermittent failures on CI.
Description
Full traceback:
______________________ test_dont_steal_unknown_functions _______________________
def test_func():
result = None
workers = []
with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
async def coro():
with dask.config.set(config):
s = False
for i in range(5):
try:
s, ws = await start_cluster(
nthreads,
scheduler,
loop,
security=security,
Worker=Worker,
scheduler_kwargs=scheduler_kwargs,
worker_kwargs=worker_kwargs,
)
except Exception as e:
logger.error(
"Failed to start gen_cluster, retrying",
exc_info=True,
)
else:
workers[:] = ws
args = [s] + workers
break
if s is False:
raise Exception("Could not start cluster")
if client:
c = await Client(
s.address,
loop=loop,
security=security,
asynchronous=True,
**client_kwargs
)
args = [c] + args
try:
future = func(*args)
if timeout:
future = asyncio.wait_for(future, timeout)
result = await future
if s.validate:
s.validate_state()
finally:
if client and c.status not in ("closing", "closed"):
await c._close(fast=s.status == "closed")
await end_cluster(s, workers)
await asyncio.wait_for(cleanup_global_workers(), 1)
try:
c = await default_client()
except ValueError:
pass
else:
await c._close(fast=True)
for i in range(5):
if all(c.closed() for c in Comm._instances):
break
else:
await asyncio.sleep(0.05)
else:
L = [c for c in Comm._instances if not c.closed()]
Comm._instances.clear()
# raise ValueError("Unclosed Comms", L)
print("Unclosed Comms", L)
return result
result = loop.run_sync(
> coro, timeout=timeout * 2 if timeout else timeout
)
distributed/utils_test.py:957:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda/envs/test-environment/lib/python3.6/site-packages/tornado/ioloop.py:576: in run_sync
return future_cell[0].result()
distributed/utils_test.py:927: in coro
result = await future
../../../miniconda/envs/test-environment/lib/python3.6/asyncio/tasks.py:358: in wait_for
return fut.result()
../../../miniconda/envs/test-environment/lib/python3.6/site-packages/tornado/gen.py:1147: in run
yielded = self.gen.send(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
c = <Client: not connected>
s = <Scheduler: "tcp://127.0.0.1:43315" processes: 0 cores: 0>
a = <Worker: 'tcp://127.0.0.1:35456', 0, closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:42222', 1, closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
def test_dont_steal_unknown_functions(c, s, a, b):
futures = c.map(inc, [1, 2], workers=a.address, allow_other_workers=True)
yield wait(futures)
> assert len(a.data) == 2, [len(a.data), len(b.data)]
E AssertionError: [1, 1]
E assert 1 == 2
E + where 1 = len(Buffer<<LRU: 28/5023005081 on dict>, <Func: serialize_bytelist<->deserialize_bytes <File: /home/travis/build/dask/distributed/dask-worker-space/worker-hbrk40rs/storage, mode="a", 0 elements>>>)
E + where Buffer<<LRU: 28/5023005081 on dict>, <Func: serialize_bytelist<->deserialize_bytes <File: /home/travis/build/dask/distributed/dask-worker-space/worker-hbrk40rs/storage, mode="a", 0 elements>>> = <Worker: 'tcp://127.0.0.1:35456', 0, running, stored: 1, running: 0/1, ready: 0, comm: 0, waiting: 0>.data
distributed/tests/test_steal.py:116: AssertionError
Metadata
Assignees
Labels
flaky test — Intermittent failures on CI.