@@ -2058,12 +2058,21 @@ def transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstr
20582058 # _select_keys_for_gather().
20592059 return {}, [EnsureCommunicatingAfterTransitions (stimulus_id = stimulus_id )]
20602060
2061+ def transition_missing_waiting (
2062+ self , ts : TaskState , * , stimulus_id : str
2063+ ) -> RecsInstrs :
2064+ self ._missing_dep_flight .discard (ts )
2065+ self ._purge_state (ts )
2066+ return self .transition_released_waiting (ts , stimulus_id = stimulus_id )
2067+
20612068 def transition_missing_fetch (
20622069 self , ts : TaskState , * , stimulus_id : str
20632070 ) -> RecsInstrs :
20642071 if self .validate :
20652072 assert ts .state == "missing"
2066- assert ts .who_has
2073+
2074+ if not ts .who_has :
2075+ return {}, []
20672076
20682077 self ._missing_dep_flight .discard (ts )
20692078 return self .transition_generic_fetch (ts , stimulus_id = stimulus_id )
@@ -2105,7 +2114,7 @@ def transition_released_fetch(
21052114 def transition_generic_released (
21062115 self , ts : TaskState , * , stimulus_id : str
21072116 ) -> RecsInstrs :
2108- self ._purge_state (ts . key , stimulus_id = stimulus_id )
2117+ self ._purge_state (ts )
21092118 recs : Recs = {}
21102119 for dependency in ts .dependencies :
21112120 if (
@@ -2127,7 +2136,6 @@ def transition_released_waiting(
21272136 self , ts : TaskState , * , stimulus_id : str
21282137 ) -> RecsInstrs :
21292138 if self .validate :
2130- assert ts .state == "released"
21312139 assert all (d .key in self .tasks for d in ts .dependencies )
21322140
21332141 recommendations : Recs = {}
@@ -2136,10 +2144,7 @@ def transition_released_waiting(
21362144 if dep_ts .state != "memory" :
21372145 ts .waiting_for_data .add (dep_ts )
21382146 dep_ts .waiters .add (ts )
2139- # TODO: main branch claims this shouldn't be conditional since a
2140- # recent change
2141- if dep_ts .state not in {"fetch" , "flight" , "missing" }:
2142- recommendations [dep_ts ] = "fetch"
2147+ recommendations [dep_ts ] = "fetch"
21432148
21442149 if ts .waiting_for_data :
21452150 self .waiting_for_data_count += 1
@@ -2447,7 +2452,6 @@ def transition_cancelled_released(
24472452 self , ts : TaskState , * , stimulus_id : str
24482453 ) -> RecsInstrs :
24492454 if not ts .done :
2450- ts ._next = "released"
24512455 return {}, []
24522456 self ._executing .discard (ts )
24532457 self ._in_flight_tasks .discard (ts )
@@ -2656,7 +2660,7 @@ def transition_released_forgotten(
26562660 dep .dependents .discard (ts )
26572661 if dep .state == "released" and not dep .dependents :
26582662 recommendations [dep ] = "forgotten"
2659- self ._purge_state (ts . key , stimulus_id = stimulus_id )
2663+ self ._purge_state (ts )
26602664 # Mark state as forgotten in case it is still referenced
26612665 ts .state = "forgotten"
26622666 self .tasks .pop (ts .key , None )
@@ -2708,6 +2712,7 @@ def transition_released_forgotten(
27082712 ("missing" , "fetch" ): transition_missing_fetch ,
27092713 ("missing" , "released" ): transition_missing_released ,
27102714 ("missing" , "error" ): transition_generic_error ,
2715+ ("missing" , "waiting" ): transition_missing_waiting ,
27112716 ("ready" , "error" ): transition_generic_error ,
27122717 ("ready" , "executing" ): transition_ready_executing ,
27132718 ("ready" , "released" ): transition_generic_released ,
@@ -2793,7 +2798,9 @@ def _transition(
27932798 (recs , instructions ),
27942799 self ._transition (ts , finish , * args , stimulus_id = stimulus_id ),
27952800 )
2796- except InvalidTransition :
2801+ # ValueError may be raised by merge_recs_instructions
2802+ # TODO: should merge_recs_instructions raise InvalidTransition instead of ValueError?
2803+ except (ValueError , InvalidTransition ):
27972804 self .log_event (
27982805 "invalid-worker-transition" ,
27992806 {
@@ -3528,25 +3535,14 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None:
35283535 # Update status and send confirmation to the Scheduler (see status.setter)
35293536 self .status = new_status
35303537
3531- def _purge_state (
3532- self ,
3533- key : str ,
3534- * ,
3535- stimulus_id : str ,
3536- ) -> None :
3538+ def _purge_state (self , ts : TaskState ) -> None :
35373539 """Ensure that TaskState attributes are reset to a neutral default and
35383540 Worker-level state associated with the provided task's key is cleared
35393541 (e.g. who_has).
35403542 This is idempotent.
35413543 """
3542- ts = self .tasks [key ]
3543- logger .debug (
3544- "Purge task key: %s state: %s; stimulus_id=%s" ,
3545- ts .key ,
3546- ts .state ,
3547- stimulus_id ,
3548- )
3549- self .log .append ((key , "purge-state" , stimulus_id , time ()))
3544+ key = ts .key
3545+ logger .debug ("Purge task key: %s state: %s" , ts .key , ts .state )
35503546 self .data .pop (key , None )
35513547 self .actors .pop (key , None )
35523548
@@ -3976,7 +3972,8 @@ def _(self, ev: FindMissingEvent) -> RecsInstrs:
39763972 return {}, []
39773973
39783974 if self .validate :
3979- assert not any (ts .who_has for ts in self ._missing_dep_flight )
3975+ for ts in self ._missing_dep_flight :
3976+ assert not ts .who_has , self .story (ts )
39803977
39813978 smsg = RequestRefreshWhoHasMsg (
39823979 keys = [ts .key for ts in self ._missing_dep_flight ],
@@ -4193,6 +4190,9 @@ def validate_task_executing(self, ts):
41934190 assert ts .run_spec is not None
41944191 assert ts .key not in self .data
41954192 assert not ts .waiting_for_data
4193+ for dep in ts .dependencies :
4194+ assert dep .state == "memory" , self .story (dep )
4195+ assert dep .key in self .data or dep .key in self .actors
41964196
41974197 def validate_task_ready (self , ts ):
41984198 assert ts .key in pluck (1 , self .ready )
@@ -4240,7 +4240,8 @@ def validate_task_missing(self, ts):
42404240 def validate_task_cancelled (self , ts ):
42414241 assert ts .key not in self .data
42424242 assert ts ._previous in {"long-running" , "executing" , "flight" }
4243- assert ts ._next is None # We'll always transition to released after it is done
4243+ # We'll always transition to released after it is done
4244+ assert ts ._next is None , (ts .key , ts ._next , self .story (ts ))
42444245
42454246 def validate_task_resumed (self , ts ):
42464247 assert ts .key not in self .data
@@ -5173,82 +5174,3 @@ async def benchmark_network(
51735174
51745175 out [size_str ] = total / (time () - start )
51755176 return out
5176-
5177-
5178- [
5179- (
5180- "f1" ,
5181- "ensure-task-exists" ,
5182- "released" ,
5183- "compute-task-1654160270.4501" ,
5184- 1654160270.450457 ,
5185- ),
5186- (
5187- "f1" ,
5188- "released" ,
5189- "fetch" ,
5190- "fetch" ,
5191- {},
5192- "compute-task-1654160270.4501" ,
5193- 1654160270.450478 ,
5194- ),
5195- (
5196- "gather-dependencies" ,
5197- "tcp://127.0.0.1:54108" ,
5198- {"f1" },
5199- "compute-task-1654160270.4501" ,
5200- 1654160270.45051 ,
5201- ),
5202- (
5203- "f1" ,
5204- "fetch" ,
5205- "flight" ,
5206- "flight" ,
5207- {},
5208- "compute-task-1654160270.4501" ,
5209- 1654160270.4505181 ,
5210- ),
5211- (
5212- "request-dep" ,
5213- "tcp://127.0.0.1:54108" ,
5214- {"f1" },
5215- "compute-task-1654160270.4501" ,
5216- 1654160270.450558 ,
5217- ),
5218- (
5219- "busy-gather" ,
5220- "tcp://127.0.0.1:54108" ,
5221- {"f1" },
5222- "compute-task-1654160270.4501" ,
5223- 1654160270.4519591 ,
5224- ),
5225- (
5226- "f1" ,
5227- "flight" ,
5228- "fetch" ,
5229- "fetch" ,
5230- {},
5231- "compute-task-1654160270.4501" ,
5232- 1654160270.4519749 ,
5233- ),
5234- ("f1" , "purge-state" , "worker-connect-1654160270.3552" , 1654160270.4605172 ),
5235- (
5236- "f1" ,
5237- "fetch" ,
5238- "released" ,
5239- "released" ,
5240- {"f1" : "forgotten" },
5241- "worker-connect-1654160270.3552" ,
5242- 1654160270.460523 ,
5243- ),
5244- ("f1" , "purge-state" , "worker-connect-1654160270.3552" , 1654160270.460525 ),
5245- (
5246- "f1" ,
5247- "released" ,
5248- "forgotten" ,
5249- "forgotten" ,
5250- {},
5251- "worker-connect-1654160270.3552" ,
5252- 1654160270.4605281 ,
5253- ),
5254- ]
0 commit comments