Autocompression #3702
base: main
Conversation
Thanks for starting on this @prasunanand. I've added more context to the original issue. I hope that helps to clarify the objectives: #3656
Thanks @mrocklin :).

@mrocklin, could you please review this again?
distributed/protocol/compression.py
Outdated
compression_type = "zlib"
if compression_type == "auto":
    compression_type = default_compression
self.storage[key] = self.compressions[compression_type]["compress"](value)
I suspect that this won't work for values that don't satisfy the buffer protocol.
Compression functions generally consume bytes-like objects and return bytes-like objects. Your test probably passes because Numpy arrays happen to satisfy this protocol, but other objects, like custom user classes, probably wouldn't.
I recommend trying out your solution with a wider variety of workloads. I suspect that it will have difficulty on many of them.
You will probably need to look into how Dask serializes objects to understand how to convert arbitrary Python objects into bytes-like objects.
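To make that round trip concrete, here is a minimal sketch (not the PR's code) of the path described above: the object is first serialized into bytes-like frames via distributed.protocol, and only those frames are handed to the compression machinery. The Custom class is a hypothetical stand-in for a user object that does not satisfy the buffer protocol.

    from distributed.protocol import deserialize, serialize
    from distributed.protocol.compression import maybe_compress

    class Custom:
        # Hypothetical user class: no buffer protocol, so it must be
        # serialized into bytes-like frames before any compression step.
        def __init__(self, x):
            self.x = x

    header, frames = serialize(Custom(42))   # frames are bytes-like objects
    compressed = [maybe_compress(frame) for frame in frames]
    # each entry is (compression_name_or_None, possibly_compressed_bytes)

    # Round-trip the original frames to confirm nothing is lost
    restored = deserialize(header, frames)
    assert restored.x == 42

In the PR, this round trip would presumably happen inside the storage mapping's __setitem__ and __getitem__.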
Thanks @mrocklin. I have a question: do serialization and compression mean the same thing for large results or Python-specific data, like NumPy arrays or Pandas? If yes, should I call the serialize() function on Python-specific datasets, which looks up the different registered serializers? Sorry for replying late.
Looking at the code in [...]
@prasunanand I would have a look at distributed/worker.py, lines 233 to 234, at 6db09f3.

I think the idea is to have [...]. Compressing data prior to communication (in [...])
TomAugspurger left a comment
A few questions:
- How do we expose this to the user? Asking them to provide the TypeCompressor as a data= argument doesn't seem good. Perhaps a compress_data or compress_persisted_data boolean argument to Worker, with an associated config setting?
- Can this be combined with a zict.Buffer to enable in-memory compression & spill to disk at the same time? (A sketch of this idea follows below.)
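As a rough illustration of the zict.Buffer idea, here is a hedged sketch that is not the PR's implementation: zlib and pickle stand in for distributed's serialize()/maybe_compress(), and the directory name and 1 GB threshold are illustrative.

    import pickle
    import zlib

    import zict
    from dask.sizeof import sizeof

    def dump(value):
        # serialize then compress before the value is stored
        return zlib.compress(pickle.dumps(value))

    def load(blob):
        # reverse of dump: decompress then deserialize
        return pickle.loads(zlib.decompress(blob))

    fast = zict.Func(dump, load, {})                       # compressed, in memory
    slow = zict.Func(dump, load, zict.File("spill-dir"))   # compressed, on disk
    data = zict.Buffer(fast, slow, n=1e9,                  # spill past ~1 GB of
                       weight=lambda k, v: sizeof(v))      # estimated value size

    data["x"] = list(range(1000))   # held compressed in memory
    assert data["x"][0] == 0

The appeal of this shape is that compression and spilling compose as ordinary mappings, so the worker would only ever see a MutableMapping.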
import dask


class TypeCompressor(collections.abc.MutableMapping):
This probably shouldn't be in distributed.protocol, since it isn't strictly related to the protocol.
Longer-term, this may be best to put in https://github.com/dask/zict/, but I'm happy to experiment with it here. This could maybe go in a new distributed/utils_data.py module.
It's probably tricky to do that, as this depends on maybe_compress, which is in Distributed. That said, maybe there are ways to build this out of functionality in Zict, thus making this more compact (maybe not even needing this class).
It should work on any Python object that is passed in. To do this well, you will need to understand more about how Dask handles serialization and compression. In general, I get the sense that this issue may be beyond your current understanding of Dask. I was probably wrong to mark this as a good first issue.
Force-pushed from 6dd14ff to f31e53f
Apologies @mrocklin. I studied [...] If not, I will close this PR and work on other issues.
    self.data = TypeCompressor()
    for k, v in data.values():
        self.data[k] = data[k]
elif isinstance(data, TypeCompressor):
I'm not sure what's going on here. Was it necessary to special-case TypeCompressor here? Ideally we only depend on the MutableMapping interface and don't have to worry too much about every possible type that someone might provide individually. That can make future maintenance an issue.
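For illustration, here is a hypothetical helper (not the PR's code) showing what depending only on the MutableMapping interface could look like: copy whatever mapping the user passes, with no per-class special cases.

    from collections.abc import MutableMapping

    def initialize_data(data, storage_factory=dict):
        # Hypothetical helper: populate a fresh store (e.g. a TypeCompressor or
        # a plain dict) from any mapping the user supplies, relying only on the
        # MutableMapping interface rather than on specific concrete classes.
        storage = storage_factory()
        if isinstance(data, MutableMapping):
            storage.update(data)
        return storage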
header, (compression, compressed) = self.storage[key]

frames = decompress({"compression": {compression}}, compressed)
return deserialize(header, frames)
In principle this looks good, @prasunanand. Thank you for sticking with this.

The next thing to do is probably to try it out in practice a bit and see how it performs. There are optimizations I can think of, like it might make sense to only compress values that we know are small using the sizeof function, but that would just be a guess. If you have time to try out a few different kinds of computations, maybe taken from examples.dask.org, that might be a good way to get more information here.
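As one reading of the sizeof suggestion, here is a hedged sketch of a hypothetical helper that gates compression on a value's estimated footprint; the threshold is an arbitrary illustrative number, not anything in the PR.

    from dask.sizeof import sizeof

    COMPRESS_THRESHOLD = 10_000  # bytes; arbitrary illustrative cutoff

    def worth_compressing(value):
        # Hypothetical helper: estimate the in-memory footprint and skip values
        # below the threshold, where compression overhead likely outweighs any
        # savings.
        return sizeof(value) >= COMPRESS_THRESHOLD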
@prasunanand, have you had any time to try the suggestions here?
I thought this is what maybe_compress already does, meaning it checks to see whether something is worth compressing and, if so, compresses it.
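For reference, a sketch of how maybe_compress behaves, based on distributed.protocol.compression around the time of this PR; the exact codec chosen depends on what is installed.

    from distributed.protocol.compression import maybe_compress

    # A large, highly compressible payload: a codec is chosen if one is available
    compression, payload = maybe_compress(b"\x00" * 100_000)
    print(compression)   # e.g. "lz4" if installed, otherwise None

    # Small payloads come back untouched
    compression, payload = maybe_compress(b"tiny")
    assert compression is None and payload == b"tiny"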
cc @madsbk (as this may be of interest 🙂)

Any thoughts or questions on the feedback so far, @prasunanand? 🙂
Sorry, I have been busy at work. Last week I was trying to measure the performance using different workloads and found out that NumPy arrays were not getting deserialized properly; I corrected that. Regarding measuring performance for different workloads, I have been trying to use dask diagnostics. Please let me know if it's the right approach. Attached is an HTML file that I got as a result. (To view it, change the extension from .txt to .html.)
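As a hedged guess at the kind of measurement described, one common approach is to run a workload under distributed's performance_report, which writes an HTML summary; the workload, cluster setup, and filename below are illustrative, not what was actually run here.

    import dask.array as da
    from dask.distributed import Client, performance_report

    if __name__ == "__main__":
        client = Client()   # small local cluster with default settings
        with performance_report(filename="autocompression-report.html"):
            x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
            x.dot(x.T).mean().compute()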
Apart from this: [...]
I tried another approach to this problem (hope that is ok 🙂) in PR #3968. We need not go with that. Just trying to see if we are able to get the behavior we want out of Zict alone. Would be curious to hear what people think of it.
Fixes: #3656
distributed/protocol/core.py and distributed/protocol/serialize.py (currently trying to understand headers and comm)

cc: @mrocklin