Add **kwargs support to Bag.map and Bag.map_partitions #1054
Conversation
dask/bag/core.py
Outdated
Useful for deferring a partial(func, **kwargs) with eventually-computed
kwargs in a dask graph.
TODO: if core supported splicing args for us, we could replace usage of
Feedback welcome here; this smelled like "too much technology", similar to #1044, and so I wrote it down as scope creep for this PR. Either way (be it computed functions or arg splicing), I put it to you that you're missing a crucial part in your "incomplete lisp" around this pain point ;-)
Force-pushed from 99fc7e7 to 88a9939 (Compare)
Supersedes #1041
                '%s not supported as kwarg value to Bag.map_partitions'
                % type(val).__name__)
        kw_pairs.append([key, val])
    return dsk, kw_pairs
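For context, the diff above shows only the tail of the helper under discussion. A self-contained sketch of its overall shape, using a hypothetical FakeItem stand-in for dask.bag.Item (both names here are illustrative, not dask's actual API), might look like:

```python
class FakeItem:
    """Hypothetical stand-in for dask.bag.Item: a computed scalar
    identified by a key within its own small graph."""
    def __init__(self, dask, key):
        self.dask = dask
        self.key = key


def unpack_kwargs(kwargs):
    """Split kwargs into (graph, [[key, value], ...]).

    Item-like values are replaced by their graph key, and their graphs
    are collected; plain Python literals pass through unchanged.
    """
    dsk = {}
    kw_pairs = []
    for key, val in kwargs.items():
        if isinstance(val, FakeItem):
            dsk.update(val.dask)
            val = val.key
        kw_pairs.append([key, val])
    return dsk, kw_pairs
```

A caller would merge the returned graph into the bag's graph and splice the `[key, value]` pairs into each map task so the scheduler substitutes the keys with computed values.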
This function strikes me as very special purpose. I can't imagine how it would be useful outside of bag.map. I wonder if, instead, we could generalize and reuse to_task_dasks to serve our purpose here.
Aye, shall I salvage as mentioned in #1041 (comment) first? I was perhaps over-recalibrating here, since I am indeed prone to over-solving things ;-)
Maybe? What is the smallest possible change we can make to to_task_dasks to achieve the desired effect?
I'm not certain that we actually want the full machinery of to_task_dasks here, unless we really do want people to be able to pass fully-realized bags, dataframes, arrays, etc along... which seems like a good way to get horrible performance presuming you don't blow out memory ;-)
But let's say we do:
- to_task_dasks is from the imperative side at present: it only treats Values natively (everything else is optimized/inlined), which causes expensive computations (e.g. shard loading) to get duplicated in my realistic two-pass use case
- this was why in https://github.com/dask/dask/pull/1041/commits I first factored a generalized core out of the imperative version, and then introduced a bag-native version reusing the same python-object-walking core
Maybe what's needed is more clarity around the to_task_dasks work started in #1041. For example, rather than the weird-but-works-for-me "special" support of the new base.normalize_to_dasks core:
- it should have a way to differentiate between which dask types get referenced by key (error if a referenceable type has more than one key) vs optimized and inlined
- it could have a way to blacklist certain types (presuming there's no better way to say "all large dasks" if that's a thing we want)
OK, that makes sense. Thoughts on the following?
def unpack_kwargs_for_map(kwargs):
    dsk = {}
    for k, v in kwargs.items():
        if hasattr(v, 'dask'):
            if hasattr(v, 'key'):
                kwargs[k] = v.key
                dsk.update(v.dask)
            else:
                raise ValueError(...)
    return dsk, kwargs

This has the benefit of supporting both Values and Items, and it's also pure and so easily testable.
Unfortunately, this still lacks support for any sort of nesting, so if someone does the following then they'll get unfriendly behavior.
b.map(func, keyword=[b.sum()])
This nesting case is where something more general could really be handy. I agree with you, though, that wanting to limit which contained dask collections we accept makes things trickier.
I wonder if @jcrist has a suggestion.
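A recursive walk is one way the nesting case could be handled. This is only a sketch of the graph-collection half of the problem (a real implementation would also need to emit tasks like `(list, ...)` so the scheduler substitutes keys nested inside containers); FakeItem is a hypothetical stand-in for dask.bag.Item:

```python
class FakeItem:
    """Hypothetical stand-in for dask.bag.Item."""
    def __init__(self, dask, key):
        self.dask = dask
        self.key = key


def unpack_value(val, dsk):
    """Recursively replace Item-like values with their graph keys,
    accumulating their graphs into dsk; literals pass through."""
    if isinstance(val, FakeItem):
        dsk.update(val.dask)
        return val.key
    if isinstance(val, list):
        return [unpack_value(v, dsk) for v in val]
    if isinstance(val, tuple):
        return tuple(unpack_value(v, dsk) for v in val)
    if isinstance(val, dict):
        return {k: unpack_value(v, dsk) for k, v in val.items()}
    return val
```

With this, `keyword=[b.sum()]` would at least get its sub-graph merged rather than silently carrying an un-computed object into the tasks.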
Sure, returning a new dict that the caller could then merge is an easy change... what's the tradeoff between putting a dict literal in the graph and a (dict, [[k, v], ...]) form, which is how to_task_dasks represents eventual dicts?
I'm dubious about loosening the referencing mechanism to be duck-typed because of the schedule-boundary-crossing problem. We can always loosen it later, and/or reconcile this function and to_task_dasks going forward.
Also note that the duck typing isn't even sufficient for all "natural" values currently; e.g. dataframe.Scalar lacks a key property...
OK, makes sense. Can I put you on the hook to handle this if a user ever asks about why this doesn't work with Value objects?
Regarding dict, it looks like that was a mistake on my part. I was assuming that the dask scheduler walked down dict objects. Looks like it doesn't at the moment:
In [1]: import dask

In [2]: def inc(x):
   ...:     return x + 1
   ...:

In [3]: dsk = {'x': 1, 'y': {'a': 'x'}}

In [4]: dask.get(dsk, 'y')
Out[4]: {'a': 'x'}
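What the scheduler does walk is a tuple whose head is callable (a task), which is why to_task_dasks spells an eventual dict as `(dict, [[k, v], ...])`. A toy evaluator (toy_get below is illustrative, not dask's actual scheduler) shows the difference:

```python
def _eval(dsk, val):
    # a tuple with a callable head is a task: evaluate args, then call
    if isinstance(val, tuple) and val and callable(val[0]):
        func, args = val[0], val[1:]
        return func(*[_eval(dsk, a) for a in args])
    # lists inside tasks are walked recursively
    if isinstance(val, list):
        return [_eval(dsk, v) for v in val]
    # a string naming another graph key is a reference
    if isinstance(val, str) and val in dsk:
        return _eval(dsk, dsk[val])
    # anything else (including a dict literal) is an opaque value
    return val


def toy_get(dsk, key):
    return _eval(dsk, dsk[key])
```

So `{'y': {'a': 'x'}}` comes back with the key un-substituted, while `{'y': (dict, [['a', 'x']])}` rebuilds the dict with the computed value in place.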
Sure thing, count me on the hook :-)
dask/bag/core.py
Outdated
>>> b.map(
...     lambda x, total=0: float(x) / total,
...     total=b.sum()).sum().compute()
1.0
This example takes me a moment to grok and so, while it's a fine test, it's a bit odd for documentation. I wonder if an example like the following might be easier on the reader.
>>> def add(x=0, y=0):
...     return x + y
>>> b = db.from_sequence(range(5))
>>> b.map(add, y=10).compute()
(10, 11, 12, 13, 14)
Sure, I can totally see having an example in the docstring with a constant python value, but we also want one with a dask value as well. I think simply using temporary variables to make the code clearer, rather than going along with the penchant for heavy inlining that dask's source code has, would help:
>>> nums = db.from_sequence(...)
>>> nums_total = nums.sum()
>>> normalized = nums.map(lambda x, total=0: float(x) / total, total=nums_total)
>>> normalized.sum()
1.0
I think we should start with the literal value. This gets the common case out first, which I think is as important, if not more so.
Adding something like b.sum() sounds fine too. I recommend keeping it inline but moving the lambda out, perhaps replacing it with a function div?
I wouldn't worry about float(x); from __future__ import division should have you covered.
Updated the docstrings, will squash before merge.
Okay, turns out that
Okay, reworked merge_kwargs => unpack_kwargs (#willsquashbeforemerge); I didn't name it with the suggested
Force-pushed from dd8571f to e92e848 (Compare)
Okay, rebased and squashed on latest master.
def test_map_with_kwargs():
    b = db.from_sequence(range(100), npartitions=10)
    assert b.map(lambda x, total=0: float(x) / total,
                 total=b.sum()).sum().compute() == 1.0
We should test with literals as well
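The literal case amounts to forwarding the same keyword values to every call of the mapped function. A dask-free sketch of the expected semantics (map_with_kwargs is a hypothetical stand-in for Bag.map, not dask's API):

```python
def map_with_kwargs(func, seq, **kwargs):
    """Stand-in for Bag.map(func, **kwargs) with literal-only kwargs:
    each element is passed to func along with the same keywords."""
    return [func(x, **kwargs) for x in seq]


# every element gets total=10 forwarded alongside it
result = map_with_kwargs(lambda x, total=1: x / total, range(5), total=10)
```

A literal test for the PR would assert the same thing through the real Bag.map, e.g. mapping with `total=100` over `range(100)` and comparing against the plain-Python result.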
This is all looking pretty good to me. I left some final comments on testing.
Updated tests. Will squash before final merge, modulo feedback.
+1. @jcrist any comments?
dask/bag/core.py
Outdated
kw_pairs = []
for key, val in iteritems(kwargs):
    if isinstance(val, Item):
        dsk = merge(dsk, val.dask)
Nitpick, could actually be dsk.update(val.dask) - merge creates a new dict and copies the old values over. Slightly more efficient, but not super important.
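The difference is only allocation: merge (simplified below from the toolz version; this is a sketch, not toolz's implementation) builds and returns a fresh dict, while dict.update mutates in place:

```python
def merge(*dicts):
    """Simplified toolz.merge: allocate a new dict and copy everything in."""
    out = {}
    for d in dicts:
        out.update(d)
    return out


dsk = {'a': 1}
item_dask = {'b': 2}

# merge copies both inputs into a brand-new dict, leaving dsk untouched ...
merged = merge(dsk, item_dask)

# ... while update folds item_dask into dsk in place, skipping the copy.
dsk.update(item_dask)
```

In a loop over kwargs, update avoids re-copying the accumulated graph on every iteration, which is the (small) efficiency being pointed out.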
Will change; I was only using merge out of prevailing idiom :-)
Sorry for coming in late and complaining after you've fixed things :). I've left some comments, but none of them are 100% blocking.
Will squash pending review once we're happy with the new docstrings.
This looks good (and useful) to me. Thanks for making those changes. +1, will merge once squashed.
Force-pushed from b0db8ad to 60cacc7 (Compare)
Squashed, merge at will.
Add **kwargs support to Bag.map and Bag.map_partitions
I'm very glad to see this in. Thanks @jcorbin!
Allows passing non-dask Python literals and dask.bag.Item values in **kwargs, which are passed through to the map function.