
@Kobzol (Contributor) commented May 18, 2020

This PR attempts to simplify the low-level encoding of Dask protocol messages into frames, to make it easier to implement the protocol in a statically typed language (like Rust). There are of course multiple ways of solving this; in this PR I'll describe the issue and offer a possible solution that we have used for our Rust scheduler.

The issue

The current implementation of the dumps function in protocol/core.py takes a Python message (usually a dictionary), extracts serialized/serializable values from the message, puts them into a series of frames, and creates headers that describe how to reconstruct the original message from the frame stream. For example, if the input message has a "function" attribute containing serialized data, Dask removes the attribute, creates a frame with its serialized data, and describes how to put the "function" attribute back into the message in a header that is stored in a separate frame. This is shown in the top right corner of the image below:
[image: diagram comparing the current encoding (top right) and the proposed placeholder encoding (bottom right)]
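To make the current behavior concrete, here is a rough sketch (not the actual dask.distributed code; the function name and the "bytes means serialized" test are simplifications) of what the encoding step does: serialized values are pulled out of the message, emitted as extra frames, and a separate header records where each value came from.

```python
def dumps_sketch(msg):
    """Simplified sketch of the current encoding: strip serialized
    values out of `msg` into their own frames, and record in a header
    where each value must be re-inserted during deserialization."""
    frames = []
    header = {"keys": []}
    for key, value in list(msg.items()):
        if isinstance(value, bytes):       # pretend bytes == "serialized"
            header["keys"].append(key)     # remember where it belongs
            frames.append(value)           # offload into its own frame
            del msg[key]                   # strip it from the message
    # the real implementation would also emit frames for `msg` and `header`
    return header, msg, frames

header, stripped, frames = dumps_sketch(
    {"op": "update-graph", "function": b"<pickled fn>"}
)
assert "function" not in stripped
assert frames == [b"<pickled fn>"]
```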

The problematic part is message reconstruction during deserialization. A natural representation of Dask message types in a statically typed language is to use data structures which strictly describe the various message types, as opposed to loosely typed dictionaries with arbitrary types inside. An example from Rust (simplified):

struct DaskUpdateGraphTask {
   function: FunctionDefinition,
   arguments: ArgumentsDefinition,
   ...
}

If Dask clients/workers can remove arbitrary attributes from these messages, and we have to support arbitrary data structure modification queries like put <something> into the "key1" field of the 0th element of the "key2" field of the input message, we are pretty much left with no choice but to fall back to untyped dictionaries, which defeats one of the benefits of statically typed languages, especially for such a critical component of the scheduler. It's trivial to support these queries in a dynamic language, but in e.g. Rust it's incredibly painful.
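The kind of "modification query" in question can be illustrated with a small hypothetical helper (not Dask's actual code): the header records a key path into the message, and the receiver has to patch the deserialized value back into an arbitrarily nested structure. This is a one-liner in Python, but in a statically typed language it forces every message field to be a dynamically typed value.

```python
def set_in(msg, path, value):
    """Walk `path` through nested dicts/lists and set the final key.
    E.g. path ("tasks", 0, "function") patches msg["tasks"][0]["function"]."""
    target = msg
    for key in path[:-1]:
        target = target[key]
    target[path[-1]] = value
    return msg

msg = {"op": "update-graph", "tasks": [{"function": None}]}
set_in(msg, ("tasks", 0, "function"), b"<serialized function>")
assert msg["tasks"][0]["function"] == b"<serialized function>"
```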

Proposed solution

Instead of removing attributes from Dask protocol messages and then putting them back, we propose to keep the original message structure but replace the offloaded attribute values with placeholders (see the example image above, bottom right corner). A value that has to be offloaded is replaced on the wire with a placeholder dictionary, which describes which frames the value should be reconstructed from (frame index + frame count) and how (header). With this approach we can model the original message structure and use a generic enumeration type (like InPlace/Offloaded) for attributes that can be offloaded (potentially for all attributes). Most importantly, we don't have to implement arbitrary data structure modification queries. In our specific implementation, we have two versions of each message type that can contain offloadable attributes, and they can be converted into each other given a frame array. Here is pseudocode of how that could look:

struct TaskDefinitionOffloaded {
    function: Offloadable<FunctionDefinition>,
}
struct TaskDefinition {
    function: FunctionDefinition,
}

impl TaskDefinitionOffloaded {
    fn into_task_def(self, frames: &[Frame]) -> TaskDefinition {
        // reconstruct a TaskDefinition from TaskDefinitionOffloaded
        // by reading the offloaded data from `frames`
    }
}

Implementation

In our implementation, the placeholder is a dictionary with three "magic" keys: the number of frames of the offloaded value, the index of its first frame in the resulting frame stream, and the Dask header containing compression and other serialization metadata.

During serialization, we replace each offloaded value with this placeholder.
During deserialization, we check whether a value is offloaded (by looking for these magic keys) and, if so, read the corresponding frame range and reconstruct the original value.

The names of the magic keys are rather arbitrary and can probably be improved. In our implementation we send one frame fewer than the current encoding, because we no longer need the intermediate frame with headers describing how to reconstruct offloaded values; the headers are stored directly in the placeholders. This should not increase message size by itself, since all of the headers are also present in the current encoding, just in a different location. I'm not sure about frame size limitations, though; so far our experiments have not run into problems.
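The two steps above can be sketched in a few lines of Python. The magic key names here ("$frame-start", "$frame-count", "$header") are hypothetical placeholders, not the names used in the PR, and the `deserialize` callback stands in for Dask's actual frame decoding:

```python
# Hypothetical magic keys; the actual names in the PR may differ.
PLACEHOLDER_KEYS = {"$frame-start", "$frame-count", "$header"}

def offload(value_frames, header, next_frame_index, out_frames):
    """Append the value's frames to the stream and return the
    placeholder dictionary that takes the value's place in the message."""
    out_frames.extend(value_frames)
    return {
        "$frame-start": next_frame_index,
        "$frame-count": len(value_frames),
        "$header": header,  # opaque to the server; only the codec reads it
    }

def maybe_reconstruct(value, frames, deserialize):
    """If `value` is a placeholder, rebuild it from its frame range;
    otherwise return it unchanged (it was sent in place)."""
    if isinstance(value, dict) and PLACEHOLDER_KEYS <= value.keys():
        start = value["$frame-start"]
        count = value["$frame-count"]
        return deserialize(value["$header"], frames[start : start + count])
    return value
```

Keeping the frame count in its own key (rather than only in the header) is what lets a server route frames without ever parsing the header, as discussed in the code comments below.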

Even though our changes were motivated by our inability to implement the old protocol encoding elegantly and performantly in Rust, I'm certain that potential implementations in other statically typed languages (like C++) would face identical problems. I am not sure whether our proposed solution is the right one, but I think that to make implementations of the scheduler (or other Dask parts) easier in statically typed languages, Dask should use a less "hostile" encoding. It would also help with documenting and understanding the protocol itself.

Performance

This change is NOT motivated by performance. We have benchmarked Dask both with and without this change in a series of use cases (https://github.com/spirali/rsds/blob/master/scripts/usecases.py) and haven't found regressions, but it probably warrants further investigation. It would be great if some benchmarking infrastructure containing real pipelines was available in CI to test these changes easily (#3783).

Code comments

  • The magic key containing the number of frames could be left out, because the count is also present in the header. However, since the header is a fairly complex type, I wanted to avoid any need to read it on the server. With a separate key containing the count, the header can be treated as a black box on the server.
  • Frames are unconditionally converted to a list during deserialization, because some tests access the frame list even after deserialization. The previous code also always converted the frames to a list, and then reverted it afterwards.
  • Originally there was a useful fast path check in deserialization (loads):
if len(frames) < 3:
    return msg

However, some tests (like distributed/protocol/test_serialize.py::test_empty) serialize a value without any frames, and to make these work I had to omit the check, because I no longer include any administrative frame after the first two frames. Is it useful to support serialized values that do not use any frames?

@Kobzol Kobzol marked this pull request as draft May 18, 2020 11:45
@mrocklin (Member) commented:

Thank you for the PR @Kobzol and for the informative description. I apologize for the delay in response.

In principle I don't object to this change. However, it would be good to get feedback from the NVIDIA folks on this topic. I would prefer to avoid changing the core protocol frequently if we can avoid it, so it would be good to get their thoughts before moving forward.

cc @kkraus14 @quasiben @jacobtomlinson

@jacobtomlinson (Member) commented:

I understand the motivation behind this and it seems like a good thing to do from that viewpoint. I can't comment on whether we should make other changes. I'll let others weigh in.

cc @madsbk

Base automatically changed from master to main March 8, 2021 19:04
