Commit b872d45

Cluster dump utilities (#5920)
1 parent 4ddb006 commit b872d45

6 files changed: +487 −34 lines

distributed/cluster_dump.py

Lines changed: 270 additions & 1 deletion

@@ -2,12 +2,17 @@
 
 from __future__ import annotations
 
-from typing import IO, Any, Awaitable, Callable, Literal
+from collections import defaultdict
+from collections.abc import Mapping
+from pathlib import Path
+from typing import IO, Any, Awaitable, Callable, Collection, Literal
 
 import fsspec
 import msgpack
 
 from distributed.compatibility import to_thread
+from distributed.stories import scheduler_story as _scheduler_story
+from distributed.stories import worker_story as _worker_story
 
 
 def _tuple_to_list(node):
@@ -57,3 +62,267 @@ def writer(state: dict, f: IO):
         # Write from a thread so we don't block the event loop quite as badly
         # (the writer will still hold the GIL a lot though).
         await to_thread(writer, state, f)
+
+
+def load_cluster_dump(url: str, **kwargs) -> dict:
+    """Loads a cluster dump from a disk artefact
+
+    Parameters
+    ----------
+    url : str
+        Name of the disk artefact. This should have either a
+        ``.msgpack.gz`` or ``.yaml`` suffix, depending on the dump format.
+    **kwargs :
+        Extra arguments passed to :func:`fsspec.open`.
+
+    Returns
+    -------
+    state : dict
+        The cluster state at the time of the dump.
+    """
+    if url.endswith(".msgpack.gz"):
+        mode = "rb"
+        reader = msgpack.unpack
+    elif url.endswith(".yaml"):
+        import yaml
+
+        mode = "r"
+        reader = yaml.safe_load
+    else:
+        raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix")
+
+    kwargs.setdefault("compression", "infer")
+
+    with fsspec.open(url, mode, **kwargs) as f:
+        return reader(f)
+
+
+class DumpArtefact(Mapping):
+    """
+    Utility class for inspecting the state of a cluster dump
+
+    .. code-block:: python
+
+        dump = DumpArtefact.from_url("dump.msgpack.gz")
+        memory_tasks = dump.scheduler_tasks_in_state("memory")
+        executing_tasks = dump.worker_tasks_in_state("executing")
+    """
+
+    def __init__(self, state: dict):
+        self.dump = state
+
+    @classmethod
+    def from_url(cls, url: str, **kwargs) -> DumpArtefact:
+        """Loads a cluster dump from a disk artefact
+
+        Parameters
+        ----------
+        url : str
+            Name of the disk artefact. This should have either a
+            ``.msgpack.gz`` or ``.yaml`` suffix, depending on the dump format.
+        **kwargs :
+            Extra arguments passed to :func:`fsspec.open`.
+
+        Returns
+        -------
+        state : dict
+            The cluster state at the time of the dump.
+        """
+        return DumpArtefact(load_cluster_dump(url, **kwargs))
+
+    def __getitem__(self, key):
+        return self.dump[key]
+
+    def __iter__(self):
+        return iter(self.dump)
+
+    def __len__(self):
+        return len(self.dump)
+
+    def _extract_tasks(self, state: str | None, context: dict):
+        if state:
+            return [v for v in context.values() if v["state"] == state]
+        else:
+            return list(context.values())
+
+    def scheduler_tasks_in_state(self, state: str | None = None) -> list:
+        """
+        Parameters
+        ----------
+        state : optional, str
+            If provided, only tasks in the given state are returned.
+            Otherwise, all tasks are returned.
+
+        Returns
+        -------
+        tasks : list
+            The list of scheduler tasks in ``state``.
+        """
+        return self._extract_tasks(state, self.dump["scheduler"]["tasks"])
+
+    def worker_tasks_in_state(self, state: str | None = None) -> list:
+        """
+        Parameters
+        ----------
+        state : optional, str
+            If provided, only tasks in the given state are returned.
+            Otherwise, all tasks are returned.
+
+        Returns
+        -------
+        tasks : list
+            The list of worker tasks in ``state``.
+        """
+        tasks = []
+
+        for worker_dump in self.dump["workers"].values():
+            if isinstance(worker_dump, dict) and "tasks" in worker_dump:
+                tasks.extend(self._extract_tasks(state, worker_dump["tasks"]))
+
+        return tasks
+
+    def scheduler_story(self, *key_or_stimulus_id: str) -> dict:
+        """
+        Returns
+        -------
+        stories : dict
+            A dict of stories for the keys/stimulus IDs in ``*key_or_stimulus_id``.
+        """
+        stories = defaultdict(list)
+
+        log = self.dump["scheduler"]["transition_log"]
+        keys = set(key_or_stimulus_id)
+
+        for story in _scheduler_story(keys, log):
+            stories[story[0]].append(tuple(story))
+
+        return dict(stories)
+
+    def worker_story(self, *key_or_stimulus_id: str) -> dict:
+        """
+        Returns
+        -------
+        stories : dict
+            A dict of stories for the keys/stimulus IDs in ``*key_or_stimulus_id``.
+        """
+        keys = set(key_or_stimulus_id)
+        stories = defaultdict(list)
+
+        for worker_dump in self.dump["workers"].values():
+            if isinstance(worker_dump, dict) and "log" in worker_dump:
+                for story in _worker_story(keys, worker_dump["log"]):
+                    stories[story[0]].append(tuple(story))
+
+        return dict(stories)
+
+    def missing_workers(self) -> list:
+        """
+        Returns
+        -------
+        missing : list
+            A list of workers connected to the scheduler, but which
+            did not respond to requests for a state dump.
+        """
+        scheduler_workers = self.dump["scheduler"]["workers"]
+        responsive_workers = self.dump["workers"]
+        return [
+            w
+            for w in scheduler_workers
+            if w not in responsive_workers
+            or not isinstance(responsive_workers[w], dict)
+        ]
+
+    def _compact_state(self, state: dict, expand_keys: set[str]):
+        """Compacts ``state`` keys into a general key,
+        unless the key is in ``expand_keys``"""
+        assert "general" not in state
+        result = {}
+        general = {}
+
+        for k, v in state.items():
+            if k in expand_keys:
+                result[k] = v
+            else:
+                general[k] = v
+
+        result["general"] = general
+        return result
+
+    def to_yamls(
+        self,
+        root_dir: str | Path | None = None,
+        worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"),
+        scheduler_expand_keys: Collection[str] = (
+            "events",
+            "extensions",
+            "log",
+            "task_groups",
+            "tasks",
+            "transition_log",
+            "workers",
+        ),
+    ):
+        """
+        Splits the Dump Artefact into a tree of yaml files with
+        ``root_dir`` as its base.
+
+        The root level of the tree contains a directory for the scheduler
+        and directories for each individual worker.
+        Each directory contains yaml files describing the state of the scheduler
+        or worker when the artefact was created.
+
+        In general, keys associated with the state are compacted into a
+        ``general.yaml`` file, unless they are in ``scheduler_expand_keys``
+        or ``worker_expand_keys``.
+
+        Parameters
+        ----------
+        root_dir : str or Path
+            The root directory into which the tree is written.
+            Defaults to the current working directory if ``None``.
+        worker_expand_keys : iterable of str
+            An iterable of artefact worker keys that will be expanded
+            into separate yaml files.
+            Keys that are not in this iterable are compacted into a
+            ``general.yaml`` file.
+        scheduler_expand_keys : iterable of str
+            An iterable of artefact scheduler keys that will be expanded
+            into separate yaml files.
+            Keys that are not in this iterable are compacted into a
+            ``general.yaml`` file.
+        """
+        import yaml
+
+        root_dir = Path(root_dir) if root_dir else Path.cwd()
+        dumper = yaml.CSafeDumper
+        scheduler_expand_keys = set(scheduler_expand_keys)
+        worker_expand_keys = set(worker_expand_keys)
+
+        workers = self.dump["workers"]
+        for info in workers.values():
+            try:
+                worker_id = info["id"]
+            except KeyError:
+                continue
+
+            worker_state = self._compact_state(info, worker_expand_keys)
+
+            log_dir = root_dir / worker_id
+            log_dir.mkdir(parents=True, exist_ok=True)
+
+            for name, _logs in worker_state.items():
+                filename = str(log_dir / f"{name}.yaml")
+                with open(filename, "w") as fd:
+                    yaml.dump(_logs, fd, Dumper=dumper)
+
+        context = "scheduler"
+        # Compact smaller keys into a general dict
+        scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys)
+
+        log_dir = root_dir / context
+        log_dir.mkdir(parents=True, exist_ok=True)
+
+        for name, _logs in scheduler_state.items():
+            filename = str(log_dir / f"{name}.yaml")
+
+            with open(filename, "w") as fd:
+                yaml.dump(_logs, fd, Dumper=dumper)
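
Taken together, the new utilities support a workflow like the following minimal sketch. It assumes a dump file ``dump.msgpack.gz`` already exists (e.g. written by ``Client.dump_cluster_state``); the task key ``"sum-1234"`` and the output directory are hypothetical:

    # Minimal sketch of inspecting a cluster dump with the utilities above.
    # Assumes "dump.msgpack.gz" exists; "sum-1234" is a hypothetical task key.
    from distributed.cluster_dump import DumpArtefact

    dump = DumpArtefact.from_url("dump.msgpack.gz")

    # Scheduler-side tasks currently in memory
    memory_tasks = dump.scheduler_tasks_in_state("memory")

    # Transition history for one key, grouped by key
    stories = dump.scheduler_story("sum-1234")

    # Workers the scheduler knew about that never returned a state dump
    missing = dump.missing_workers()

    # Explode the dump into per-component yaml files under ./dump-tree
    dump.to_yamls("dump-tree")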

distributed/scheduler.py

Lines changed: 2 additions & 3 deletions

@@ -80,6 +80,7 @@
 from distributed.security import Security
 from distributed.semaphore import SemaphoreExtension
 from distributed.stealing import WorkStealing
+from distributed.stories import scheduler_story
 from distributed.utils import (
     All,
     TimeoutError,
@@ -7534,9 +7535,7 @@ def transitions(self, recommendations: dict):
     def story(self, *keys):
         """Get all transitions that touch one of the input keys"""
         keys = {key.key if isinstance(key, TaskState) else key for key in keys}
-        return [
-            t for t in self.transition_log if t[0] in keys or keys.intersection(t[3])
-        ]
+        return scheduler_story(keys, self.transition_log)
 
     transition_story = story
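
The refactor is behaviour-preserving: ``Scheduler.story`` still normalizes ``TaskState`` objects to raw keys before filtering the transition log, only now through the shared helper. An illustrative (not verbatim) sanity check, given a running scheduler ``s`` and a hypothetical key:

    # Illustrative only: both calls return the same transitions, since story()
    # converts TaskState objects to their keys before delegating to
    # distributed.stories.scheduler_story.
    ts = s.tasks["sum-1234"]  # hypothetical task key
    assert s.story(ts) == s.story("sum-1234")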

distributed/stories.py

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+from typing import Iterable
+
+
+def scheduler_story(keys: set, transition_log: Iterable) -> list:
+    """Creates a story from the scheduler transition log given a set of keys
+    describing tasks or stimuli.
+
+    Parameters
+    ----------
+    keys : set
+        A set of task `keys` or `stimulus_id`s
+    transition_log : iterable
+        The scheduler transition log
+
+    Returns
+    -------
+    story : list
+    """
+    return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])]
+
+
+def worker_story(keys: set, log: Iterable) -> list:
+    """Creates a story from the worker log given a set of keys
+    describing tasks or stimuli.
+
+    Parameters
+    ----------
+    keys : set
+        A set of task `keys` or `stimulus_id`s
+    log : iterable
+        The worker log
+
+    Returns
+    -------
+    story : list
+    """
+    return [
+        msg
+        for msg in log
+        if any(key in msg for key in keys)
+        or any(
+            key in c for key in keys for c in msg if isinstance(c, (tuple, list, set))
+        )
+    ]
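
To make the matching rules concrete, here is a small self-contained sketch with toy logs. The exact tuple layouts are assumptions for illustration: ``scheduler_story`` only relies on ``t[0]`` being the key and ``t[3]`` being a collection of affected keys, and ``worker_story`` matches a key either as a top-level element of a log entry or inside any nested tuple/list/set:

    from distributed.stories import scheduler_story, worker_story

    # Toy scheduler transition-log entries:
    # (key, start, finish, recommendations, stimulus_id).
    slog = [
        ("x", "released", "waiting", {}, "stim-1"),
        ("y", "waiting", "processing", {"x": "forgotten"}, "stim-2"),
        ("z", "waiting", "processing", {}, "stim-3"),
    ]
    # "x" matches the first entry via t[0] and the second via
    # keys.intersection(t[3]), so both are returned.
    print(scheduler_story({"x"}, slog))

    # Toy worker-log entries: "x" matches the first entry directly and the
    # second through the nested tuple ("x", "y"); the third does not match.
    wlog = [
        ("x", "compute-task", "stim-4"),
        ("gather-dep", ("x", "y"), "stim-5"),
        ("z", "ready", "stim-6"),
    ]
    print(worker_story({"x"}, wlog))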

distributed/tests/test_client.py

Lines changed: 4 additions & 16 deletions

@@ -63,6 +63,7 @@
     tokenize,
     wait,
 )
+from distributed.cluster_dump import load_cluster_dump
 from distributed.comm import CommClosedError
 from distributed.compatibility import LINUX, WINDOWS
 from distributed.core import Status
@@ -7261,22 +7262,9 @@ def test_print_simple(capsys):
 
 
 def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict:
-    fsspec = pytest.importorskip("fsspec")
-
-    url = str(url)
-    if format == "msgpack":
-        import msgpack
-
-        url += ".msgpack.gz"
-        loader = msgpack.unpack
-    else:
-        import yaml
-
-        url += ".yaml"
-        loader = yaml.safe_load
-
-    with fsspec.open(url, mode="rb", compression="infer") as f:
-        state = loader(f)
+    fsspec = pytest.importorskip("fsspec")  # for load_cluster_dump
+    url = str(url) + (".msgpack.gz" if format == "msgpack" else ".yaml")
+    state = load_cluster_dump(url)
 
     assert isinstance(state, dict)
     assert "scheduler" in state
