Wrap HLGs in an Expr to avoid Client side materialization #11736
Conversation
Unit Test Results: 9 files ±0, 9 suites ±0, 3h 25m 53s ⏱️ (-10m 45s). Results for commit 2d208a1, compared against base commit bd3a7d6. This pull request removes 6 tests and adds 1; renamed tests count towards both. See the test report for an extended history of previous test failures, which is useful for diagnosing flaky tests.
```diff
  sut = db.from_sequence(seq, partition_size=9)
  li = list(random.choices(sut, k=10).compute())
- assert sut.map_partitions(len).compute() == (9, 1)
+ assert sut.map_partitions(len).compute() == [9, 1]
```
The bag output type has been a bit inconsistent so far, depending on whether the output is a single partition or not (which in itself is not trivial for bags). Now, the output is always a list.
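A minimal sketch of the behaviour described above, assuming a dask version that includes this change (the partition sizes mirror the test in the diff):

```python
import dask.bag as db

seq = list(range(10))
# partition_size=9 over 10 elements gives two partitions: 9 + 1 elements
b = db.from_sequence(seq, partition_size=9)

# map_partitions(len) yields one int per partition; with this change the
# computed result is always a list, even for a single-partition bag.
lengths = b.map_partitions(len).compute()
print(list(lengths))
```

Converting through `list(...)` in the print keeps the sketch valid on older versions, where the result could come back as a tuple.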
dask/base.py (outdated):

```diff
-def collections_to_dsk(collections, optimize_graph=True, optimizations=(), **kwargs):
+def collections_to_dsk(
```
My intention is to rename this new function. We'll have the option of keeping the old one for backwards compatibility if necessary (with a DeprecationWarning).
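A back-compat shim along those lines could look like this. This is a hypothetical sketch: `_collections_to_expr` stands in for whatever the renamed internal helper ends up being called, and is stubbed out here so the example is self-contained.

```python
import warnings


def _collections_to_expr(collections, optimize_graph=True, **kwargs):
    # Stand-in for the renamed internal helper (name hypothetical).
    return {"n_collections": len(collections), "optimize_graph": optimize_graph}


def collections_to_dsk(collections, optimize_graph=True, **kwargs):
    # Keep the old public name around, but warn before forwarding so that
    # downstream users get a release cycle to migrate.
    warnings.warn(
        "collections_to_dsk is deprecated and will be removed in a future "
        "release",
        DeprecationWarning,
        stacklevel=2,
    )
    return _collections_to_expr(collections, optimize_graph, **kwargs)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = collections_to_dsk([object(), object()])
```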
Looking at https://github.com/search?q=collections_to_dsk+language%3APython+NOT+path%3A%2F%5Edask%5C%2F%2F+NOT+path%3A%2F%5Edistributed%5C%2F%2F+NOT+path%3Asite-packages&type=code it seems nobody is using this function publicly, so I will go ahead and just drop it.
Hi - I'm sure I've misunderstood :), but I just wondered what you mean by "just drop it". I rely on this function in one of my libraries (that is in the GitHub search you linked to). Thanks.
This query yields 32 hits, and most of them are vendored code. I don't know where you are using it, but I'm not willing to keep this function around for one library (or very few).
We're internally moving away from HighLevelGraph objects, so this function as it currently stands is useless and I do not intend to maintain it.
The function that is replacing this will likely be internal.
What exactly are you doing with this?
If it helps, we can keep the code around for a while (and raise a deprecation warning). That's easy to do, but long term, usage of this function is likely not viable.
Hi,
Thanks for the quick response. It is, of course, fine to get rid of it :) I just wondered what the implications for my work would be ...
I use it to get at a dictionary view (let's call it dsk) of a task graph (usually non-optimised, but after culling) that I can easily loop through to inspect or modify the data definitions. In the latter case I then convert the dictionary back to a Dask array (dx = da.Array(dsk, ...)) and can carry on.
Our primary use case for this is management of file locations. We need to a) know which files on disk contribute to a computed result, and b) modify file locations when the actual datasets have (or will have) moved since the file names were logged.
If I can replicate this functionality in a new framework, I'll be very happy :)
David
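That workflow can be sketched with the stable collection protocol (`__dask_graph__`), which gives a comparable dictionary view without going through `collections_to_dsk`; the array and names below are purely illustrative:

```python
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2)) + 1

# A plain-dict view of the task graph, analogous to what
# collections_to_dsk([x], optimize_graph=False) would have returned.
dsk = dict(x.__dask_graph__())

# Loop over the task definitions, e.g. to inspect (or rewrite) embedded
# file locations before computing.
for key, task in dsk.items():
    pass  # inspection / modification would happen here

# Convert the (possibly modified) dictionary back into a Dask array.
dx = da.Array(dsk, name=x.name, chunks=x.chunks, dtype=x.dtype)
```

Whether this route survives the migration away from HighLevelGraph objects is exactly the open question in this thread.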
Force-pushed: c05d5bb → eda4bad, 17cf997 → aa17996, 248ab86 → 66bb61d, 92cd97b → 9e99ec1, 8f4612d → 83b30b1.
I believe all the test cases are now working fine except for some distributed integration tests. CI does not seem to install the right version of the distributed branch, since they pass locally.
Force-pushed: 83b30b1 → e308b91, e308b91 → 86727b8.
While CI is green, there are a bunch of things that are only tested in distributed, so this isn't done yet.
On the distributed side, there are one (maybe two) issues left relating to how futures are serialized and registered. I doubt this will have a major impact on the code here.
I summarized the high-level changes in the changelog already. I recommend starting there to get an idea of the changes.
Also important in case somebody wants to test: to get a meaningful benefit from this when working with arrays, we will first have to rework da.store, since that is still explicitly optimizing and materializing (the only place in the code base still doing this). While previous attempts to migrate this to blockwise failed, wrapping it up in these classes is simpler and possible. I already have an early prototype and will look into that once the final test cases are wrapped up. (The coiled benchmarks use to_zarr / store in almost all cases, so to measure the difference here the benchmarks have to be rewritten a bit. I could already confirm there that it is indeed not materializing locally, which saves transmission time and removes the relevant client-side memory constraints we had on the very large examples.)
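For reference, the da.store path in question; today it optimizes and materializes the graph on the client before scheduling. A plain NumPy array is used as the target here purely for illustration:

```python
import numpy as np
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2))
target = np.zeros((4, 4))

# da.store writes each chunk of x into the target in place. Currently this
# call optimizes and materializes the full graph client-side; the follow-up
# work described above would wrap it in an Expr instead.
da.store(x, target, lock=True)
```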
Force-pushed: 6c2aa8d → 26270e5, a7e6df6 → 90a01d6.
```yaml
- jinja2
- pip
- pip:
  - git+https://github.com/dask/distributed
```
- revert before merge
hendrikmakait left a comment:
Thanks, @fjetter!
This is a prototype to wrap HLGs in an expression. This helps with streamlining our compute calls and the serialization path to the scheduler. I only tested this on a single test so far, using a simple threaded scheduler. There are likely still tons of issues, and the new expression classes have to be cleaned up, etc.
The top-level compute implements the new path and includes some comments on how this will look when migrated to the client.
Sibling: dask/distributed#9008
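A hypothetical, minimal sketch of the idea: an expression-like wrapper that holds the graph and only hands out concrete tasks when asked, instead of materializing eagerly on the client. The class and method bodies here are illustrative, not the PR's actual API:

```python
class HLGExpr:
    """Wrap a (high-level) graph so materialization is deferred."""

    def __init__(self, dsk, keys):
        self._dsk = dsk      # the wrapped graph (a dict or HighLevelGraph)
        self._keys = keys    # the output keys a compute call asks for

    def __dask_graph__(self):
        # Only here, typically on the scheduler side, is the graph turned
        # into a concrete key -> task mapping.
        return dict(self._dsk)

    def __dask_keys__(self):
        return list(self._keys)


expr = HLGExpr({"a": 1, "b": 2}, ["b"])
```

The point is that the client serializes the wrapper, not the expanded task dictionary, which is where the transmission-time and memory savings mentioned above come from.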
Related PRs
Closes dask/distributed#7964
Closes dask/dask-expr#14