
Conversation

crusaderky (Collaborator) commented Aug 25, 2023

This check is somewhat cruder than what was described in the issue above.

I tried setting up a system that gracefully copes with heterogeneous clusters where a few workers have much more memory than the rest (a fairly common design). However:

  1. it resulted in an unnecessarily complicated and fragile implementation (it would add new restrictions to tasks, which in turn would flip rootish tasks to non-rootish), and
  2. it didn't escape the fact that today dask/distributed has no clue about the output size of a task (whereas dask/dask does). In other words, the scheduler can't know in advance whether the task is
  • a reduction that requires sum(dts.nbytes for dts in ts.dependencies) heap memory, or
  • a concatenation that requires twice as much, or
  • something else that requires an arbitrary amount of temporary memory (possibly orders of magnitude larger than the inputs), as illustrated below.
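A plain-NumPy illustration of that last point (array sizes are arbitrary): three operations over identical inputs can have very different peak heap usage, which is exactly what the scheduler cannot predict.

```python
import numpy as np

chunks = [np.random.random((200, 200)) for _ in range(4)]  # ~320 kB each

# A reduction: the output is a scalar, so peak heap memory is roughly
# the size of the inputs.
total = sum(chunk.sum() for chunk in chunks)

# A concatenation: the output duplicates the inputs, so peak heap memory
# is roughly twice the inputs while both exist.
stacked = np.concatenate(chunks)

# Something else: temporary memory far larger than the inputs
# (a broadcast that turns one 200x200 input into a 200x200x200 array).
pairwise = chunks[0][:, None, :] - chunks[0][None, :, :]  # ~64 MB
```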

So in the end I opted for a crude order-of-magnitude check and, in the case of heterogeneous clusters, explicitly expect the user to use worker or resource constraints.
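For intuition only, a minimal sketch of that kind of check (this is not the scheduler's actual code; the function name, parameters, and safety factor are all hypothetical):

```python
def deps_exceed_worker_memory(
    dep_nbytes: list[int],
    worker_memory_limits: list[int],
    safety_factor: float = 2.0,
) -> bool:
    """Crude order-of-magnitude check: return True when the combined size
    of a task's dependencies, padded by a rough factor for temporary heap
    memory, cannot fit even on the largest worker in the cluster."""
    if not worker_memory_limits:
        return False  # no workers yet; nothing sensible to compare against
    return sum(dep_nbytes) * safety_factor > max(worker_memory_limits)
```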

Note

This catches client.gather(client.compute(collection)), but not collection.compute() or dask.compute(collection), as only the first one has a finalizer; the other two fetch the individual chunks directly from the original workers to the client.
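For context, the three call patterns look roughly like this (the cluster setup and collection are purely illustrative):

```python
import dask
import dask.array as da
from distributed import Client

client = Client()
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# Has a finalizer task that assembles the whole result on a single
# worker before sending it to the client, so the new check applies:
result = client.gather(client.compute(x))

# These two fetch the individual chunks directly from the workers to
# the client, bypassing the finalizer and therefore this check:
result = x.compute()
(result,) = dask.compute(x)
```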

@crusaderky crusaderky self-assigned this Aug 25, 2023
github-actions bot (Contributor) commented Aug 25, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

  • 21 files (+17), 21 suites (+17), 10h 49m 29s ⏱️ (+9h 55m 16s)
  • 3 795 tests (+415): 3 688 ✔️ (+519), 107 💤 (-104), 0 ❌ (±0)
  • 36 687 runs (+31 420): 34 888 ✔️ (+30 068), 1 799 💤 (+1 352), 0 ❌ (±0)

Results for commit f3881d2. ± Comparison against base commit ef59142.

♻️ This comment has been updated with latest results.

@crusaderky crusaderky force-pushed the decide_worker_memlimit branch from b2d4a7d to 28b1a43 on August 29, 2023 16:05
@crusaderky crusaderky force-pushed the decide_worker_memlimit branch from 28b1a43 to f3881d2 on September 4, 2023 14:04
@crusaderky crusaderky changed the title from "[WIP] Auto-fail tasks with deps larger than the worker memory" to "Auto-fail tasks with deps larger than the worker memory" on Sep 4, 2023
@crusaderky crusaderky marked this pull request as ready for review September 4, 2023 15:02
@crusaderky crusaderky requested a review from fjetter as a code owner September 4, 2023 15:02
fjetter (Member) commented Sep 5, 2023

> it didn't escape the fact that today dask/distributed has no clue about the output size of a task (whereas dask/dask does). In other words, the scheduler can't know in advance if the task is

This is only true for array workloads atm
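For instance (illustrative), an array collection carries shape, chunks, and dtype, so its expected output size is known before anything executes, whereas a Delayed or DataFrame graph generally carries no such size information:

```python
import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
y = (x + 1).sum(axis=0)

# Known ahead of time from shape and dtype, without running the graph:
print(y.nbytes)  # 80_000 bytes for 10_000 float64 values
```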

@crusaderky crusaderky merged commit dd56cc6 into dask:main Sep 5, 2023
@crusaderky crusaderky deleted the decide_worker_memlimit branch September 5, 2023 15:10


Successfully merging this pull request may close these issues:

  • Scheduler gather should warn or abort requests if data is too large
