Rust implementation of dask-scheduler #3139

@spirali

Hello,

We (@Kobzol and I) are working on a Rust implementation of dask-scheduler, as an experimental drop-in replacement that requires no modification on the worker or client side. The experiment has two goals: (a) to evaluate the performance gain of a non-Python scheduler, and (b) to allow experimentation with different schedulers. Here I would like to report preliminary results for (a).

I am sorry for abusing GitHub issues; if there is a better place to contact the community, please redirect us.

Repository: https://github.com/spirali/rsds/tree/master/src
Project status:

  • Server is able to accept client and worker connections and redistribute simple task graphs.
  • rsds distinguishes between the "runtime" (the part that communicates with workers/clients and maintains service information) and the "scheduler" (the part that decides where tasks will run). The scheduler is asynchronous and offloaded to a separate thread. It communicates with the runtime through a simple protocol. The protocol is serializable, so in the future the scheduler may be written in a language other than Rust.
  • The current version has only a random scheduler, which assigns tasks to randomly chosen workers.
  • Propagating exceptions from workers to a client is implemented
  • Failure of client/worker is not yet correctly implemented
  • Many parts of the protocol are not implemented yet
  • We have not actively profiled or optimized the code; we are reporting results from the first running version
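
To illustrate the runtime/scheduler split described in the list above, here is a minimal Python sketch. It is not the rsds code and does not reproduce the rsds protocol: the message names (`add-worker`, `new-tasks`, `assign`, `stop`), the use of JSON, and the function names are all assumptions made for this example. It only shows the general idea of a scheduler running in its own thread and exchanging serializable messages with a runtime.

```python
import json
import queue
import random
import threading


def random_scheduler(inbox, outbox, seed=None):
    """Hypothetical scheduler loop: reads JSON messages, emits random assignments."""
    rng = random.Random(seed)
    workers = []   # worker ids announced by the runtime
    pending = []   # tasks waiting until at least one worker is known
    while True:
        msg = json.loads(inbox.get())
        if msg["op"] == "stop":
            break
        if msg["op"] == "add-worker":
            workers.append(msg["worker"])
        elif msg["op"] == "new-tasks":
            pending.extend(msg["tasks"])
        if workers and pending:
            # Random policy: each pending task gets a uniformly chosen worker.
            assignments = {task: rng.choice(workers) for task in pending}
            pending.clear()
            outbox.put(json.dumps({"op": "assign", "assignments": assignments}))


def run_demo():
    """Plays the role of the runtime: feeds the scheduler and reads its reply."""
    to_scheduler = queue.Queue()
    from_scheduler = queue.Queue()
    thread = threading.Thread(
        target=random_scheduler, args=(to_scheduler, from_scheduler, 0)
    )
    thread.start()
    for worker in ("worker-0", "worker-1"):
        to_scheduler.put(json.dumps({"op": "add-worker", "worker": worker}))
    to_scheduler.put(json.dumps({"op": "new-tasks", "tasks": ["t0", "t1", "t2"]}))
    reply = json.loads(from_scheduler.get())
    to_scheduler.put(json.dumps({"op": "stop"}))
    thread.join()
    return reply


if __name__ == "__main__":
    print(run_demo())
```

Because every message crosses the queue as a serialized string, the scheduler side could in principle be replaced by a process written in another language without changing the runtime side.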

Benchmark

We ran the following simple code to benchmark the overhead of the server runtime.

    from dask.distributed import Client
    from dask import delayed

    # Connect to the scheduler under test (rsds or dask-scheduler).
    client = Client("tcp://localhost:7070")
    print("CLIENT", client)

    @delayed
    def do_something(x):
        return x * 10

    @delayed
    def merge(*args):
        return sum(args)

    # 80000 independent tasks followed by a single task that merges them.
    xs = [do_something(x) for x in range(80000)]
    result = merge(*xs)
    print(result.compute())

Results

Times (in seconds) were obtained with "time -p python test.py".

1 node / 23 workers
rsds          : 19.09 +/- 0.17
dask-scheduler: 39.19 +/- 1.01


8 nodes / 191 workers (7x24 + 23)
rsds          : 20.74 +/- 2.46
dask-scheduler: 215.19 +/- 20.07
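
As a rough way to read these numbers, the sketch below computes the ratio of the mean times for each configuration. It uses only the means reported above and ignores the +/- spread, so the ratios are indicative only; the dictionary layout and the `speedup` helper are made up for this example.

```python
# Mean wall-clock times in seconds, copied from the results above.
results = {
    "1 node / 23 workers": {"rsds": 19.09, "dask-scheduler": 39.19},
    "8 nodes / 191 workers": {"rsds": 20.74, "dask-scheduler": 215.19},
}


def speedup(entry):
    """Ratio of the dask-scheduler mean time to the rsds mean time."""
    return entry["dask-scheduler"] / entry["rsds"]


for name, entry in results.items():
    print(f"{name}: {speedup(entry):.2f}x")
```

This gives roughly 2.05x on one node and 10.38x on eight nodes.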

We are aware that the benchmark is far from ideal in many respects; we would be happy if someone could point us to code that does a better job.
