
Conversation

@prasunanand
Contributor

Fixes: #3656

  • Add TypeCompressor class
  • Add __setitem__ and __getitem__
  • Still need to get rid of maybe_compress in distributed/protocol/core.py and distributed/protocol/serialize.py (currently trying to understand headers and comm)

cc: @mrocklin

@mrocklin
Member

Thanks for starting on this @prasunanand. I've added more context to the original issue; I hope that helps to clarify the objectives: #3656

@prasunanand
Contributor Author

Thanks @mrocklin :)

@prasunanand
Contributor Author

@mrocklin, could you please review this again?

compression_type = "zlib"
if compression_type == "auto":
compression_type = default_compression
self.storage[key] = self.compressions[compression_type]["compress"](value)
Member

I suspect that this won't work for values that don't satisfy the buffer protocol.

Compression functions generally consume bytes-like objects and return bytes-like objects. Your test probably passes because Numpy arrays happen to satisfy this protocol, but other objects, like custom user classes, probably wouldn't.

I recommend trying out your solution with a wider variety of workloads. I suspect that it will have difficulty on many of them.

You will probably need to look into how Dask serializes objects to understand how to convert arbitrary Python objects into bytes-like objects.
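To make that concrete, here is a minimal sketch of the serialize-then-compress pipeline being described (not this PR's code; it assumes serialize/deserialize in distributed.protocol and maybe_compress plus the compressions registry in distributed.protocol.compression, where they lived at the time):

from collections.abc import MutableMapping

from distributed.protocol import deserialize, serialize
from distributed.protocol.compression import compressions, maybe_compress


class TypeCompressor(MutableMapping):
    """Store arbitrary Python objects as serialized, optionally compressed frames."""

    def __init__(self):
        self.storage = {}

    def __setitem__(self, key, value):
        # serialize() turns any Python object into a header plus bytes-like frames,
        # so the compression step never sees objects lacking the buffer protocol
        header, frames = serialize(value)
        # maybe_compress() returns (compression_name_or_None, frame) for each frame
        self.storage[key] = (header, [maybe_compress(frame) for frame in frames])

    def __getitem__(self, key):
        header, stored = self.storage[key]
        frames = [
            frame if name is None else compressions[name]["decompress"](frame)
            for name, frame in stored
        ]
        return deserialize(header, frames)

    def __delitem__(self, key):
        del self.storage[key]

    def __iter__(self):
        return iter(self.storage)

    def __len__(self):
        return len(self.storage)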

@prasunanand
Contributor Author

prasunanand commented Apr 23, 2020

Thanks @mrocklin. I have a doubt: do serialization and compression mean the same thing for "large results or Python specific data, like NumPy arrays or Pandas"? If yes, should I call the serialize() function on Python-specific datasets, which looks up the different registered serializers?

if isinstance(key, (np.ndarray, pd.DataFrame)):
    compress_func = serialize
    decompress_func = serialize

Sorry for replying late.

@prasunanand
Contributor Author

Looking at the code in dumps, are both serialize() and compress() called on the data?

@TomAugspurger
Member

@prasunanand I would have a look at the data parameter in the Worker docstring:

data: MutableMapping, type, None
    The object to use for storage, builds a disk-backed LRU dict by default

I think the idea is to have Worker.data be the mutable mapping that implements automatic compression.

Compressing data prior to communication (in distributed/protocol) is also an interesting idea, but might be more involved.
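For reference, a rough sketch of that wiring, assuming the TypeCompressor from this PR and the import location used in the test later in this thread:

from distributed import Scheduler, Worker
from distributed.protocol import TypeCompressor  # import location assumed from this PR


async def main():
    async with Scheduler() as s:
        # Worker.data accepts any MutableMapping; passing the compressing mapping
        # here means every value the worker keeps in memory is stored compressed.
        async with Worker(s.address, data=TypeCompressor()) as worker:
            ...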

@prasunanand changed the title from "WIP: Autocompression" to "Autocompression" on Apr 24, 2020
@prasunanand marked this pull request as ready for review on April 24, 2020 11:22
@TomAugspurger left a comment

A few questions:

  1. How do we expose this to the user? Asking them to provide the TypeCompressor as a data= argument doesn't seem good. Perhaps a compress_data or compress_persisted_data boolean argument to Worker, with an associated config setting?
  2. Can this be combined with a zict.Buffer to enable in-memory compression & spill to disk at the same time? (A sketch of this is below.)
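As a hedged sketch of what question 2 could look like, this layers the compressing mapping under zict's spill machinery (using zict.Buffer, zict.Func, and zict.File, which the worker already relies on; the byte budget, directory name, and the TypeCompressor import location are assumptions, not the final API):

import zict
from dask.sizeof import sizeof
from distributed.protocol import TypeCompressor  # import location assumed from this PR
from distributed.protocol.serialize import deserialize_bytes, serialize_bytes

# Fast tier: in-memory, compressed (the mapping proposed in this PR).
fast = TypeCompressor()

# Slow tier: spill to disk, turning values into bytes on the way out.
slow = zict.Func(serialize_bytes, deserialize_bytes, zict.File("spill-directory"))

# Keep roughly 2 GB (by dask's sizeof estimate) in memory, spill the rest.
data = zict.Buffer(fast, slow, n=2e9, weight=lambda key, value: sizeof(value))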

import dask


class TypeCompressor(collections.abc.MutableMapping):
Member

This probably shouldn't be in distributed.protocol, since it isn't strictly related to the protocol.

Longer-term, this may be best to put in https://github.com/dask/zict/, but I'm happy to experiment with it here. This could maybe go in a new distributed/utils_data.py module.

Member

It's probably tricky to do that, as this depends on maybe_compress, which is in Distributed. That said, maybe there are ways to build this out of functionality in Zict, thus making this more compact (maybe not even needing this class).
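As an illustration of the Zict-only idea, a rough sketch that builds a compressing mapping out of zict.Func over serialized bytes (zlib is used directly here just for illustration, sidestepping maybe_compress; serialize_bytes/deserialize_bytes are the existing helpers in distributed.protocol.serialize):

import zlib

import zict
from distributed.protocol.serialize import deserialize_bytes, serialize_bytes

# A compressing MutableMapping assembled purely from zict primitives:
# values are serialized to bytes, compressed, and kept in an ordinary dict.
compressed_store = zict.Func(
    lambda value: zlib.compress(serialize_bytes(value)),    # dump
    lambda blob: deserialize_bytes(zlib.decompress(blob)),  # load
    dict(),
)

compressed_store["x"] = list(range(100_000))   # stored compressed
assert compressed_store["x"][:3] == [0, 1, 2]  # decompressed transparently on access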

@mrocklin
Member

Thanks @mrocklin. I have a doubt: do serialization and compression mean the same thing for "large results or Python specific data, like NumPy arrays or Pandas"? If yes, should I call the serialize() function on Python-specific datasets, which looks up the different registered serializers?

if isinstance(key, (np.ndarray, pd.DataFrame)):
    compress_func = serialize
    decompress_func = serialize

It should work on any Python object that is passed in. To do this well you will need to understand more about how Dask handles serialization and compression.

In general, I get the sense that this issue may be beyond your current understanding of Dask. I was probably wrong to mark this as a good first issue.

@prasunanand
Contributor Author

Apologies @mrocklin. I studied the zict codebase and worker.py, as I was unable to get my head around what this feature demanded. Now I have a better understanding, so I have tried once more and hope it solves the issue (or the solution is close).

If not, I will close this PR and work on other issues.

self.data = TypeCompressor()
for k, v in data.values():
    self.data[k] = data[k]
elif isinstance(data, TypeCompressor):
Member

I'm not sure what's going on here. Was it necessary to special-case TypeCompressor here? Ideally we only depend on the MutableMapping interface and don't have to worry too much about every possible type that someone might provide individually. That can make future maintenance an issue.

header, (compression, compressed) = self.storage[key]

frames = decompress({"compression": {compression}}, compressed)
return deserialize(header, frames)
Member

In principle this looks good @prasunanand. Thank you for sticking with this.

The next thing to do is probably to try it out in practice a bit and see how it performs. There are optimizations I can think of, like it might make sense to only compress values that we know are small using the sizeof function, but that would just be a guess. If you have time to try out a few different kinds of computations, maybe taken from examples.dask.org, that might be a good way to get more information here.

Member

@prasunanand, have you had any time to try the suggestions here?

Member

I thought this is what maybe_compress already does: it checks whether something is worth compressing and, if so, compresses it.
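For context, maybe_compress roughly behaves like this (a simplified sketch of the idea, not the actual implementation in distributed/protocol/compression.py):

import zlib


def maybe_compress_sketch(payload, min_size=10_000, sample_size=10_000):
    """Return (compression_name_or_None, payload), compressing only when it pays off."""
    if len(payload) < min_size:
        return None, payload  # too small for compression to be worth the overhead
    sample = bytes(memoryview(payload)[:sample_size])  # cheap probe of compressibility
    if len(zlib.compress(sample)) > 0.9 * len(sample):
        return None, payload  # data looks close to incompressible
    return "zlib", zlib.compress(bytes(payload))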

@jakirkham
Member

cc @madsbk (as this may be of interest 🙂)

@jakirkham
Member

Any thoughts or questions on the feedback so far, @prasunanand? 🙂

@prasunanand
Contributor Author

prasunanand commented Jul 14, 2020

Sorry, I have been busy at work. Last week I have been trying to measure the performance using different workloads. I found that NumPy arrays were not getting deserialized properly and corrected that. Regarding measuring performance for different workloads, I have been trying to use Dask diagnostics.

Please let me know if this is the right approach. In the following code, I am trying to compute over an ndarray of size (1000, 1000).

import asyncio

async def test_compression():
    import numpy as np
    import dask.array as da
    from distributed.protocol import TypeCompressor
    from distributed import Scheduler, Worker, Client, wait, performance_report

    x = np.ones(1000000)  # a large but compressible piece of data

    d = TypeCompressor()

    d["x"] = x  # put Python object into d
    out = d["x"]  # get the object back out
    d["z"] = 3

    assert str(out) == str(x)
    np.testing.assert_allclose(
        np.frombuffer(out, x.dtype), x
    )  # the two objects should match

    # assuming here that the underlying bytes are
    # stored in something like a `.storage` attribute, but this isn't required
    # we check that the amount of actual data stored is small
    assert sum(map(len, d.storage.values())) < x.nbytes

    async with Scheduler() as s:
        async with Worker(s.address, data=d) as worker:
            async with Client(s.address, asynchronous=True) as c:
                async with performance_report(filename="dask-report.html"):  # measure performance
                    x = da.ones((1000, 1000))
                    y = await x.persist()  # put data in memory
                    y = await (x + x.T).mean().persist()  # do some work
                    future = c.compute(y)
                    await wait(future)
                    assert sum(map(len, worker.data.storage.values())) < x.nbytes

asyncio.run(test_compression())

Attached is an HTML file that I got as a result. (To view it, change the extension from .txt to .html.)
dask-report.txt

@prasunanand
Contributor Author

prasunanand commented Jul 14, 2020

Apart from this: what other data types should I investigate (numpy.ndarray, string, pandas.Series)?

@jakirkham
Member

I tried another approach to this problem (hope that is ok 🙂) in PR #3968. We need not go with that; I'm just trying to see if we are able to get the behavior we want out of Zict alone. Would be curious to hear what people think of it.

Base automatically changed from master to main March 8, 2021 19:04
@prasunanand requested a review from fjetter as a code owner on January 23, 2024 10:57