
Conversation

@madsbk
Contributor

madsbk commented Mar 23, 2020

This PR implements direct communication of spilled device data. It uses get_from_dev_or_host_buffer() to access worker data when available.

@mrocklin
Member

My gut reaction is that this is probably overly specific to a particular type of data. An approach like this is, I think, unlikely to be accepted (although I also see that you marked this as WIP, so maybe you're just prototyping here)

@jakirkham
Member

jakirkham commented Mar 23, 2020

Good to know, Matt. What sorts of things do you think could be accepted?

For context the gist of what we are looking for is a way to access the spilled data without causing it to be "unspilled". The idea being we could communicate spilled data between workers without having to "unspill" it first, which should help deal with some memory pressure issues we are running into. Expect this feature might be valuable outside of our use case (though please weigh in here) 😉

@mrocklin
Member

Good to know, Matt. What sorts of things do you think could be accepted?

I don't know. Right now we don't assume any API beyond that of a MutableMapping. It's this sort of API structure that lets us be generic enough that projects like dask-cuda are able to build their own systems and know that they'll work well.
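For illustration, a bare-bones custom mapping against that contract might look like the sketch below (the LoggingDict class and the data= wiring are illustrative assumptions, not code from distributed or this PR):

from collections.abc import MutableMapping

class LoggingDict(MutableMapping):
    """Toy Worker.data replacement: a plain dict that logs stores and fetches."""

    def __init__(self):
        self._d = {}

    def __setitem__(self, key, value):
        print(f"storing {key}")
        self._d[key] = value

    def __getitem__(self, key):
        print(f"fetching {key}")
        return self._d[key]

    def __delitem__(self, key):
        del self._d[key]

    def __iter__(self):
        return iter(self._d)

    def __len__(self):
        return len(self._d)

# A mapping like this can be handed to a worker, e.g. Worker(..., data=LoggingDict()),
# which is the same hook dask-cuda uses for its device/host/disk mapping; the worker
# only ever relies on the MutableMapping interface.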

I don't have an immediate answer on what to do here. That would probably take me some time to think about (which I may not have in the near-to-moderate future).

For context the gist of what we are looking for is a way to access the spilled data without causing it to be "unspilled". The idea being we could communicate spilled data between workers without having to "unspill" it first, which should help deal with some memory pressure issues we are running into.

Thank you for adding context. That helps.

Well, it's not ideal but you could subclass Worker. That introduces a large maintenance burden on you all though, and reduces composability.

For send_task_state_to_scheduler it looks like we don't always use value, particularly if we've already stored nbytes and type (which I think is the common case). So in that case you could probably avoid this cost 99% of the time by only getting from self.data if one of those values isn't available.
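A rough sketch of that check might look like the following (hypothetical code; the nbytes/types bookkeeping and the sizeof call are assumptions about the worker internals, not a verbatim patch):

from dask.sizeof import sizeof

def send_task_state_to_scheduler(self, key):
    if key in self.nbytes and key in self.types:
        # Metadata already cached: never touch self.data, so spilled
        # values stay spilled.
        nbytes, typ = self.nbytes[key], self.types[key]
    else:
        value = self.data[key]  # may trigger an unspill
        nbytes, typ = sizeof(value), type(value)
    ...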

I don't know what to do with get_data right now though. I suspect that there is a clean solution, but it will probably take some thought to find.

@mrocklin
Member

Expect this feature might be valuable outside of our use case (though please weigh in here) 😉

Yeah, certainly one way to motivate a fringe feature is to make it less fringe :)

Today I only know of a couple of use cases of people using custom Worker.data MutableMappings, so unfortunately there isn't much ammunition here to make a case for a new convention.

One, perhaps indirect, approach here though would be to build a more general purpose use case. In particular, I know a few people who would like a Worker.data MutableMapping that did automatic in-memory compression. I think that this use case might have a similar need as what you're describing (we would already have serialized and compressed data in memory, so we might as well use that when we send data across a wire).

So, one solution would be to build that mapping, get people excited about this (this might not be too hard), and then use that broader use case as justification for the convention that you want to add.

Plus, I think that building automatic compression would be fun.

@jakirkham
Member

For send_task_state_to_scheduler it looks like we don't always use value, particularly if we've already stored nbytes and type (which I think is the common case). So in that case you could probably avoid this cost 99% of the time by only getting from self.data if one of those values isn't available.

I gave this a go in PR ( #3628 ) if you want to take a look 😉

@jakirkham
Member

Thanks for giving this some thought Matt! 😄

Yeah that makes sense. Standard interfaces are very useful. It's worth considering if we can mold what we want to fit that interface.

No worries 😉

Yeah it's worth considering subclassing Worker if that's what we need in the near term. Maybe there is some way to pull that apart and upstream it later? Though I agree it would be better to pursue a different option if there is one.

The compression idea is interesting. My guess is this is something we could consider in the medium term, but probably not short term. Happy to be corrected in case others have different thoughts. 😉

Yep, I'm hopeful there is a clean solution as well. Will keep mulling this over 🙂

@mrocklin
Member

The compression idea is interesting. My guess is this is something we could consider in the medium term, but probably not short term. Happy to be corrected in case others have different thoughts. 😉

I don't know how on-fire you all are right now, but I think that this wouldn't be that hard for someone like you or @pentschev. Also, I think that it would benefit maybe lots of folks upstream, which presumably we all signed up for a little bit.

@mrocklin
Member

Here's a janky start:

import collections
import collections.abc
from typing import Callable, Dict, Tuple

class TypeCompression(collections.abc.MutableMapping):
    def __init__(
        self,
        types: Dict[type, Tuple[Callable, Callable]],
        storage=dict,
    ):
        # Map value type -> (compress, decompress) callables
        self.types = types
        # One inner mapping per value type
        self.storage = collections.defaultdict(storage)

    def __setitem__(self, key, value):
        typ = type(value)
        if typ in self.types:
            compress, decompress = self.types[typ]
            value = compress(value)
        self.storage[typ][key] = value

    def __getitem__(self, key):
        for typ, d in self.storage.items():
            if key in d:
                value = d[key]
                break
        else:
            raise KeyError(key)

        if typ in self.types:
            compress, decompress = self.types[typ]
            value = decompress(value)

        return value

    def __delitem__(self, key):
        for d in self.storage.values():
            if key in d:
                del d[key]
                return
        raise KeyError(key)

    def __iter__(self):
        for d in self.storage.values():
            yield from d

    def __len__(self):
        return sum(len(d) for d in self.storage.values())

There are probably better ways to arrange this though.
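To make the shape of the types argument concrete, here is a small usage sketch against the class above (zlib and pickle are placeholder compressors, chosen purely for illustration):

import pickle
import zlib

# Placeholder (compress, decompress) pairs keyed by value type
compressors = {
    bytes: (zlib.compress, zlib.decompress),
    list: (
        lambda v: zlib.compress(pickle.dumps(v)),
        lambda b: pickle.loads(zlib.decompress(b)),
    ),
}

data = TypeCompression(compressors)
data["x"] = b"a" * 10_000     # stored zlib-compressed
data["y"] = list(range(100))  # pickled, then compressed
assert data["x"] == b"a" * 10_000
assert len(data) == 2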

@mrocklin
Member

We would then make compress/decompress functions that use dask's serialize/compression functions.
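A rough sketch of what such a pair could look like, built on distributed.protocol's serialize/deserialize with zlib standing in for Dask's own frame compression (the wiring into the TypeCompression sketch above is an assumption, not established API):

import zlib

from distributed.protocol import deserialize, serialize

def dask_compress(value):
    # Serialize with Dask's protocol, then compress each frame.
    header, frames = serialize(value)
    return header, [zlib.compress(bytes(frame)) for frame in frames]

def dask_decompress(blob):
    header, frames = blob
    return deserialize(header, [zlib.decompress(frame) for frame in frames])

# Hypothetical wiring, e.g. for NumPy arrays:
# data = TypeCompression({np.ndarray: (dask_compress, dask_decompress)})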

That's a pretty far digression from this PR though, so I'll stop pushing it.

@madsbk
Contributor Author

madsbk commented Mar 24, 2020

Yeah, I suspected that this would be too specific :)

Making the feature more useful by implementing a compression dict makes a lot of sense.

An alternative approach is to make cuDF or RMM support spilled device buffers so that Dask/Distributed wouldn't have to do the unspilling.

However, right now I am tracking down a memory issue in Distributed where data isn't spilled when doing a dataframe repartition. Will report back when I know exactly what is going on :)

@jakirkham
Member

Right, I guess Dask doesn't do this already. I think this is something Dask-CUDA is doing already as part of spilling. Suppose it could be broken out and upstreamed with some effort?

@mrocklin
Member

I've raised #3656 for automatic compression MutableMappings.

@mrocklin
Member

mrocklin commented Apr 5, 2020

This appears to have stalled. Should we close this? Perhaps it would make sense to raise an issue with the problem that you're trying to solve and talk about possible solutions there more broadly?
