
Conversation

@fjetter (Member) commented Feb 11, 2025

This is a prototype that wraps HLGs in an expression. It helps streamline our compute calls and the serialization path to the scheduler.
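Conceptually, the wrapper looks roughly like this; the class and method names below are illustrative only, not the actual implementation:

```python
from dask.highlevelgraph import HighLevelGraph


class WrappedHLG:
    """Illustrative sketch: hold a HighLevelGraph and only materialize it
    into a flat task dict when the graph is actually requested, so that
    optimization and serialization can happen later (e.g. on the scheduler)."""

    def __init__(self, hlg: HighLevelGraph, keys):
        self.hlg = hlg
        self._keys = keys

    def __dask_keys__(self):
        return self._keys

    def __dask_graph__(self):
        # Materialization of the low-level graph is deferred until this point
        return dict(self.hlg)
```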

So far I have only run a single test, using the simple threaded scheduler. There are likely still plenty of issues, and the new expression classes have to be cleaned up, etc.

The top-level compute implements the new path and includes some comments on how this will look when migrated to the client.

sibling dask/distributed#9008

Related PRs

Closes dask/distributed#7964
Closes dask/dask-expr#14

@fjetter fjetter marked this pull request as draft February 11, 2025 16:33
github-actions bot (Contributor) commented Feb 11, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

      9 files ±0        9 suites ±0    3h 25m 53s ⏱️ −10m 45s
 17 818 tests −5    16 603 ✅ +266     1 215 💤 ±0    0 ❌ −271
159 436 runs  −45  147 321 ✅ +2 193  12 115 💤 −3    0 ❌ −2 235

Results for commit 2d208a1. ± Comparison against base commit bd3a7d6.

This pull request removes 6 tests and adds 1 test. Note that renamed tests count towards both.

Removed:
dask.dataframe.dask_expr.io.tests.test_distributed ‑ test_parquet_distriuted[arrow]
dask.dataframe.dask_expr.tests.test_distributed ‑ test_compute_concatenates[False]
dask.dataframe.dask_expr.tests.test_distributed ‑ test_compute_concatenates[True]
dask.tests.test_base ‑ test_optimizations_keyword
dask.tests.test_delayed ‑ test_delayed_optimize
dask.tests.test_distributed ‑ test_get_scheduler_with_distributed_active

Added:
dask.dataframe.dask_expr.io.tests.test_distributed ‑ test_parquet_distributed[arrow]

♻️ This comment has been updated with the latest results.

  sut = db.from_sequence(seq, partition_size=9)
  li = list(random.choices(sut, k=10).compute())
- assert sut.map_partitions(len).compute() == (9, 1)
+ assert sut.map_partitions(len).compute() == [9, 1]
@fjetter (Member, Author) commented:

The bag output type has been a bit inconsistent so far, depending on whether the output is a single partition or not (which in itself is not trivial to determine for bags). Now, the output is always a list.
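For illustration, a small sketch of the behaviour described above, assuming the new semantics in which the result is always a list:

```python
import dask.bag as db

seq = list(range(10))

# Two partitions (9 + 1 elements): the per-partition lengths come back as a list
multi = db.from_sequence(seq, partition_size=9)
assert multi.map_partitions(len).compute() == [9, 1]

# A single partition also yields a list (the output type no longer depends on
# the partition count)
single = db.from_sequence(seq, npartitions=1)
assert single.map_partitions(len).compute() == [10]
```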

dask/base.py Outdated


- def collections_to_dsk(collections, optimize_graph=True, optimizations=(), **kwargs):
+ def collections_to_dsk(
@fjetter (Member, Author) commented:
My intention is to rename this new function. We'll have the option of keeping the old one for backwards compatibility if necessary (with a DeprecationWarning).

@fjetter (Member, Author) commented Feb 13, 2025 (comment text not shown).
A contributor commented:

Hi - I'm sure I've misunderstood :), but I just wondered what you mean by "just drop it". I rely on this function in one of my libraries (that is in the GitHub search you linked to). Thanks.

@fjetter (Member, Author) commented:

This query yields 32 hits and most of the hits are vendored code. I don't know where you are using it but I'm not willing to keep this function around for one library (or very few).

We're internally moving away from HighLevelGraph objects, so this function as it currently stands is useless, and I do not intend to maintain it.
The function that is replacing this will likely be internal.

What exactly are you doing with this?

@fjetter (Member, Author) commented:

If it helps, we can keep the code around for a while (and raise a deprecation warning). That's easy to do, but long term, usage of this function is likely not viable.
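A minimal sketch of what such a backwards-compatibility shim could look like; the internal replacement function below is a placeholder, not the actual new code path:

```python
import warnings


def _new_internal_path(collections, optimize_graph=True, optimizations=(), **kwargs):
    # Placeholder standing in for the new internal, expression-based code path.
    raise NotImplementedError("illustrative placeholder only")


def collections_to_dsk(collections, optimize_graph=True, optimizations=(), **kwargs):
    # Deprecated shim kept for a transition period: warn and delegate to the
    # new internal path so downstream users get a migration window.
    warnings.warn(
        "collections_to_dsk is deprecated and will be removed in a future "
        "release; the expression-based compute path replaces it.",
        FutureWarning,
        stacklevel=2,
    )
    return _new_internal_path(
        collections,
        optimize_graph=optimize_graph,
        optimizations=optimizations,
        **kwargs,
    )
```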

The contributor replied:

Hi,
Thanks for the quick response. It is, of course, fine to get rid of it :) I just wondered what the implications for my work would be ...

I use it to get at a dictionary view (let's call it dsk) of a task graph (usually non-optimised, but after culling) that I can easily loop through to inspect the data definitions, or to modify them. In the latter case I then convert the dictionary back to a Dask array (dx = da.Array(dsk, ...)) and can carry on.

Our primary use case for this is management of file locations. We need to a) know which files on disk contribute to a computed result, and b) modify file locations when the actual datasets have (or will have) moved since the file names were logged.

If I can replicate this functionality in a new framework, I'll be very happy :)

David
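For reference, a minimal sketch of the workflow described above, using the current collections_to_dsk; the inspection/rewriting step is only indicated, since it depends on how the tasks embed the file paths:

```python
import dask.array as da
from dask.base import collections_to_dsk

x = da.ones((4, 4), chunks=(2, 2))

# Dictionary view of the task graph (non-optimised here)
dsk = dict(collections_to_dsk([x], optimize_graph=False))

# Loop through the tasks to inspect or modify the data definitions,
# e.g. rewrite file paths embedded in the tasks (omitted here)
for key, task in dsk.items():
    pass

# Rebuild a dask array from the (possibly modified) dictionary and carry on
dx = da.Array(dsk, x.name, chunks=x.chunks, dtype=x.dtype)
result = dx.compute()
```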

@fjetter (Member, Author) commented Feb 27, 2025

I believe all the test cases are now working except for some distributed integration tests. CI does not seem to install the right version of the distributed branch, since they pass locally.
There are many smaller changes that make sense independently of the HLG pickle change; I will start breaking those out into separate PRs to put this one into a reviewable state.

@fjetter (Member, Author) commented Mar 20, 2025

While CI is green, there are a number of things that are only tested in distributed, so this isn't done yet.

@fjetter (Member, Author) commented Mar 20, 2025

On the distributed side, there are one (maybe two) issues left that relate to how futures are serialized and registered. I doubt this will have a major impact on the code here.
I'll give it another pass to see if there are smaller changes worth breaking out, but review can start already.

@fjetter (Member, Author) commented Mar 20, 2025

I have already summarized the high-level changes in the changelog. I recommend starting there to get an idea of the changes.

@fjetter (Member, Author) commented Mar 20, 2025

Also important in case somebody wants to test this: to get a meaningful benefit from this when working with arrays, we first have to rework da.store, since it still explicitly optimizes and materializes the graph (the only place in the code base that still handles this differently). While previous attempts to migrate this to blockwise failed, wrapping it up in these classes is simpler and possible. I already have an early prototype and will look into it once the final test cases are wrapped up.

(i.e. the Coiled benchmarks use to_zarr / store in almost all cases, so to measure the difference here the benchmarks have to be rewritten a bit. I could already confirm there that the graph is indeed not materialized locally, which saves transmission time and removes the relevant client-side memory constraints we had on the very large examples.)

@fjetter fjetter force-pushed the wrap_hlg_expr branch 2 times, most recently from 6c2aa8d to 26270e5 Compare March 21, 2025 09:56
@fjetter fjetter force-pushed the wrap_hlg_expr branch 4 times, most recently from a7e6df6 to 90a01d6 Compare March 21, 2025 11:42
  - jinja2
  - pip
  - pip:
      - git+https://github.com/dask/distributed
@fjetter (Member, Author) commented:

  • revert before merge

@hendrikmakait (Member) left a comment:

Thanks, @fjetter!

@fjetter fjetter merged commit b2a4a21 into dask:main Mar 24, 2025
19 of 22 checks passed
@fjetter fjetter deleted the wrap_hlg_expr branch March 31, 2025 09:26
HiromuHota added a commit to snorkel-ai/ray that referenced this pull request May 9, 2025


Development

Successfully merging this pull request may close these issues:

• Scheduler gather should warn or abort requests if data is too large (Distributed protocol)