Description
What happened: I've been trying to debug a 2-node system and figure out why only 1 node gets any work. The first node gets a few long-running tasks (which never exit), all of which secede. The second node ends up computing all subsequent work; the first never steals any of it, nor does anything else of significance. I think this is an occupancy issue: one machine has an occupancy of 3125586.02 while the other has only 13805.84.
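For reference, this is roughly how I read those occupancy numbers. It pokes at the scheduler's internal WorkerState.occupancy attribute via Client.run_on_scheduler, so treat it as a sketch that may need adjusting between versions:

import dask.distributed

def occupancy_per_worker(dask_scheduler=None):
    # dask_scheduler is injected by run_on_scheduler; .workers maps
    # worker address -> WorkerState, which carries the occupancy estimate
    return {addr: ws.occupancy for addr, ws in dask_scheduler.workers.items()}

client = dask.distributed.Client('127.0.0.1:2005')
print(client.run_on_scheduler(occupancy_per_worker))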
What you expected to happen: Both nodes should be fully loaded with work all the time. The long-running tasks call secede() correctly (via worker_client()) and spawn a bunch of short tasks, but after some time all of the work ends up on the worker without the long-running tasks.
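To be precise, the seceding happens through worker_client() in the example below; my understanding is that it secedes on entry and rejoins on exit, so the explicit equivalent would look roughly like this sketch (not what the MCVE literally runs):

from dask.distributed import get_client, rejoin, secede

def long_task_explicit():
    client = get_client()
    secede()    # step out of the worker's thread pool so we don't hold a slot
    try:
        # ...submit and wait on short tasks here, as in the MCVE below...
        return client.submit(sum, [1, 2, 3]).result()
    finally:
        rejoin()    # step back into the pool before returning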
Minimal Complete Verifiable Example:
import collections
import dask.distributed
import subprocess
import time


def long_task():
    # Note that we secede!
    print(f'long_task on {dask.distributed.get_worker().address}')
    with dask.distributed.worker_client() as client:
        # Every second, emit some tasks
        while True:
            time.sleep(1.)
            futures = [
                client.submit(short_task, pure=False)
                for _ in range(100)]
            workers = collections.defaultdict(int)
            for f in futures:
                workers[f.result()] += 1
            print(workers)


def short_task():
    w = dask.distributed.get_worker()
    time.sleep(0.1)
    return w.address


def main():
    ps = subprocess.Popen(['dask-scheduler', '--host', '127.0.0.1',
                           '--port', '2005'])
    pa = subprocess.Popen(['dask-worker', '--nthreads', '2',
                           '--local-directory', '/tmp/a', '127.0.0.1:2005'])
    pb = subprocess.Popen(['dask-worker', '--nthreads', '2',
                           '--local-directory', '/tmp/b', '127.0.0.1:2005'])
    c = dask.distributed.Client('127.0.0.1:2005')
    time.sleep(1)
    f = c.submit(long_task, pure=False)
    time.sleep(60)
    pa.kill(); pa.wait()
    pb.kill(); pb.wait()
    ps.kill(); ps.wait()


if __name__ == '__main__':
    main()

Output:
long_task on tcp://127.0.0.1:41187
distributed.scheduler - INFO - Receive client connection: Client-worker-5bd327ac-1ae5-11ec-92f6-c0b8837e1558
distributed.core - INFO - Starting established connection
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 70, 'tcp://127.0.0.1:41187': 30})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 99, 'tcp://127.0.0.1:41187': 1})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 99, 'tcp://127.0.0.1:41187': 1})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
Anything else we need to know?: Latest dask/distributed
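If exact versions help, they can be pulled with Client.get_versions(), e.g. (sketch):

import dask.distributed

client = dask.distributed.Client('127.0.0.1:2005')
# Reports package versions on the scheduler, the workers, and this client,
# and raises if they mismatch.
print(client.get_versions(check=True))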