Conversation

@jcorbin

@jcorbin jcorbin commented Mar 15, 2016

Allows passing non-dask Python literals and dask.bag.Items as values in **kwargs, which are passed through to the map func.

dask/bag/core.py Outdated
Useful for deferring a partial(func, **kwargs) with eventually-computed
kwargs in a dask graph.
TODO: if core supported splicing args for us, we could replace usage of
Author

Feedback welcome here: this smelled like "too much technology", similar to #1044, so I wrote it down as scope creep for this PR. Either way (be it computed functions or arg splicing), I put it to you that you're missing a crucial piece in your "incomplete lisp" around this pain point ;-)

@jcorbin
Author

jcorbin commented Mar 15, 2016

Supersedes #1041

                '%s not supported as kwarg value to Bag.map_partitions'
                % type(val).__name__)
        kw_pairs.append([key, val])
    return dsk, kw_pairs
Member

This function strikes me as very special purpose. I can't imagine how it would be useful outside of bag.map. I wonder if, instead, we could generalize and reuse to_task_dasks to serve our purpose here.

Author

Aye, shall I salvage as mentioned in #1041 (comment) first? I was perhaps over-recalibrating here, since I am indeed prone to over-solving things ;-)

Member

Maybe? What is the smallest possible change we can make to to_task_dasks to achieve the desired effect?

In dask/bag/core.py, #1054 (comment):

        a dask graph to pass the final computed **kwargs to a function.
        Currently only dask.bag.Item and python literal values are supported.
        """
        kw_pairs = []
        for key, val in iteritems(kwargs):
            if isinstance(val, Item):
                dsk = merge(dsk, val.dask)
                val = val.key
            # TODO elif isinstance(val, Value):
            elif isinstance(val, Base):
                raise NotImplementedError(
                    '%s not supported as kwarg value to Bag.map_partitions'
                    % type(val).__name__)
            kw_pairs.append([key, val])
        return dsk, kw_pairs

Author

I'm not certain that we actually want the full machinery of to_task_dasks here, unless we really do want people to be able to pass fully-realized bags, dataframes, arrays, etc along... which seems like a good way to get horrible performance presuming you don't blow out memory ;-)

But let's say we do:

  • to_task_dasks currently comes from the imperative side: it only treats Values natively (everything else is optimized/inlined), which causes expensive computations (e.g. shard loading) to get duplicated in my realistic two-pass use case

Maybe what's needed is more clarity around the to_task_dasks work started in #1041. For example, rather than the weird, but Works For Me, "special" support in the new base.normalize_to_dasks core:

  • it should have a way to differentiate which dask types get referenced by key (erroring if a referenceable type has more than one key) versus optimized and inlined
  • it could have a way to blacklist certain types (presuming there's no better way to say "all large dasks", if that's a thing we want)

Member

OK, that makes sense. Thoughts on the following?

def unpack_kwargs_for_map(kwargs):
    kwargs = dict(kwargs)  # copy so the caller's dict isn't mutated
    dsk = {}
    for k, v in kwargs.items():
        if hasattr(v, 'dask'):
            if hasattr(v, 'key'):
                # replace the dask object with its key and merge in its graph
                kwargs[k] = v.key
                dsk.update(v.dask)
            else:
                raise ValueError(...)
    return dsk, kwargs

This has the benefit of supporting both Values and Items and it's also pure and so easily testable.
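For illustration, here is that sketch exercised with a minimal stand-in object that exposes the dask/key attributes the duck-typing looks for (FakeItem is hypothetical, not a real dask class; the function body is repeated so the snippet runs on its own):

```python
def unpack_kwargs_for_map(kwargs):
    # As sketched above: swap dask objects for their keys, collecting graphs.
    kwargs = dict(kwargs)  # copy so the caller's dict isn't mutated
    dsk = {}
    for k, v in kwargs.items():
        if hasattr(v, 'dask'):
            if hasattr(v, 'key'):
                kwargs[k] = v.key
                dsk.update(v.dask)
            else:
                raise ValueError('%r has a graph but no key' % (v,))
    return dsk, kwargs

class FakeItem:
    # Hypothetical stand-in for dask.bag.Item: just a graph and a key.
    dask = {'total-1': (sum, [1, 2, 3])}
    key = 'total-1'

dsk, kwargs = unpack_kwargs_for_map({'total': FakeItem(), 'offset': 10})
assert kwargs == {'total': 'total-1', 'offset': 10}
assert dsk == {'total-1': (sum, [1, 2, 3])}
```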

Unfortunately, this still lacks support for any sort of nesting, so if someone does the following then they'll get unfriendly behavior.

b.map(func, keyword=[b.sum()])

This nesting case is where something more general could really be handy. I agree with you, though, that wanting to limit which contained dask collections are allowed makes things trickier.
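A recursive variant could cover the nesting case by walking lists, tuples, and dicts looking for objects with dask/key attributes. This is only a sketch of the idea (unpack, unpack_kwargs, and FakeItem are hypothetical names, not dask's actual API):

```python
def unpack(obj, dsk):
    # Replace any dask-like object with its key, merging its graph into dsk;
    # recurse into common containers so nested objects are found too.
    if hasattr(obj, 'dask') and hasattr(obj, 'key'):
        dsk.update(obj.dask)
        return obj.key
    if isinstance(obj, (list, tuple)):
        return type(obj)(unpack(o, dsk) for o in obj)
    if isinstance(obj, dict):
        return {k: unpack(v, dsk) for k, v in obj.items()}
    return obj

def unpack_kwargs(kwargs):
    dsk = {}
    return dsk, {k: unpack(v, dsk) for k, v in kwargs.items()}

class FakeItem:
    # Hypothetical stand-in for dask.bag.Item: just a graph and a key.
    dask = {'sum-1': (sum, [1, 2])}
    key = 'sum-1'

# The nested case from above: keyword=[b.sum()] becomes keyword=['sum-1']
dsk, kw = unpack_kwargs({'keyword': [FakeItem()], 'n': 5})
assert kw == {'keyword': ['sum-1'], 'n': 5}
assert dsk == {'sum-1': (sum, [1, 2])}
```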

I wonder if @jcrist has a suggestion.

Author

Sure, returning a new dict that the caller could then merge is an easy change... what's the tradeoff between putting a dict literal in the graph and a (dict, [[k, v], ...]) task, which is how to_task_dasks represents eventual dicts?
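For reference, the (dict, [[k, v], ...]) form works because calling dict on a list of two-element lists rebuilds the mapping once the scheduler has computed each value. A minimal illustration with plain Python (no scheduler involved):

```python
# to_task_dasks represents an eventual dict as a task that calls `dict`
# on [key, value] pairs; evaluating the task rebuilds the mapping.
task = (dict, [['a', 1], ['b', 2]])
func, arg = task
assert func(arg) == {'a': 1, 'b': 2}
```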

Author

I'm dubious about loosening the referencing mechanism to be duck-typed because of the schedule-boundary-crossing problem. We can always loosen it later, and/or reconcile this function and to_task_dasks going forward.

Author

Also note that the duck typing isn't even sufficient for all "natural" values currently; e.g. dataframe.Scalar lacks a key property...

Member

OK, makes sense. Can I put you on the hook to handle this if a user ever asks about why this doesn't work with Value objects?

Regarding dict, it looks like that was a mistake on my part. I was assuming that the dask scheduler walked down dict objects; it looks like it doesn't at the moment.

In [1]: import dask

In [2]: def inc(x):
   ...:     return x + 1

In [3]: dsk = {'x': 1, 'y': {'a': 'x'}}

In [4]: dask.get(dsk, 'y')
Out[4]: {'a': 'x'}

Author

Sure thing, count me on hook :-)

dask/bag/core.py Outdated
>>> b.map(
... lambda x, total=0: float(x) / total,
... total=b.sum()).sum().compute()
1.0
Member

This example takes me a moment to grok and so, while it's a fine test, it's a bit odd for documentation. I wonder if an example like the following might be easier on the reader.

>>> def add(x=0, y=0):
...     return x + y

>>> b = db.from_sequence(range(5))
>>> b.map(add, y=10).compute()
(10, 11, 12, 13, 14)

Author

Sure, I can totally see having an example in the docstring with a constant python value, but we also want one with a dask value as well. I think simply using temporary variables to make the code clearer, rather than going along with the penchant for heavy inlining that dask's source code has, would help:

>>> nums = db.from_sequence(...)
>>> nums_total = nums.sum()
>>> normalized = nums.map(lambda x, total=0: float(x) / total, total=nums_total)
>>> normalized.sum().compute()
1.0

Member

I think we should start with the literal value. This gets the common case out first, which I think is as important, if not more so.

Adding something like b.sum() sounds fine too. I recommend keeping it inline but moving the lambda out, perhaps replacing it with a function div? I wouldn't worry about float(x); from __future__ import division should have you covered.

In dask/bag/core.py, #1054 (comment):

        """ Map a function across all elements in collection

        >>> import dask.bag as db
        >>> b = db.from_sequence(range(5))
        >>> list(b.map(lambda x: x * 10))  # doctest: +SKIP
        [0, 10, 20, 30, 40]

        Any **kwargs are passed through to func along with the (maybe unpacked)
        item argument(s).  Values in **kwargs must either be a non-dask python
        literal or a dask.bag.Item.

        If a dask.bag.Item is used its computed value is passed to the map
        function.  This means that whatever computations are implied by any
        Item values block any of the map work.  Example:

        >>> b = db.from_sequence(range(5))
        >>> b.map(
        ...     lambda x, total=0: float(x) / total,
        ...     total=b.sum()).sum().compute()
        1.0

@jcorbin
Author

jcorbin commented Mar 15, 2016

Updated the docstrings, will squash before merge.

@jcorbin
Author

jcorbin commented Mar 15, 2016

Okay, turns out that (with_kwargs, func, kwargs) is equivalent to (apply, partial, [func], kwargs); updated, will squash before merge.
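The equivalence can be checked in plain Python. Here apply is a simplified stand-in for dask's task-application helper (a task tuple (apply, f, args, kwargs) evaluates to f(*args, **kwargs)), and add is just an example function:

```python
from functools import partial

def apply(func, args, kwargs=None):
    # Simplified stand-in for dask's apply helper: a task tuple
    # (apply, f, args, kwargs) evaluates to f(*args, **kwargs).
    return func(*args, **(kwargs or {}))

def add(x, y=0):
    return x + y

# (apply, partial, [add], {'y': 10}) evaluates to partial(add, y=10),
# i.e. the same deferred call a custom with_kwargs helper would build.
f = apply(partial, [add], {'y': 10})
assert f(1) == 11
```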

@jcorbin
Author

jcorbin commented Mar 15, 2016

Okay, reworked merge_kwargs => unpack_kwargs (#willsquashbeforemerge); I didn't name it with the suggested _for_map suffix since there's nothing map-specific about it, and a subsequent diff could extend reduction to allow passing kwargs to the per-partition function (however, I'm intentionally keeping this one focused on the map functions).

@jcorbin
Author

jcorbin commented Mar 16, 2016

Okay rebased and squashed on latest master.

def test_map_with_kwargs():
    b = db.from_sequence(range(100), npartitions=10)
    assert b.map(lambda x, total=0: float(x) / total,
                 total=b.sum()).sum().compute() == 1.0
Member

We should test with literals as well

@mrocklin
Member

This is all looking pretty good to me. I left some final comments on testing.

@jcorbin
Author

jcorbin commented Mar 16, 2016

Updated tests:

  • added literal tests for Bag.map
  • shifted tests for Bag.map_partitions to be direct lifts of the Bag.map cases (dropped the prior corpus case)

Will squash before final merge modulo feedback.

@mrocklin
Member

+1

@jcrist any comments?

dask/bag/core.py Outdated
    kw_pairs = []
    for key, val in iteritems(kwargs):
        if isinstance(val, Item):
            dsk = merge(dsk, val.dask)
Member

Nitpick: this could actually be dsk.update(val.dask); merge creates a new dict and copies the old values over. Slightly more efficient, but not super important.
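The difference is just copying: a merge-style helper allocates a fresh dict and copies both inputs, while update mutates in place. A minimal illustration with plain dict operations (this merge is a sketch of the toolz-style behavior, not toolz itself):

```python
def merge(a, b):
    # toolz.merge-style: allocate a new dict and copy both inputs into it
    out = dict(a)
    out.update(b)
    return out

dsk = {'x': 1}
item_dask = {'total': 'x'}

merged = merge(dsk, item_dask)  # new dict; dsk is untouched
assert merged == {'x': 1, 'total': 'x'}
assert dsk == {'x': 1}

dsk.update(item_dask)           # in place; no copy of dsk's entries
assert dsk == {'x': 1, 'total': 'x'}
```

For large graphs the in-place update avoids rehashing every existing entry, which is why it's slightly preferred here.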

Member

I was groaning about impure use earlier, which is possibly why @jcorbin is doing it this way. I agree with @jcrist though that, because we're only mutating a locally created dict, rather than something that was passed in, update is probably slightly preferred, especially as graphs can become large.

Author

Will change; I was only using merge out of prevailing idiom :-)

@jcrist
Member

jcrist commented Mar 16, 2016

Sorry for coming in late and complaining after you've fixed things :). I've left some comments, but none of them are 100% blocking.

@jcorbin
Author

jcorbin commented Mar 16, 2016

  • updated to use dict.update rather than merge where noted
  • new docstring based around numeric max / take(5)

Will squash pending review once we're happy with the new docstrings

@jcrist
Member

jcrist commented Mar 17, 2016

This looks good (and useful) to me. Thanks for making those changes. +1, will merge once squashed.

@jcorbin
Author

jcorbin commented Mar 17, 2016

Squashed, merge at will.

jcrist added a commit that referenced this pull request Mar 17, 2016
Add **kwargs support to Bag.map and Bag.map_partitions
@jcrist jcrist merged commit 2aaeef2 into dask:master Mar 17, 2016
@mrocklin
Member

I'm very glad to see this in. Thanks @jcorbin !

@sinhrks sinhrks added this to the 0.8.2 milestone Apr 20, 2016