Skip to content

Commit 853a59f

Browse files
committed
More fixes
1 parent b404dd4 commit 853a59f

File tree

3 files changed

+47
-107
lines changed

3 files changed

+47
-107
lines changed

distributed/tests/test_worker.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3047,12 +3047,10 @@ async def test_task_flight_compute_oserror(c, s, a, b):
30473047
),
30483048
# inc is lost and needs to be recomputed. Therefore, sum is released
30493049
("free-keys", ("f1",)),
3050-
("f1", "purge-state"),
30513050
# The recommendations here are hard to predict. Whatever key is
30523051
# currently scheduled to be fetched, if any, will be recommended to be
30533052
# released.
30543053
("f1", "waiting", "released", "released", lambda msg: msg["f1"] == "forgotten"),
3055-
("f1", "purge-state"),
30563054
("f1", "released", "forgotten", "forgotten", {}),
30573055
# Now, we actually compute the task *once*. This must not cycle back
30583056
("f1", "compute-task", "released"),

distributed/tests/test_worker_state_machine.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,7 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3):
505505

506506
f1 = c.submit(inc, 1, key="f1", workers=[w1.address])
507507
f2 = c.submit(inc, 2, key="f2", workers=[w1.address])
508+
await wait_for_state(f1.key, "memory", w1)
508509

509510
w3.handle_acquire_replicas(
510511
keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire"
@@ -515,3 +516,22 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3):
515516
f3 = c.submit(sum, [f1, f2], key="f3", workers=[w3.address])
516517

517518
await f3
519+
520+
521+
@gen_cluster(client=True, nthreads=[("", 1)] * 3)
522+
async def test_missing_to_waiting(c, s, w1, w2, w3):
523+
w3.periodic_callbacks["find-missing"].stop()
524+
525+
f1 = c.submit(inc, 1, key="f1", workers=[w1.address], allow_other_workers=True)
526+
await wait_for_state(f1.key, "memory", w1)
527+
528+
w3.handle_acquire_replicas(
529+
keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire"
530+
)
531+
532+
await wait_for_state(f1.key, "missing", w3)
533+
534+
await w2.close()
535+
await w1.close()
536+
537+
await f1

distributed/worker.py

Lines changed: 27 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -2058,12 +2058,21 @@ def transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstr
20582058
# _select_keys_for_gather().
20592059
return {}, [EnsureCommunicatingAfterTransitions(stimulus_id=stimulus_id)]
20602060

2061+
def transition_missing_waiting(
2062+
self, ts: TaskState, *, stimulus_id: str
2063+
) -> RecsInstrs:
2064+
self._missing_dep_flight.discard(ts)
2065+
self._purge_state(ts)
2066+
return self.transition_released_waiting(ts, stimulus_id=stimulus_id)
2067+
20612068
def transition_missing_fetch(
20622069
self, ts: TaskState, *, stimulus_id: str
20632070
) -> RecsInstrs:
20642071
if self.validate:
20652072
assert ts.state == "missing"
2066-
assert ts.who_has
2073+
2074+
if not ts.who_has:
2075+
return {}, []
20672076

20682077
self._missing_dep_flight.discard(ts)
20692078
return self.transition_generic_fetch(ts, stimulus_id=stimulus_id)
@@ -2105,7 +2114,7 @@ def transition_released_fetch(
21052114
def transition_generic_released(
21062115
self, ts: TaskState, *, stimulus_id: str
21072116
) -> RecsInstrs:
2108-
self._purge_state(ts.key, stimulus_id=stimulus_id)
2117+
self._purge_state(ts)
21092118
recs: Recs = {}
21102119
for dependency in ts.dependencies:
21112120
if (
@@ -2127,7 +2136,6 @@ def transition_released_waiting(
21272136
self, ts: TaskState, *, stimulus_id: str
21282137
) -> RecsInstrs:
21292138
if self.validate:
2130-
assert ts.state == "released"
21312139
assert all(d.key in self.tasks for d in ts.dependencies)
21322140

21332141
recommendations: Recs = {}
@@ -2136,10 +2144,7 @@ def transition_released_waiting(
21362144
if dep_ts.state != "memory":
21372145
ts.waiting_for_data.add(dep_ts)
21382146
dep_ts.waiters.add(ts)
2139-
# TODO: main branch claims this shouldn't be conditional since a
2140-
# recent change
2141-
if dep_ts.state not in {"fetch", "flight", "missing"}:
2142-
recommendations[dep_ts] = "fetch"
2147+
recommendations[dep_ts] = "fetch"
21432148

21442149
if ts.waiting_for_data:
21452150
self.waiting_for_data_count += 1
@@ -2447,7 +2452,6 @@ def transition_cancelled_released(
24472452
self, ts: TaskState, *, stimulus_id: str
24482453
) -> RecsInstrs:
24492454
if not ts.done:
2450-
ts._next = "released"
24512455
return {}, []
24522456
self._executing.discard(ts)
24532457
self._in_flight_tasks.discard(ts)
@@ -2656,7 +2660,7 @@ def transition_released_forgotten(
26562660
dep.dependents.discard(ts)
26572661
if dep.state == "released" and not dep.dependents:
26582662
recommendations[dep] = "forgotten"
2659-
self._purge_state(ts.key, stimulus_id=stimulus_id)
2663+
self._purge_state(ts)
26602664
# Mark state as forgotten in case it is still referenced
26612665
ts.state = "forgotten"
26622666
self.tasks.pop(ts.key, None)
@@ -2708,6 +2712,7 @@ def transition_released_forgotten(
27082712
("missing", "fetch"): transition_missing_fetch,
27092713
("missing", "released"): transition_missing_released,
27102714
("missing", "error"): transition_generic_error,
2715+
("missing", "waiting"): transition_missing_waiting,
27112716
("ready", "error"): transition_generic_error,
27122717
("ready", "executing"): transition_ready_executing,
27132718
("ready", "released"): transition_generic_released,
@@ -2793,7 +2798,9 @@ def _transition(
27932798
(recs, instructions),
27942799
self._transition(ts, finish, *args, stimulus_id=stimulus_id),
27952800
)
2796-
except InvalidTransition:
2801+
# ValueError may be raised by merge_recs_instructions
2802+
# TODO: should merge_recs raise InvalidTransition?
2803+
except (ValueError, InvalidTransition):
27972804
self.log_event(
27982805
"invalid-worker-transition",
27992806
{
@@ -3528,25 +3535,14 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None:
35283535
# Update status and send confirmation to the Scheduler (see status.setter)
35293536
self.status = new_status
35303537

3531-
def _purge_state(
3532-
self,
3533-
key: str,
3534-
*,
3535-
stimulus_id: str,
3536-
) -> None:
3538+
def _purge_state(self, ts: TaskState) -> None:
35373539
"""Ensure that TaskState attributes are reset to a neutral default and
35383540
Worker-level state associated to the provided key is cleared (e.g.
35393541
who_has)
35403542
This is idempotent
35413543
"""
3542-
ts = self.tasks[key]
3543-
logger.debug(
3544-
"Purge task key: %s state: %s; stimulus_id=%s",
3545-
ts.key,
3546-
ts.state,
3547-
stimulus_id,
3548-
)
3549-
self.log.append((key, "purge-state", stimulus_id, time()))
3544+
key = ts.key
3545+
logger.debug("Purge task key: %s state: %s", ts.key, ts.state)
35503546
self.data.pop(key, None)
35513547
self.actors.pop(key, None)
35523548

@@ -3976,7 +3972,8 @@ def _(self, ev: FindMissingEvent) -> RecsInstrs:
39763972
return {}, []
39773973

39783974
if self.validate:
3979-
assert not any(ts.who_has for ts in self._missing_dep_flight)
3975+
for ts in self._missing_dep_flight:
3976+
assert not ts.who_has, self.story(ts)
39803977

39813978
smsg = RequestRefreshWhoHasMsg(
39823979
keys=[ts.key for ts in self._missing_dep_flight],
@@ -4193,6 +4190,9 @@ def validate_task_executing(self, ts):
41934190
assert ts.run_spec is not None
41944191
assert ts.key not in self.data
41954192
assert not ts.waiting_for_data
4193+
for dep in ts.dependencies:
4194+
assert dep.state == "memory", self.story(dep)
4195+
assert dep.key in self.data or dep.key in self.actors
41964196

41974197
def validate_task_ready(self, ts):
41984198
assert ts.key in pluck(1, self.ready)
@@ -4240,7 +4240,8 @@ def validate_task_missing(self, ts):
42404240
def validate_task_cancelled(self, ts):
42414241
assert ts.key not in self.data
42424242
assert ts._previous in {"long-running", "executing", "flight"}
4243-
assert ts._next is None # We'll always transition to released after it is done
4243+
# We'll always transition to released after it is done
4244+
assert ts._next is None, (ts.key, ts._next, self.story(ts))
42444245

42454246
def validate_task_resumed(self, ts):
42464247
assert ts.key not in self.data
@@ -5173,82 +5174,3 @@ async def benchmark_network(
51735174

51745175
out[size_str] = total / (time() - start)
51755176
return out
5176-
5177-
5178-
[
5179-
(
5180-
"f1",
5181-
"ensure-task-exists",
5182-
"released",
5183-
"compute-task-1654160270.4501",
5184-
1654160270.450457,
5185-
),
5186-
(
5187-
"f1",
5188-
"released",
5189-
"fetch",
5190-
"fetch",
5191-
{},
5192-
"compute-task-1654160270.4501",
5193-
1654160270.450478,
5194-
),
5195-
(
5196-
"gather-dependencies",
5197-
"tcp://127.0.0.1:54108",
5198-
{"f1"},
5199-
"compute-task-1654160270.4501",
5200-
1654160270.45051,
5201-
),
5202-
(
5203-
"f1",
5204-
"fetch",
5205-
"flight",
5206-
"flight",
5207-
{},
5208-
"compute-task-1654160270.4501",
5209-
1654160270.4505181,
5210-
),
5211-
(
5212-
"request-dep",
5213-
"tcp://127.0.0.1:54108",
5214-
{"f1"},
5215-
"compute-task-1654160270.4501",
5216-
1654160270.450558,
5217-
),
5218-
(
5219-
"busy-gather",
5220-
"tcp://127.0.0.1:54108",
5221-
{"f1"},
5222-
"compute-task-1654160270.4501",
5223-
1654160270.4519591,
5224-
),
5225-
(
5226-
"f1",
5227-
"flight",
5228-
"fetch",
5229-
"fetch",
5230-
{},
5231-
"compute-task-1654160270.4501",
5232-
1654160270.4519749,
5233-
),
5234-
("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.4605172),
5235-
(
5236-
"f1",
5237-
"fetch",
5238-
"released",
5239-
"released",
5240-
{"f1": "forgotten"},
5241-
"worker-connect-1654160270.3552",
5242-
1654160270.460523,
5243-
),
5244-
("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.460525),
5245-
(
5246-
"f1",
5247-
"released",
5248-
"forgotten",
5249-
"forgotten",
5250-
{},
5251-
"worker-connect-1654160270.3552",
5252-
1654160270.4605281,
5253-
),
5254-
]

0 commit comments

Comments
 (0)