
No parallelism with long-running, seceded tasks; high occupancy prevents work assignment? #5332

@wwoods

Description

What happened: I've been trying to debug a two-node system to figure out why only one node gets any work. The first node receives a few long-running tasks (which never exit), all of which secede. The second node ends up computing all subsequent work; the first never steals any of it, nor does anything else of significance. I think this is an occupancy issue: one machine reports an occupancy of 3125586.02 while the other reports only 13805.84.
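
For reference, the occupancy numbers can be read straight off the scheduler. A minimal sketch, assuming the scheduler's WorkerState objects expose an occupancy attribute (an internal detail of distributed, so subject to change):

import dask.distributed

def occupancy_by_worker(dask_scheduler=None):
    # run_on_scheduler injects the Scheduler instance as dask_scheduler;
    # occupancy is its estimate of each worker's pending work, in seconds.
    return {addr: ws.occupancy
            for addr, ws in dask_scheduler.workers.items()}

client = dask.distributed.Client('127.0.0.1:2005')
print(client.run_on_scheduler(occupancy_by_worker))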

What you expected to happen: Both nodes should stay fully loaded with work the whole time. The long-running tasks secede correctly (worker_client() secedes on entry) and spawn a steady stream of short tasks, yet after some time all of that work ends up on the worker without the long-running tasks.
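
As context for the MCVE below: worker_client() secedes from the worker's thread pool on entry and rejoins on exit. The explicit equivalent, sketched with the secede/rejoin/get_client helpers that distributed exports, would look roughly like:

from dask.distributed import get_client, rejoin, secede

def long_task_explicit():
    client = get_client()  # same client that worker_client() would hand back
    secede()  # stop counting this thread against the worker's nthreads
    try:
        pass  # submit and wait on short tasks via client here
    finally:
        rejoin()  # re-enter the worker's thread pool before returning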

Minimal Complete Verifiable Example:

import collections
import dask.distributed
import subprocess
import time

def long_task():
    # Note that worker_client() secedes this thread for us!
    print(f'long_task on {dask.distributed.get_worker().address}')
    with dask.distributed.worker_client() as client:
        # Every second, emit some tasks
        while True:
            time.sleep(1.)
            futures = [
                    client.submit(short_task, pure=False)
                    for _ in range(100)]
            workers = collections.defaultdict(int)
            for f in futures:
                workers[f.result()] += 1
            print(workers)


def short_task():
    w = dask.distributed.get_worker()
    time.sleep(0.1)
    return w.address


def main():
    ps = subprocess.Popen(['dask-scheduler', '--host', '127.0.0.1',
            '--port', '2005'])
    pa = subprocess.Popen(['dask-worker', '--nthreads', '2',
            '--local-directory', '/tmp/a', '127.0.0.1:2005'])
    pb = subprocess.Popen(['dask-worker', '--nthreads', '2',
            '--local-directory', '/tmp/b', '127.0.0.1:2005'])

    c = dask.distributed.Client('127.0.0.1:2005')
    time.sleep(1)  # give the scheduler and workers a moment to start
    f = c.submit(long_task, pure=False)  # hold the future so the task is not released
    time.sleep(60)  # watch the per-worker counts that long_task prints

    pa.kill(); pa.wait()
    pb.kill(); pb.wait()
    ps.kill(); ps.wait()


if __name__ == '__main__':
    main()

Output:

long_task on tcp://127.0.0.1:41187
distributed.scheduler - INFO - Receive client connection: Client-worker-5bd327ac-1ae5-11ec-92f6-c0b8837e1558
distributed.core - INFO - Starting established connection
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 70, 'tcp://127.0.0.1:41187': 30})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 99, 'tcp://127.0.0.1:41187': 1})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 99, 'tcp://127.0.0.1:41187': 1})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})
defaultdict(<class 'int'>, {'tcp://127.0.0.1:44785': 100})

Anything else we need to know?: Reproduced on the latest dask/distributed.
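
If pinning down exact versions helps, a quick way to report them (get_versions is a real Client method; check=False returns the version dict instead of raising on mismatch):

import dask
import distributed

print(dask.__version__, distributed.__version__)
# Scheduler, worker, and client versions in one dict:
# client.get_versions(check=False)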
