
Dataframe Shuffle #1177

@mrocklin

Currently we use partd to perform on-disk shuffles for dask.dataframe and dask.bag. This works well in the single-machine case but doesn't carry over to the distributed case.

We don't have a dedicated library for distributed shuffle, but we can still accomplish it by abusing the task scheduling framework. If done poorly, this results in n_in_partitions * n_out_partitions tasks, which can be quite large even with fast task scheduling. Shuffling 1000 partitions into 1000 partitions results in 1000000 tasks which, at 250us per task, is around 4 minutes of pure scheduling overhead.
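
For concreteness, the arithmetic behind that estimate, using only the figures above:

```python
# Back-of-envelope cost of the naive one-stage shuffle: every input
# partition sends one shard to every output partition.
n_in = n_out = 1000
tasks = n_in * n_out           # 1,000,000 tasks
overhead = tasks * 250e-6      # 250 seconds of pure scheduling
print(tasks, overhead / 60)    # ~4.2 minutes
```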

Instead, we can replicate the approach of #417 and perform a short sequence of intermediate shuffles. This trades extra full-data memory copies for reduced scheduler overhead.

For example, with 1000 input and 1000 output partitions we might first arrange each partition into 33 value ranges crossed with 33 random seeds (33 * 33 = 1089 >= 1000), creating a two-part index. Then we shuffle again, removing the random-seed part of the index but expanding out to the full 1000 output partitions. This takes two stages of 1000 * 33 splits, or 66000 tasks, rather than 1000000. To build intuition, this is equivalent to the dask.array case where we go from row-chunks to column-chunks by passing through square-block-chunks of the same size.
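
A minimal pandas-level sketch of the staged regrouping, with hypothetical names. For simplicity it routes rows by two base-33 hash digits rather than the value-range/random-seed pair described above, but the task-count shape is the same:

```python
import pandas as pd

def stage_groups(df, col, n_groups, stage):
    # Route each row to one of n_groups buckets using one base-n_groups
    # "digit" of its hash per stage.  Two stages of 33 buckets reach
    # 33 * 33 = 1089 >= 1000 final partitions, yet each stage only
    # splits every partition 33 ways.
    h = pd.util.hash_pandas_object(df[col], index=False).to_numpy()
    digit = (h // n_groups**stage) % n_groups
    return {g: df[digit == g] for g in range(n_groups)}

# Stage one: split each of the 1000 inputs 33 ways and concatenate
# pieces that share a bucket.  Stage two: split each intermediate 33
# ways again, landing rows in their final output partition.  Total:
# 2 * 1000 * 33 = 66000 tasks instead of 1000000.
```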

We now have the problem of choosing among a few different algorithms depending on our scheduler. We already sort of have this problem for the sorted column data introduced in #1143. I propose adding a method= keyword argument to set_index that takes values from ['partd', 'sorted', 'tasks', None]. If None then we probably handle things with heuristics. Notably, we can determine whether we should use the sorted method by computing the min/max of the new index for each input partition as we compute the percentiles (we're already doing this). Assuming a non-sorted new index, we can select between partd and tasks based on the default scheduler (dask.context._globals['get']), perhaps.
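
A sketch of what that None heuristic might look like. The function and parameter names are hypothetical; the only identifiers taken from above are the candidate method values and dask.context._globals['get']:

```python
import dask.context

def choose_shuffle_method(method=None, new_index_is_sorted=False):
    # Hypothetical sketch of the method= heuristic described above.
    if method is not None:
        return method                   # explicit user choice wins
    if new_index_is_sorted:
        # Per-partition min/max gathered during the percentile pass
        # told us the new index is already sorted.
        return "sorted"
    get = dask.context._globals.get("get")
    module = getattr(get, "__module__", "")
    if module and "distributed" in module:
        return "tasks"                  # distributed scheduler in use
    return "partd"                      # single-machine default
```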

This issue is somewhat tricky and performance-sensitive, but resolving it would give us a reasonable solution for distributed shuffles, a high-value feature.
