Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR attempts to simplify the low-level encoding of Dask protocol messages into frames to make it easier to implement handling of the protocol in a statically typed language (like Rust). There are of course multiple ways of solving this issue, in this PR I'll attempt to describe the issue and offer a possible solution that we have used for our Rust scheduler.
The issue
The current implementation of the

dumpsfunction inprotocol/cores.pytakes a Python message (usually a dictionary), extracts serialized/serializable values out of the message, puts them into a series of frames and creates headers that describe how to reconstruct the original message from the frame stream. For example, if the input message has a "function" attribute which contains serialized data, Dask will remove the attribute, create a frame with its serialized data and describe how to put the "function" attribute back into the message in a header which is stored into a separate frame. This is displayed here in the top right corner:The problematic part is message reconstruction during deserialization. A natural representation of Dask message types in a statically typed language is to use data structures which strictly describe the various message types, as opposed to loosely typed dictionaries with arbitrary types inside. An example from Rust (simplified):
If Dask clients/workers can remove arbitrary attributes from these messages and we have to support arbitrary data structure modification queries like
put <something> into "key1" field of the 0th element of the "key2" field of the input message, we are pretty much left with no choice than to go back to untyped dictionaries, which defeats one of the benefits of statically typed languages, especially for such critical component of the scheduler. It's trivial to support these queries in a dynamic language, but in e.g. Rust it's incredibly painful.Proposed solution
Instead of removing the attributes of the Dask protocol messages and then putting them back in, we propose to keep the original message structure, but replace the attribute values with placeholders (see example image above, bottom right corner). A value which has to be offloaded will be replaced with a placeholder dictionary on the wire, which describes from which frames should the value be reconstructed (frame index + frame count) and how (header). With this approach we can model the original message structure and use a generic enumeration (like
InPlace/Offloaded) type for attributes that can be offloaded (potentially for all attributes). Most importantly, we don't have to implement arbitrary data structure modification queries. In our specific implementation, we have two versions of message types that can contain offloadable attributes which can be converted between each other if you provide them a frame array. Here's a pseudocode of how that could look like:Implementation
In our implementation, the placeholder is a dictionary with three "magic" keys, the number of frames of the offloaded value, the index of the first frame in the resulting frame stream and the Dask header containing information about compression and other stuff.
During serialization, we replace each offloaded value with this placeholder.
During deserialization, we check if a value if offloaded (by looking for these magic keys) and if yes, we read the corresponding frame range and reconstruct the original value.
The names of the magic keys are rather arbitrary, they can be probably improved. In our implementation, we send one frame less than in the current encoding, because we do not need the intermediate frame with headers describing how to reconstruct offloaded values. The headers are stored directly in the placeholders. This should not increase message size by itself, as all of the headers are also present in the current encoding, just in a different location. I'm not sure about the frame size limitations though. So far in our experiments it has not caused problems.
Even though our changes were motivated by our inability to implement the old protocol encoding elegantly and performantly in Rust, I'm certain that potential implementations in other statically typed languages (like C++) would face identical problems. I am not sure if our proposed solution is the right one, but I think that in order to make implementations of the scheduler (or other Dask parts) easier in statically typed languages, Dask should use a less "hostile" encoding. It would also help in documenting and understanding of the protocol itself.
Performance
This change is NOT motivated by performance. We have benchmarked Dask both with and without this change in a series of use cases (https://github.com/spirali/rsds/blob/master/scripts/usecases.py) and haven't found regressions, but it probably warrants further investigation. It would be great if some benchmarking infrastructure containing real pipelines was available in CI to test these changes easily (#3783).
Code comments
loads):However there are some tests (like
distributed/protocol/test_serialize.py::test_empty) that serialize a value without any frames and to make these work I had to omit the check, because I no longer include any administrative frame after the first two frames. It it useful to support serialized values that do not use any frames?