Description
Split out from #4937 (comment).
What happened:
When using a local cluster in async mode, get_worker always returns the same Worker instance, no matter which worker it's being called within.
What you expected to happen:
get_worker to return the Worker instance it is actually being called within, so that each worker reports its own address.
Minimal Complete Verifiable Example:
from distributed import get_worker
from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_get_worker_async_local(c, s, *workers):
    assert len(workers) > 1

    def whoami():
        # Each worker should report its own address
        return get_worker().address

    results = await c.run(whoami)
    # Every worker should see a distinct Worker instance
    assert len(set(results.values())) == len(workers), results
E AssertionError: {'tcp://127.0.0.1:59774': 'tcp://127.0.0.1:59776', 'tcp://127.0.0.1:59776': 'tcp://127.0.0.1:59776'}
E assert 1 == 2
E +1
E -2

Anything else we need to know?:
This probably almost never affects users directly. But since most tests use an async local cluster via @gen_cluster, I'm concerned about what edge-case behavior we might be testing incorrectly.
Also, note that the same issue affects get_client. This feels a tiny bit less bad (at least it's always the right client, unlike get_worker), but it can still have some strange effects. In multiple places, worker code updates the default Client instance assuming it's running in a separate process. With multiple workers trampling the default client, I wonder whether this affects tests around advanced secede/client-within-task workloads.
I feel like the proper solution here would be to set a contextvar for the current worker that's updated as we context-switch in and out of that worker. Identifying the points where those switches have to happen seems tricky though.
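For illustration, a minimal sketch of what that could look like, assuming a module-level contextvar; the names _current_worker_cv and _worker_context are hypothetical, not the actual distributed internals:

import contextvars
from contextlib import contextmanager

# Hypothetical names, not part of distributed itself.
_current_worker_cv = contextvars.ContextVar("current_worker", default=None)

@contextmanager
def _worker_context(worker):
    """Bind `worker` as the current worker for the duration of the block."""
    token = _current_worker_cv.set(worker)
    try:
        yield worker
    finally:
        _current_worker_cv.reset(token)

def get_worker():
    """Return the worker bound to the calling context, rather than guessing
    from a process-global registry."""
    worker = _current_worker_cv.get()
    if worker is None:
        raise ValueError("No worker found in the current task context")
    return worker

The tricky part mentioned above then becomes wrapping every point where control is handed to a particular worker (e.g. around task execution) in _worker_context(self).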
I also think it would be reasonable for get_worker to error if len(Worker._instances) > 1.
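As a rough sketch of that guard (Worker._instances does exist, but the exact check and error type here are assumptions, not a proposed patch):

from distributed import Worker, get_worker as _unsafe_get_worker

def strict_get_worker():
    # Refuse to guess when more than one Worker lives in this process
    # (as with an async local cluster), instead of silently returning
    # an arbitrary one.
    if len(Worker._instances) > 1:
        raise ValueError(
            f"get_worker() is ambiguous: {len(Worker._instances)} Worker "
            "instances exist in this process"
        )
    return _unsafe_get_worker()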
Environment:
- Dask version: ac35e0f
- Python version: 3.9.5
- Operating System: macOS
- Install method (conda, pip, source): source