-
-
Notifications
You must be signed in to change notification settings - Fork 750
Closed
Description
What happened:
Starting with Python 3.8, passing the `loop` parameter to `asyncio.Queue` (`asyncio.queues.Queue`) emits a DeprecationWarning, which shows up in the test suite as follows:
distributed/tests/test_actor.py: 218 warnings
/builddir/build/BUILD/distributed-2021.09.1/distributed/actor.py:171: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
distributed/tests/test_actor.py: 218 warnings
/usr/lib64/python3.9/asyncio/queues.py:48: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
self._finished = locks.Event(loop=loop)
In Python 3.10 the `loop` parameter was removed, so passing it now raises a `TypeError`, which causes several test failures:
Python 3.10 `Queue` failures
__________________________ test_client_actions[True] ___________________________
direct_to_workers = True
@pytest.mark.parametrize("direct_to_workers", [True, False])
def test_client_actions(direct_to_workers):
@gen_cluster(client=True)
async def test(c, s, a, b):
c = await Client(
s.address, asynchronous=True, direct_to_workers=direct_to_workers
)
counter = c.submit(Counter, workers=[a.address], actor=True)
assert isinstance(counter, Future)
counter = await counter
assert counter._address
assert hasattr(counter, "increment")
assert hasattr(counter, "add")
assert hasattr(counter, "n")
n = await counter.n
assert n == 0
assert counter._address == a.address
assert isinstance(a.actors[counter.key], Counter)
assert s.tasks[counter.key].actor
await asyncio.gather(counter.increment(), counter.increment())
n = await counter.n
assert n == 2
counter.add(10)
while (await counter.n) != 10 + 2:
n = await counter.n
await asyncio.sleep(0.01)
await c.close()
> test()
distributed/tests/test_actor.py:109:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/utils_test.py:994: in test_func
result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
return future_cell[0].result()
distributed/utils_test.py:953: in coro
result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
return fut.result()
distributed/tests/test_actor.py:97: in test
await asyncio.gather(counter.increment(), counter.increment())
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce2a068c0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
__________________________ test_client_actions[False] __________________________
direct_to_workers = False
@pytest.mark.parametrize("direct_to_workers", [True, False])
def test_client_actions(direct_to_workers):
@gen_cluster(client=True)
async def test(c, s, a, b):
c = await Client(
s.address, asynchronous=True, direct_to_workers=direct_to_workers
)
counter = c.submit(Counter, workers=[a.address], actor=True)
assert isinstance(counter, Future)
counter = await counter
assert counter._address
assert hasattr(counter, "increment")
assert hasattr(counter, "add")
assert hasattr(counter, "n")
n = await counter.n
assert n == 0
assert counter._address == a.address
assert isinstance(a.actors[counter.key], Counter)
assert s.tasks[counter.key].actor
await asyncio.gather(counter.increment(), counter.increment())
n = await counter.n
assert n == 2
counter.add(10)
while (await counter.n) != 10 + 2:
n = await counter.n
await asyncio.sleep(0.01)
await c.close()
> test()
distributed/tests/test_actor.py:109:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/utils_test.py:994: in test_func
result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
return future_cell[0].result()
distributed/utils_test.py:953: in coro
result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
return fut.result()
distributed/tests/test_actor.py:97: in test
await asyncio.gather(counter.increment(), counter.increment())
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce29e7ca0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
__________________________ test_worker_actions[False] __________________________
separate_thread = False
@pytest.mark.parametrize("separate_thread", [False, True])
def test_worker_actions(separate_thread):
@gen_cluster(client=True)
async def test(c, s, a, b):
counter = c.submit(Counter, workers=[a.address], actor=True)
a_address = a.address
def f(counter):
start = counter.n
assert type(counter) is Actor
assert counter._address == a_address
future = counter.increment(separate_thread=separate_thread)
assert isinstance(future, ActorFuture)
assert "Future" in type(future).__name__
end = future.result(timeout=1)
assert end > start
futures = [c.submit(f, counter, pure=False) for _ in range(10)]
await c.gather(futures)
counter = await counter
assert await counter.n == 10
> test()
distributed/tests/test_actor.py:137:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/utils_test.py:994: in test_func
result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
return future_cell[0].result()
distributed/utils_test.py:953: in coro
result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
return fut.result()
distributed/tests/test_actor.py:132: in test
await c.gather(futures)
distributed/client.py:1831: in _gather
raise exception.with_traceback(traceback)
distributed/tests/test_actor.py:125: in f
future = counter.increment(separate_thread=separate_thread)
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
"""Event loop mixins."""
import threading
from . import events
_global_lock = threading.Lock()
# Used as a sentinel for loop parameter
_marker = object()
class _LoopBoundMixin:
_loop = None
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
__________________________ test_worker_actions[True] ___________________________
separate_thread = True
@pytest.mark.parametrize("separate_thread", [False, True])
def test_worker_actions(separate_thread):
@gen_cluster(client=True)
async def test(c, s, a, b):
counter = c.submit(Counter, workers=[a.address], actor=True)
a_address = a.address
def f(counter):
start = counter.n
assert type(counter) is Actor
assert counter._address == a_address
future = counter.increment(separate_thread=separate_thread)
assert isinstance(future, ActorFuture)
assert "Future" in type(future).__name__
end = future.result(timeout=1)
assert end > start
futures = [c.submit(f, counter, pure=False) for _ in range(10)]
await c.gather(futures)
counter = await counter
assert await counter.n == 10
> test()
distributed/tests/test_actor.py:137:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/utils_test.py:994: in test_func
result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
return future_cell[0].result()
distributed/utils_test.py:953: in coro
result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
return fut.result()
distributed/tests/test_actor.py:132: in test
await c.gather(futures)
distributed/client.py:1831: in _gather
raise exception.with_traceback(traceback)
distributed/tests/test_actor.py:125: in f
future = counter.increment(separate_thread=separate_thread)
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
"""Event loop mixins."""
import threading
from . import events
_global_lock = threading.Lock()
# Used as a sentinel for loop parameter
_marker = object()
class _LoopBoundMixin:
_loop = None
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: f
args: (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
____________________________ test_exceptions_method ____________________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:39615" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:33307', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:43653', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_exceptions_method(c, s, a, b):
class Foo:
def throw(self):
1 / 0
foo = await c.submit(Foo, actor=True)
with pytest.raises(ZeroDivisionError):
> await foo.throw()
distributed/tests/test_actor.py:202:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0ccefe0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
__________________________________ test_sync ___________________________________
client = <Client: 'tcp://127.0.0.1:37671' processes=2 threads=2, memory=29.19 GiB>
def test_sync(client):
counter = client.submit(Counter, actor=True)
counter = counter.result()
assert counter.n == 0
> future = counter.increment()
distributed/tests/test_actor.py:270:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce27e7f10>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:37671
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:35683
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:39187
distributed.worker - INFO - Listening to: tcp://127.0.0.1:35683
distributed.worker - INFO - Listening to: tcp://127.0.0.1:39187
distributed.worker - INFO - dashboard at: 127.0.0.1:41155
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:37671
distributed.worker - INFO - dashboard at: 127.0.0.1:45103
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:37671
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-78907aa0-75bd-40e2-b1b3-243974e7c2b1/dask-worker-space/worker-mvxlpz9l
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-6f4a430f-7fcf-4c68-af86-a61a18175dee/dask-worker-space/worker-t9pkf1a3
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39187', name: tcp://127.0.0.1:39187, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39187
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:37671
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:35683', name: tcp://127.0.0.1:35683, memory: 0, processing: 0>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:35683
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:37671
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
--------------------------- Captured stderr teardown ---------------------------
distributed.scheduler - INFO - Remove client Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39187
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:35683
_____________________________ test_numpy_roundtrip _____________________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:46169" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:34601', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:35397', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_numpy_roundtrip(c, s, a, b):
np = pytest.importorskip("numpy")
server = await c.submit(ParameterServer, actor=True)
x = np.random.random(1000)
> await server.put("x", x)
distributed/tests/test_actor.py:312:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0a40be0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
_________________________ test_numpy_roundtrip_getattr _________________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38951" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:44113', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:38615', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_numpy_roundtrip_getattr(c, s, a, b):
np = pytest.importorskip("numpy")
counter = await c.submit(Counter, actor=True)
x = np.random.random(1000)
> await counter.add(x)
distributed/tests/test_actor.py:327:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0a43100>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
____________________________ test_many_computations ____________________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:35331" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:46325', 0, Status.closed, stored: 1, running: 0/1, ready: 0, comm: 0, waiting: -3>
b = <Worker: 'tcp://127.0.0.1:41811', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_many_computations(c, s, a, b):
counter = await c.submit(Counter, actor=True)
def add(n, counter):
for i in range(n):
counter.increment().result()
futures = c.map(add, range(10), counter=counter)
done = c.submit(lambda x: None, futures)
while not done.done():
assert len(s.processing) <= a.nthreads + b.nthreads
await asyncio.sleep(0.01)
> await done
distributed/tests/test_actor.py:370:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:240: in _result
raise exc.with_traceback(tb)
/usr/lib/python3.10/site-packages/dask/utils.py:35: in apply
return func(*args, **kwargs)
distributed/tests/test_actor.py:361: in add
counter.increment().result()
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
"""Event loop mixins."""
import threading
from . import events
_global_lock = threading.Lock()
# Used as a sentinel for loop parameter
_marker = object()
class _LoopBoundMixin:
_loop = None
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af16c0>, (<class 'tuple'>, [1]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1750>, (<class 'tuple'>, [2]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1870>, (<class 'tuple'>, [4]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af17e0>, (<class 'tuple'>, [3]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1990>, (<class 'tuple'>, [9]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1900>, (<class 'tuple'>, [5]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1a20>, (<class 'tuple'>, [6]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
______________________________ test_thread_safety ______________________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38121" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:35701', 0, Status.closed, stored: 0, running: 0/5, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:42911', 1, Status.closed, stored: 0, running: 0/5, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True, nthreads=[("127.0.0.1", 5)] * 2)
async def test_thread_safety(c, s, a, b):
class Unsafe:
def __init__(self):
self.n = 0
def f(self):
assert self.n == 0
self.n += 1
for i in range(20):
sleep(0.002)
assert self.n == 1
self.n = 0
unsafe = await c.submit(Unsafe, actor=True)
> futures = [unsafe.f() for i in range(10)]
distributed/tests/test_actor.py:390:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/tests/test_actor.py:390: in <listcomp>
futures = [unsafe.f() for i in range(10)]
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0ab33a0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
______________________________ test_compute_sync _______________________________
client = <Client: 'tcp://127.0.0.1:36651' processes=2 threads=2, memory=29.19 GiB>
def test_compute_sync(client):
@dask.delayed
def f(n, counter):
assert isinstance(counter, Actor), type(counter)
for i in range(n):
counter.increment().result()
@dask.delayed
def check(counter, blanks):
return counter.n
counter = dask.delayed(Counter)()
values = [f(i, counter) for i in range(5)]
final = check(counter, values)
> result = final.compute(actors=counter)
distributed/tests/test_actor.py:517:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/lib/python3.10/site-packages/dask/base.py:288: in compute
(result,) = compute(self, traverse=False, **kwargs)
/usr/lib/python3.10/site-packages/dask/base.py:570: in compute
results = schedule(dsk, keys, **kwargs)
distributed/client.py:2689: in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
distributed/client.py:1966: in gather
return self.sync(
distributed/client.py:860: in sync
return sync(
distributed/utils.py:326: in sync
raise exc.with_traceback(tb)
distributed/utils.py:309: in f
result[0] = yield future
/usr/lib64/python3.10/site-packages/tornado/gen.py:762: in run
value = future.result()
distributed/client.py:1831: in _gather
raise exception.with_traceback(traceback)
distributed/tests/test_actor.py:507: in f
counter.increment().result()
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
"""Event loop mixins."""
import threading
from . import events
_global_lock = threading.Lock()
# Used as a sentinel for loop parameter
_marker = object()
class _LoopBoundMixin:
_loop = None
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:36651
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:33679
distributed.worker - INFO - Listening to: tcp://127.0.0.1:33679
distributed.worker - INFO - dashboard at: 127.0.0.1:40821
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:45345
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:36651
distributed.worker - INFO - Listening to: tcp://127.0.0.1:45345
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - dashboard at: 127.0.0.1:34307
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:36651
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-1fffd77f-f100-4a17-9e5a-d77117486e22/dask-worker-space/worker-l9zckx4t
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-8c6e926f-438f-4afc-ad01-374c2004961e/dask-worker-space/worker-t278frlt
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:45345', name: tcp://127.0.0.1:45345, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45345
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:36651
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33679', name: tcp://127.0.0.1:33679, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33679
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:36651
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function: f
args: (2, <Actor: Counter, key=Counter-c2657ca9-a782-49ff-bdf2-068eb69118fa>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
distributed.worker - WARNING - Compute Failed
Function: f
args: (3, <Actor: Counter, key=Counter-c2657ca9-a782-49ff-bdf2-068eb69118fa>)
kwargs: {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')
--------------------------- Captured stderr teardown ---------------------------
distributed.scheduler - INFO - Receive client connection: Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:45345
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33679
distributed.scheduler - INFO - Remove client Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:45345', name: tcp://127.0.0.1:45345, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:45345
____________________________ test_actors_in_profile ____________________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:44303" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:41441', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)],
config={"distributed.worker.profile.interval": "1ms"},
)
async def test_actors_in_profile(c, s, a):
class Sleeper:
def sleep(self, time):
sleep(time)
sleeper = await c.submit(Sleeper, actor=True)
for i in range(5):
> await sleeper.sleep(0.200)
distributed/tests/test_actor.py:542:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce29fa800>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
_________________________________ test_waiter __________________________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38473" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:36645', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:41535', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_waiter(c, s, a, b):
from tornado.locks import Event
class Waiter:
def __init__(self):
self.event = Event()
async def set(self):
self.event.set()
async def wait(self):
await self.event.wait()
waiter = await c.submit(Waiter, actor=True)
> futures = [waiter.wait() for _ in range(5)] # way more than we have actor threads
distributed/tests/test_actor.py:567:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/tests/test_actor.py:567: in <listcomp>
futures = [waiter.wait() for _ in range(5)] # way more than we have actor threads
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce08c2e00>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
___________________________ test_one_thread_deadlock ___________________________
def test_one_thread_deadlock():
with cluster(nworkers=2) as (cl, w):
client = Client(cl["address"])
ac = client.submit(Counter, actor=True).result()
ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result()
> assert ac2.do_inc(ac).result() == 1
distributed/tests/test_actor.py:638:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0806800>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:33349
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:33771
distributed.worker - INFO - Listening to: tcp://127.0.0.1:33771
distributed.worker - INFO - dashboard at: 127.0.0.1:44169
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-838106ce-ee65-41e3-840a-128a2f1effa2/dask-worker-space/worker-ptpi26ti
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:38899
distributed.worker - INFO - Listening to: tcp://127.0.0.1:38899
distributed.worker - INFO - dashboard at: 127.0.0.1:44161
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-b2caa31c-60c9-4f36-8188-d3ab297a0e43/dask-worker-space/worker-xbwfvymy
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33771', name: tcp://127.0.0.1:33771, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33771
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:38899', name: tcp://127.0.0.1:38899, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:38899
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0943bc49-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:38899
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33771
_____________________________ test_async_deadlock ______________________________
client = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38717" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:40435', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:46501', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_async_deadlock(client, s, a, b):
ac = await client.submit(Counter, actor=True)
ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address])
> assert (await ac2.ado_inc(ac)) == 1
distributed/tests/test_actor.py:646:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0c326b0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
________________________________ test_exception ________________________________
def test_exception():
class MyException(Exception):
pass
class Broken:
def method(self):
raise MyException
@property
def prop(self):
raise MyException
with cluster(nworkers=2) as (cl, w):
client = Client(cl["address"])
ac = client.submit(Broken, actor=True).result()
> acfut = ac.method()
distributed/tests/test_actor.py:664:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce294fe20>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:46347
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:35825
distributed.worker - INFO - Listening to: tcp://127.0.0.1:35825
distributed.worker - INFO - dashboard at: 127.0.0.1:39041
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-0469507d-635a-4bda-84b1-cc78dc041b8d/dask-worker-space/worker-1ssb7imh
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:42665
distributed.worker - INFO - Listening to: tcp://127.0.0.1:42665
distributed.worker - INFO - dashboard at: 127.0.0.1:34853
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-3e305d6e-ccf9-43b1-8d37-f83b0aed071b/dask-worker-space/worker-9572kryt
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:35825', name: tcp://127.0.0.1:35825, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:35825
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:42665', name: tcp://127.0.0.1:42665, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:42665
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0a0243bd-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:42665
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:35825
_____________________________ test_exception_async _____________________________
client = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:45745" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:42413', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:41479', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_exception_async(client, s, a, b):
class MyException(Exception):
pass
class Broken:
def method(self):
raise MyException
@property
def prop(self):
raise MyException
ac = await client.submit(Broken, actor=True)
> acfut = ac.method()
distributed/tests/test_actor.py:686:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0804fa0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
______________________________ test_as_completed _______________________________
client = <Client: 'tcp://127.0.0.1:43751' processes=2 threads=2, memory=29.19 GiB>
def test_as_completed(client):
ac = client.submit(Counter, actor=True).result()
> futures = [ac.increment() for _ in range(10)]
distributed/tests/test_actor.py:696:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/tests/test_actor.py:696: in <listcomp>
futures = [ac.increment() for _ in range(10)]
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce291dea0>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:43751
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:39947
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:45859
distributed.worker - INFO - Listening to: tcp://127.0.0.1:39947
distributed.worker - INFO - Listening to: tcp://127.0.0.1:45859
distributed.worker - INFO - dashboard at: 127.0.0.1:46029
distributed.worker - INFO - dashboard at: 127.0.0.1:46737
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:43751
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:43751
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-3d0a84ef-157a-4e6b-bb77-c6d910536ac5/dask-worker-space/worker-p7yismsg
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-ed20cd89-e988-4711-a24d-5a989a7cb262/dask-worker-space/worker-_36j_yjl
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39947', name: tcp://127.0.0.1:39947, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39947
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:43751
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:45859', name: tcp://127.0.0.1:45859, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45859
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:43751
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
--------------------------- Captured stderr teardown ---------------------------
distributed.scheduler - INFO - Remove client Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:45859
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39947
_________________________ test_actor_future_awaitable __________________________
client = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:37203" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:34641', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:38447', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True, timeout=3)
async def test_actor_future_awaitable(client, s, a, b):
ac = await client.submit(Counter, actor=True)
> futures = [ac.increment() for _ in range(10)]
distributed/tests/test_actor.py:711:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/tests/test_actor.py:711: in <listcomp>
futures = [ac.increment() for _ in range(10)]
distributed/actor.py:171: in func
q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce08c8a90>
def __init__(self, *, loop=_marker):
if loop is not _marker:
> raise TypeError(
f'As of 3.10, the *loop* parameter was removed from '
f'{type(self).__name__}() since it is no longer necessary'
)
E TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
I believe the removal of the `loop` parameter may also be the cause of the following failures on Python 3.10, but I'm not sure:
Additional failures
______________________ test_client_gather_semaphore_loop _______________________
s = <Scheduler: "tcp://127.0.0.1:46375" workers: 0 cores: 0, tasks: 0>
@gen_cluster(nthreads=[])
async def test_client_gather_semaphore_loop(s):
async with Client(s.address, asynchronous=True) as c:
> assert c._gather_semaphore._loop is c.loop.asyncio_loop
E AssertionError: assert None is <_UnixSelectorEventLoop running=True closed=False debug=False>
E + where None = <asyncio.locks.Semaphore object at 0x7f4627863fa0 [unlocked, value:5]>._loop
E + where <asyncio.locks.Semaphore object at 0x7f4627863fa0 [unlocked, value:5]> = <Client: 'tcp://127.0.0.1:46375' processes=0 threads=0, memory=0 B>._gather_semaphore
E + and <_UnixSelectorEventLoop running=True closed=False debug=False> = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462614cbe0>.asyncio_loop
E + where <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462614cbe0> = <Client: 'tcp://127.0.0.1:46375' processes=0 threads=0, memory=0 B>.loop
distributed/tests/test_client.py:6306: AssertionError
_______________________ test_as_completed_condition_loop _______________________
c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:45711" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:38873', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:37335', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_as_completed_condition_loop(c, s, a, b):
seq = c.map(inc, range(5))
ac = as_completed(seq)
> assert ac.condition._loop == c.loop.asyncio_loop
E assert None == <_UnixSelectorEventLoop running=True closed=False debug=False>
E +None
E -<_UnixSelectorEventLoop running=True closed=False debug=False>
distributed/tests/test_client.py:6313: AssertionError
__________________ test_client_connectionpool_semaphore_loop ___________________
s = {'address': 'tcp://127.0.0.1:43473'}
a = {'address': 'tcp://127.0.0.1:33689', 'proc': <weakref at 0x7f4627d41530; to 'SpawnProcess' at 0x7f462644eb30>}
b = {'address': 'tcp://127.0.0.1:39419', 'proc': <weakref at 0x7f4627d422f0; to 'SpawnProcess' at 0x7f462644d660>}
def test_client_connectionpool_semaphore_loop(s, a, b):
with Client(s["address"]) as c:
> assert c.rpc.semaphore._loop is c.loop.asyncio_loop
E AssertionError: assert None is <_UnixSelectorEventLoop running=True closed=False debug=False>
E + where None = <asyncio.locks.Semaphore object at 0x7f462644d750 [unlocked, value:511]>._loop
E + where <asyncio.locks.Semaphore object at 0x7f462644d750 [unlocked, value:511]> = <ConnectionPool: open=1, active=0, connecting=0>.semaphore
E + where <ConnectionPool: open=1, active=0, connecting=0> = <Client: 'tcp://127.0.0.1:43473' processes=2 threads=2, memory=29.19 GiB>.rpc
E + and <_UnixSelectorEventLoop running=True closed=False debug=False> = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462644e530>.asyncio_loop
E + where <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462644e530> = <Client: 'tcp://127.0.0.1:43473' processes=2 threads=2, memory=29.19 GiB>.loop
distributed/tests/test_client.py:6318: AssertionError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:43473
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:33689
distributed.worker - INFO - Listening to: tcp://127.0.0.1:33689
distributed.worker - INFO - dashboard at: 127.0.0.1:41495
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-dffa0c91-c988-4176-a341-b68d74cf767c/dask-worker-space/worker-ufu1wyyt
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:39419
distributed.worker - INFO - Listening to: tcp://127.0.0.1:39419
distributed.worker - INFO - dashboard at: 127.0.0.1:34383
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 14.60 GiB
distributed.worker - INFO - Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-b875dfbf-ec80-4e6c-9187-c2e45ce5ba0f/dask-worker-space/worker-vzoxfiuz
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33689', name: tcp://127.0.0.1:33689, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33689
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39419', name: tcp://127.0.0.1:39419, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39419
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
----------------------------- Captured stderr call -----------------------------
distributed.scheduler - INFO - Receive client connection: Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
--------------------------- Captured stderr teardown ---------------------------
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39419
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33689
What you expected to happen:
Tests pass.
Minimal Complete Verifiable Example:
$ DISABLE_IPV6=1 PYTHONDONTWRITEBYTECODE=1 \
pytest -m 'not avoid_ci and not slow and not asyncio' -vra
Anything else we need to know?:
Environment:
- Dask version: 2021.9.1
- Python version: 3.10
- Operating System: Fedora Rawhide
- Install method (conda, pip, source): Source
Metadata
Assignees
Labels
No labels