Skip to content

The loop parameter to Queue is deprecated/removed #5350

@QuLogic

Description

@QuLogic

What happened:

Starting with Python 3.8, there have been deprecation warnings about the loop parameter to asyncio.queues.Queue.

distributed/tests/test_actor.py: 218 warnings
  /builddir/build/BUILD/distributed-2021.09.1/distributed/actor.py:171: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)

distributed/tests/test_actor.py: 218 warnings
  /usr/lib64/python3.9/asyncio/queues.py:48: DeprecationWarning: The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.
    self._finished = locks.Event(loop=loop)

With 3.10, the parameter has been removed and causes several test failures:

Python 3.10 `Queue` failures
__________________________ test_client_actions[True] ___________________________

direct_to_workers = True

    @pytest.mark.parametrize("direct_to_workers", [True, False])
    def test_client_actions(direct_to_workers):
        @gen_cluster(client=True)
        async def test(c, s, a, b):
            c = await Client(
                s.address, asynchronous=True, direct_to_workers=direct_to_workers
            )
    
            counter = c.submit(Counter, workers=[a.address], actor=True)
            assert isinstance(counter, Future)
            counter = await counter
            assert counter._address
            assert hasattr(counter, "increment")
            assert hasattr(counter, "add")
            assert hasattr(counter, "n")
    
            n = await counter.n
            assert n == 0
    
            assert counter._address == a.address
    
            assert isinstance(a.actors[counter.key], Counter)
            assert s.tasks[counter.key].actor
    
            await asyncio.gather(counter.increment(), counter.increment())
    
            n = await counter.n
            assert n == 2
    
            counter.add(10)
            while (await counter.n) != 10 + 2:
                n = await counter.n
                await asyncio.sleep(0.01)
    
            await c.close()
    
>       test()

distributed/tests/test_actor.py:109: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/utils_test.py:994: in test_func
    result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:953: in coro
    result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
    return fut.result()
distributed/tests/test_actor.py:97: in test
    await asyncio.gather(counter.increment(), counter.increment())
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce2a068c0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
__________________________ test_client_actions[False] __________________________

direct_to_workers = False

    @pytest.mark.parametrize("direct_to_workers", [True, False])
    def test_client_actions(direct_to_workers):
        @gen_cluster(client=True)
        async def test(c, s, a, b):
            c = await Client(
                s.address, asynchronous=True, direct_to_workers=direct_to_workers
            )
    
            counter = c.submit(Counter, workers=[a.address], actor=True)
            assert isinstance(counter, Future)
            counter = await counter
            assert counter._address
            assert hasattr(counter, "increment")
            assert hasattr(counter, "add")
            assert hasattr(counter, "n")
    
            n = await counter.n
            assert n == 0
    
            assert counter._address == a.address
    
            assert isinstance(a.actors[counter.key], Counter)
            assert s.tasks[counter.key].actor
    
            await asyncio.gather(counter.increment(), counter.increment())
    
            n = await counter.n
            assert n == 2
    
            counter.add(10)
            while (await counter.n) != 10 + 2:
                n = await counter.n
                await asyncio.sleep(0.01)
    
            await c.close()
    
>       test()

distributed/tests/test_actor.py:109: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/utils_test.py:994: in test_func
    result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:953: in coro
    result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
    return fut.result()
distributed/tests/test_actor.py:97: in test
    await asyncio.gather(counter.increment(), counter.increment())
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce29e7ca0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
__________________________ test_worker_actions[False] __________________________

separate_thread = False

    @pytest.mark.parametrize("separate_thread", [False, True])
    def test_worker_actions(separate_thread):
        @gen_cluster(client=True)
        async def test(c, s, a, b):
            counter = c.submit(Counter, workers=[a.address], actor=True)
            a_address = a.address
    
            def f(counter):
                start = counter.n
    
                assert type(counter) is Actor
                assert counter._address == a_address
    
                future = counter.increment(separate_thread=separate_thread)
                assert isinstance(future, ActorFuture)
                assert "Future" in type(future).__name__
                end = future.result(timeout=1)
                assert end > start
    
            futures = [c.submit(f, counter, pure=False) for _ in range(10)]
            await c.gather(futures)
    
            counter = await counter
            assert await counter.n == 10
    
>       test()

distributed/tests/test_actor.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/utils_test.py:994: in test_func
    result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:953: in coro
    result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
    return fut.result()
distributed/tests/test_actor.py:132: in test
    await c.gather(futures)
distributed/client.py:1831: in _gather
    raise exception.with_traceback(traceback)
distributed/tests/test_actor.py:125: in f
    future = counter.increment(separate_thread=separate_thread)
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    """Event loop mixins."""
    
    import threading
    from . import events
    
    _global_lock = threading.Lock()
    
    # Used as a sentinel for loop parameter
    _marker = object()
    
    
    class _LoopBoundMixin:
        _loop = None
    
        def __init__(self, *, loop=_marker):
            if loop is not _marker:
>               raise TypeError(
                    f'As of 3.10, the *loop* parameter was removed from '
                    f'{type(self).__name__}() since it is no longer necessary'
                )
E               TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-7720d2e1-493f-4a52-b833-667ee96b2cd8>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

__________________________ test_worker_actions[True] ___________________________

separate_thread = True

    @pytest.mark.parametrize("separate_thread", [False, True])
    def test_worker_actions(separate_thread):
        @gen_cluster(client=True)
        async def test(c, s, a, b):
            counter = c.submit(Counter, workers=[a.address], actor=True)
            a_address = a.address
    
            def f(counter):
                start = counter.n
    
                assert type(counter) is Actor
                assert counter._address == a_address
    
                future = counter.increment(separate_thread=separate_thread)
                assert isinstance(future, ActorFuture)
                assert "Future" in type(future).__name__
                end = future.result(timeout=1)
                assert end > start
    
            futures = [c.submit(f, counter, pure=False) for _ in range(10)]
            await c.gather(futures)
    
            counter = await counter
            assert await counter.n == 10
    
>       test()

distributed/tests/test_actor.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/utils_test.py:994: in test_func
    result = loop.run_sync(
/usr/lib64/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:953: in coro
    result = await future
/usr/lib64/python3.10/asyncio/tasks.py:447: in wait_for
    return fut.result()
distributed/tests/test_actor.py:132: in test
    await c.gather(futures)
distributed/client.py:1831: in _gather
    raise exception.with_traceback(traceback)
distributed/tests/test_actor.py:125: in f
    future = counter.increment(separate_thread=separate_thread)
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    """Event loop mixins."""
    
    import threading
    from . import events
    
    _global_lock = threading.Lock()
    
    # Used as a sentinel for loop parameter
    _marker = object()
    
    
    class _LoopBoundMixin:
        _loop = None
    
        def __init__(self, *, loop=_marker):
            if loop is not _marker:
>               raise TypeError(
                    f'As of 3.10, the *loop* parameter was removed from '
                    f'{type(self).__name__}() since it is no longer necessary'
                )
E               TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (<Actor: Counter, key=Counter-caf6a97c-07e0-4712-9868-bc65337937c2>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

____________________________ test_exceptions_method ____________________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:39615" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:33307', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:43653', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_exceptions_method(c, s, a, b):
        class Foo:
            def throw(self):
                1 / 0
    
        foo = await c.submit(Foo, actor=True)
        with pytest.raises(ZeroDivisionError):
>           await foo.throw()

distributed/tests/test_actor.py:202: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0ccefe0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
__________________________________ test_sync ___________________________________

client = <Client: 'tcp://127.0.0.1:37671' processes=2 threads=2, memory=29.19 GiB>

    def test_sync(client):
        counter = client.submit(Counter, actor=True)
        counter = counter.result()
    
        assert counter.n == 0
    
>       future = counter.increment()

distributed/tests/test_actor.py:270: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce27e7f10>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:37671
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:35683
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39187
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:35683
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39187
distributed.worker - INFO -          dashboard at:            127.0.0.1:41155
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:37671
distributed.worker - INFO -          dashboard at:            127.0.0.1:45103
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:37671
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-78907aa0-75bd-40e2-b1b3-243974e7c2b1/dask-worker-space/worker-mvxlpz9l
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-6f4a430f-7fcf-4c68-af86-a61a18175dee/dask-worker-space/worker-t9pkf1a3
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39187', name: tcp://127.0.0.1:39187, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39187
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:37671
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:35683', name: tcp://127.0.0.1:35683, memory: 0, processing: 0>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:35683
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:37671
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
--------------------------- Captured stderr teardown ---------------------------
distributed.scheduler - INFO - Remove client Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-0438483e-1f0d-11ec-96e6-94de8078e5f9
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39187
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:35683
_____________________________ test_numpy_roundtrip _____________________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:46169" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:34601', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:35397', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_numpy_roundtrip(c, s, a, b):
        np = pytest.importorskip("numpy")
    
        server = await c.submit(ParameterServer, actor=True)
    
        x = np.random.random(1000)
>       await server.put("x", x)

distributed/tests/test_actor.py:312: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0a40be0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
_________________________ test_numpy_roundtrip_getattr _________________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38951" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:44113', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:38615', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_numpy_roundtrip_getattr(c, s, a, b):
        np = pytest.importorskip("numpy")
    
        counter = await c.submit(Counter, actor=True)
    
        x = np.random.random(1000)
    
>       await counter.add(x)

distributed/tests/test_actor.py:327: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0a43100>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
____________________________ test_many_computations ____________________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:35331" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:46325', 0, Status.closed, stored: 1, running: 0/1, ready: 0, comm: 0, waiting: -3>
b = <Worker: 'tcp://127.0.0.1:41811', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_many_computations(c, s, a, b):
        counter = await c.submit(Counter, actor=True)
    
        def add(n, counter):
            for i in range(n):
                counter.increment().result()
    
        futures = c.map(add, range(10), counter=counter)
        done = c.submit(lambda x: None, futures)
    
        while not done.done():
            assert len(s.processing) <= a.nthreads + b.nthreads
            await asyncio.sleep(0.01)
    
>       await done

distributed/tests/test_actor.py:370: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:240: in _result
    raise exc.with_traceback(tb)
/usr/lib/python3.10/site-packages/dask/utils.py:35: in apply
    return func(*args, **kwargs)
distributed/tests/test_actor.py:361: in add
    counter.increment().result()
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    """Event loop mixins."""
    
    import threading
    from . import events
    
    _global_lock = threading.Lock()
    
    # Used as a sentinel for loop parameter
    _marker = object()
    
    
    class _LoopBoundMixin:
        _loop = None
    
        def __init__(self, *, loop=_marker):
            if loop is not _marker:
>               raise TypeError(
                    f'As of 3.10, the *loop* parameter was removed from '
                    f'{type(self).__name__}() since it is no longer necessary'
                )
E               TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af16c0>, (<class 'tuple'>, [1]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1750>, (<class 'tuple'>, [2]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1870>, (<class 'tuple'>, [4]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af17e0>, (<class 'tuple'>, [3]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1990>, (<class 'tuple'>, [9]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1900>, (<class 'tuple'>, [5]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f1d643b75b0>, <function test_many_computations.<locals>.add at 0x7f1ce0af1a20>, (<class 'tuple'>, [6]), {'counter': <Actor: Counter, key=Counter-c0d818c3-6c8c-4090-90d3-5df7d7aea5d2>}))
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

______________________________ test_thread_safety ______________________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38121" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:35701', 0, Status.closed, stored: 0, running: 0/5, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:42911', 1, Status.closed, stored: 0, running: 0/5, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True, nthreads=[("127.0.0.1", 5)] * 2)
    async def test_thread_safety(c, s, a, b):
        class Unsafe:
            def __init__(self):
                self.n = 0
    
            def f(self):
                assert self.n == 0
                self.n += 1
    
                for i in range(20):
                    sleep(0.002)
                    assert self.n == 1
                self.n = 0
    
        unsafe = await c.submit(Unsafe, actor=True)
    
>       futures = [unsafe.f() for i in range(10)]

distributed/tests/test_actor.py:390: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/tests/test_actor.py:390: in <listcomp>
    futures = [unsafe.f() for i in range(10)]
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0ab33a0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
______________________________ test_compute_sync _______________________________

client = <Client: 'tcp://127.0.0.1:36651' processes=2 threads=2, memory=29.19 GiB>

    def test_compute_sync(client):
        @dask.delayed
        def f(n, counter):
            assert isinstance(counter, Actor), type(counter)
            for i in range(n):
                counter.increment().result()
    
        @dask.delayed
        def check(counter, blanks):
            return counter.n
    
        counter = dask.delayed(Counter)()
        values = [f(i, counter) for i in range(5)]
        final = check(counter, values)
    
>       result = final.compute(actors=counter)

distributed/tests/test_actor.py:517: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib/python3.10/site-packages/dask/base.py:288: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/usr/lib/python3.10/site-packages/dask/base.py:570: in compute
    results = schedule(dsk, keys, **kwargs)
distributed/client.py:2689: in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
distributed/client.py:1966: in gather
    return self.sync(
distributed/client.py:860: in sync
    return sync(
distributed/utils.py:326: in sync
    raise exc.with_traceback(tb)
distributed/utils.py:309: in f
    result[0] = yield future
/usr/lib64/python3.10/site-packages/tornado/gen.py:762: in run
    value = future.result()
distributed/client.py:1831: in _gather
    raise exception.with_traceback(traceback)
distributed/tests/test_actor.py:507: in f
    counter.increment().result()
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    """Event loop mixins."""
    
    import threading
    from . import events
    
    _global_lock = threading.Lock()
    
    # Used as a sentinel for loop parameter
    _marker = object()
    
    
    class _LoopBoundMixin:
        _loop = None
    
        def __init__(self, *, loop=_marker):
            if loop is not _marker:
>               raise TypeError(
                    f'As of 3.10, the *loop* parameter was removed from '
                    f'{type(self).__name__}() since it is no longer necessary'
                )
E               TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:36651
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33679
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33679
distributed.worker - INFO -          dashboard at:            127.0.0.1:40821
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:45345
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:36651
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:45345
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -          dashboard at:            127.0.0.1:34307
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:36651
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-1fffd77f-f100-4a17-9e5a-d77117486e22/dask-worker-space/worker-l9zckx4t
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-8c6e926f-438f-4afc-ad01-374c2004961e/dask-worker-space/worker-t278frlt
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:45345', name: tcp://127.0.0.1:45345, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45345
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:36651
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33679', name: tcp://127.0.0.1:33679, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33679
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:36651
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
----------------------------- Captured stderr call -----------------------------
distributed.worker - WARNING - Compute Failed
Function:  f
args:      (2, <Actor: Counter, key=Counter-c2657ca9-a782-49ff-bdf2-068eb69118fa>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (3, <Actor: Counter, key=Counter-c2657ca9-a782-49ff-bdf2-068eb69118fa>)
kwargs:    {}
Exception: TypeError('As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary')

--------------------------- Captured stderr teardown ---------------------------
distributed.scheduler - INFO - Receive client connection: Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-06bc3801-1f0d-11ec-96e6-94de8078e5f9
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:45345
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33679
distributed.scheduler - INFO - Remove client Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-worker-06c2427c-1f0d-11ec-ac0c-94de8078e5f9
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:45345', name: tcp://127.0.0.1:45345, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:45345
____________________________ test_actors_in_profile ____________________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:44303" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:41441', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(
        client=True,
        nthreads=[("127.0.0.1", 1)],
        config={"distributed.worker.profile.interval": "1ms"},
    )
    async def test_actors_in_profile(c, s, a):
        class Sleeper:
            def sleep(self, time):
                sleep(time)
    
        sleeper = await c.submit(Sleeper, actor=True)
    
        for i in range(5):
>           await sleeper.sleep(0.200)

distributed/tests/test_actor.py:542: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce29fa800>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
_________________________________ test_waiter __________________________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38473" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:36645', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:41535', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_waiter(c, s, a, b):
        from tornado.locks import Event
    
        class Waiter:
            def __init__(self):
                self.event = Event()
    
            async def set(self):
                self.event.set()
    
            async def wait(self):
                await self.event.wait()
    
        waiter = await c.submit(Waiter, actor=True)
    
>       futures = [waiter.wait() for _ in range(5)]  # way more than we have actor threads

distributed/tests/test_actor.py:567: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/tests/test_actor.py:567: in <listcomp>
    futures = [waiter.wait() for _ in range(5)]  # way more than we have actor threads
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce08c2e00>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
___________________________ test_one_thread_deadlock ___________________________

    def test_one_thread_deadlock():
        with cluster(nworkers=2) as (cl, w):
            client = Client(cl["address"])
            ac = client.submit(Counter, actor=True).result()
            ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result()
    
>           assert ac2.do_inc(ac).result() == 1

distributed/tests/test_actor.py:638: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0806800>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:33349
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33771
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33771
distributed.worker - INFO -          dashboard at:            127.0.0.1:44169
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-838106ce-ee65-41e3-840a-128a2f1effa2/dask-worker-space/worker-ptpi26ti
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:38899
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:38899
distributed.worker - INFO -          dashboard at:            127.0.0.1:44161
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-b2caa31c-60c9-4f36-8188-d3ab297a0e43/dask-worker-space/worker-xbwfvymy
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33771', name: tcp://127.0.0.1:33771, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33771
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:38899', name: tcp://127.0.0.1:38899, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:38899
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:33349
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0943bc49-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:38899
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33771
_____________________________ test_async_deadlock ______________________________

client = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:38717" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:40435', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:46501', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_async_deadlock(client, s, a, b):
        ac = await client.submit(Counter, actor=True)
        ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address])
    
>       assert (await ac2.ado_inc(ac)) == 1

distributed/tests/test_actor.py:646: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0c326b0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
________________________________ test_exception ________________________________

    def test_exception():
        class MyException(Exception):
            pass
    
        class Broken:
            def method(self):
                raise MyException
    
            @property
            def prop(self):
                raise MyException
    
        with cluster(nworkers=2) as (cl, w):
            client = Client(cl["address"])
            ac = client.submit(Broken, actor=True).result()
>           acfut = ac.method()

distributed/tests/test_actor.py:664: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce294fe20>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:46347
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:35825
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:35825
distributed.worker - INFO -          dashboard at:            127.0.0.1:39041
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-0469507d-635a-4bda-84b1-cc78dc041b8d/dask-worker-space/worker-1ssb7imh
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:42665
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:42665
distributed.worker - INFO -          dashboard at:            127.0.0.1:34853
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-3e305d6e-ccf9-43b1-8d37-f83b0aed071b/dask-worker-space/worker-9572kryt
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:35825', name: tcp://127.0.0.1:35825, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:35825
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:42665', name: tcp://127.0.0.1:42665, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:42665
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:46347
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0a0243bd-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:42665
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:35825
_____________________________ test_exception_async _____________________________

client = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:45745" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:42413', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:41479', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_exception_async(client, s, a, b):
        class MyException(Exception):
            pass
    
        class Broken:
            def method(self):
                raise MyException
    
            @property
            def prop(self):
                raise MyException
    
        ac = await client.submit(Broken, actor=True)
>       acfut = ac.method()

distributed/tests/test_actor.py:686: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce0804fa0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
______________________________ test_as_completed _______________________________

client = <Client: 'tcp://127.0.0.1:43751' processes=2 threads=2, memory=29.19 GiB>

    def test_as_completed(client):
        ac = client.submit(Counter, actor=True).result()
>       futures = [ac.increment() for _ in range(10)]

distributed/tests/test_actor.py:696: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/tests/test_actor.py:696: in <listcomp>
    futures = [ac.increment() for _ in range(10)]
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce291dea0>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:43751
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39947
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:45859
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39947
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:45859
distributed.worker - INFO -          dashboard at:            127.0.0.1:46029
distributed.worker - INFO -          dashboard at:            127.0.0.1:46737
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:43751
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:43751
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-3d0a84ef-157a-4e6b-bb77-c6d910536ac5/dask-worker-space/worker-p7yismsg
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-ed20cd89-e988-4711-a24d-5a989a7cb262/dask-worker-space/worker-_36j_yjl
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39947', name: tcp://127.0.0.1:39947, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39947
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:43751
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:45859', name: tcp://127.0.0.1:45859, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45859
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:43751
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.core - INFO - Starting established connection
--------------------------- Captured stderr teardown ---------------------------
distributed.scheduler - INFO - Remove client Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-0ac04121-1f0d-11ec-96e6-94de8078e5f9
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:45859
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39947
_________________________ test_actor_future_awaitable __________________________

client = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:37203" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:34641', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:38447', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True, timeout=3)
    async def test_actor_future_awaitable(client, s, a, b):
        ac = await client.submit(Counter, actor=True)
>       futures = [ac.increment() for _ in range(10)]

distributed/tests/test_actor.py:711: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/tests/test_actor.py:711: in <listcomp>
    futures = [ac.increment() for _ in range(10)]
distributed/actor.py:171: in func
    q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
/usr/lib64/python3.10/asyncio/queues.py:33: in __init__
    super().__init__(loop=loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <[AttributeError("'Queue' object has no attribute '_maxsize'") raised in repr()] Queue object at 0x7f1ce08c8a90>

    def __init__(self, *, loop=_marker):
        if loop is not _marker:
>           raise TypeError(
                f'As of 3.10, the *loop* parameter was removed from '
                f'{type(self).__name__}() since it is no longer necessary'
            )
E           TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary

/usr/lib64/python3.10/asyncio/mixins.py:17: TypeError

I believe this may be the cause of these failures on Python 3.10 as well, but I'm not sure:

Additional failures
______________________ test_client_gather_semaphore_loop _______________________

s = <Scheduler: "tcp://127.0.0.1:46375" workers: 0 cores: 0, tasks: 0>

    @gen_cluster(nthreads=[])
    async def test_client_gather_semaphore_loop(s):
        async with Client(s.address, asynchronous=True) as c:
>           assert c._gather_semaphore._loop is c.loop.asyncio_loop
E           AssertionError: assert None is <_UnixSelectorEventLoop running=True closed=False debug=False>
E            +  where None = <asyncio.locks.Semaphore object at 0x7f4627863fa0 [unlocked, value:5]>._loop
E            +    where <asyncio.locks.Semaphore object at 0x7f4627863fa0 [unlocked, value:5]> = <Client: 'tcp://127.0.0.1:46375' processes=0 threads=0, memory=0 B>._gather_semaphore
E            +  and   <_UnixSelectorEventLoop running=True closed=False debug=False> = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462614cbe0>.asyncio_loop
E            +    where <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462614cbe0> = <Client: 'tcp://127.0.0.1:46375' processes=0 threads=0, memory=0 B>.loop

distributed/tests/test_client.py:6306: AssertionError
_______________________ test_as_completed_condition_loop _______________________

c = <Client: No scheduler connected>
s = <Scheduler: "tcp://127.0.0.1:45711" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:38873', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:37335', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_as_completed_condition_loop(c, s, a, b):
        seq = c.map(inc, range(5))
        ac = as_completed(seq)
>       assert ac.condition._loop == c.loop.asyncio_loop
E       assert None == <_UnixSelectorEventLoop running=True closed=False debug=False>
E         +None
E         -<_UnixSelectorEventLoop running=True closed=False debug=False>

distributed/tests/test_client.py:6313: AssertionError
__________________ test_client_connectionpool_semaphore_loop ___________________

s = {'address': 'tcp://127.0.0.1:43473'}
a = {'address': 'tcp://127.0.0.1:33689', 'proc': <weakref at 0x7f4627d41530; to 'SpawnProcess' at 0x7f462644eb30>}
b = {'address': 'tcp://127.0.0.1:39419', 'proc': <weakref at 0x7f4627d422f0; to 'SpawnProcess' at 0x7f462644d660>}

    def test_client_connectionpool_semaphore_loop(s, a, b):
        with Client(s["address"]) as c:
>           assert c.rpc.semaphore._loop is c.loop.asyncio_loop
E           AssertionError: assert None is <_UnixSelectorEventLoop running=True closed=False debug=False>
E            +  where None = <asyncio.locks.Semaphore object at 0x7f462644d750 [unlocked, value:511]>._loop
E            +    where <asyncio.locks.Semaphore object at 0x7f462644d750 [unlocked, value:511]> = <ConnectionPool: open=1, active=0, connecting=0>.semaphore
E            +      where <ConnectionPool: open=1, active=0, connecting=0> = <Client: 'tcp://127.0.0.1:43473' processes=2 threads=2, memory=29.19 GiB>.rpc
E            +  and   <_UnixSelectorEventLoop running=True closed=False debug=False> = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462644e530>.asyncio_loop
E            +    where <tornado.platform.asyncio.AsyncIOLoop object at 0x7f462644e530> = <Client: 'tcp://127.0.0.1:43473' processes=2 threads=2, memory=29.19 GiB>.loop

distributed/tests/test_client.py:6318: AssertionError
---------------------------- Captured stderr setup -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:43473
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33689
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33689
distributed.worker - INFO -          dashboard at:            127.0.0.1:41495
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-dffa0c91-c988-4176-a341-b68d74cf767c/dask-worker-space/worker-ufu1wyyt
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39419
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39419
distributed.worker - INFO -          dashboard at:            127.0.0.1:34383
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  14.60 GiB
distributed.worker - INFO -       Local Directory: /builddir/build/BUILD/distributed-2021.09.1/_test_worker-b875dfbf-ec80-4e6c-9187-c2e45ce5ba0f/dask-worker-space/worker-vzoxfiuz
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33689', name: tcp://127.0.0.1:33689, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33689
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39419', name: tcp://127.0.0.1:39419, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39419
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:43473
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
----------------------------- Captured stderr call -----------------------------
distributed.scheduler - INFO - Receive client connection: Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
distributed.scheduler - INFO - Remove client Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
distributed.scheduler - INFO - Close client connection: Client-8d7b49bd-1f09-11ec-a046-94de8078e5f9
--------------------------- Captured stderr teardown ---------------------------
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39419
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33689

What you expected to happen:
Tests pass.

Minimal Complete Verifiable Example:

$ DISABLE_IPV6=1 PYTHONDONTWRITEBYTECODE=1 \
    pytest -m 'not avoid_ci and not slow and not asyncio' -vra

Anything else we need to know?:

Environment:

  • Dask version: 2021.9.1
  • Python version: 3.10
  • Operating System: Fedora Rawhide
  • Install method (conda, pip, source): Source

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions