Description
This task is part of the larger epic #5736.
The WorkerState interface as described in #5736 (comment) will accept and emit events the Worker (server) needs to listen to. These events may require us to send a message, schedule a coroutine, schedule a task on the threadpool, etc.
The event handlers in turn emit a set of events that must be fed back into the WorkerState. This callback system works similarly to our RPC implementation; pseudocode below:

    import abc
    import asyncio
    from collections.abc import Collection

    # BatchedSend, WorkerState, StateMachineEvent, GatherDep, Execute and SendMsg
    # are the types from the design in #5736.


    class WorkerBase(abc.ABC):
        batched_stream: BatchedSend
        state: WorkerState
        instruction_history: list[StateMachineEvent]

        def _handle_stimulus_from_future(self, fut):
            try:
                stim = fut.result()
            except Exception:
                # This must never happen and the handlers must implement exception handling.
                # If we implement error handling here, this should raise some exception that
                # can be immediately filed as a bug report
                raise
            for s in stim:
                self._handle_stimulus(s)

        def _handle_stimulus(self, stim: StateMachineEvent):
            self.instruction_history.append(stim)
            instructions = self.state.handle_stimulus(stim)
            for inst in instructions:
                task = None
                # TODO: collect all futures and await/cancel when closing?
                # TODO: This dispatch should be easily modifiable to allow for easier testing
                if isinstance(inst, GatherDep):
                    task = asyncio.create_task(self._gather_data(inst))
                elif isinstance(inst, Execute):
                    task = asyncio.create_task(self.execute(inst))
                elif isinstance(inst, SendMsg):
                    self.batched_stream.send(inst.payload)
                else:
                    raise RuntimeError("Unknown instruction")
                if task is not None:
                    # Feed the handler's output events back into the state machine
                    task.add_done_callback(self._handle_stimulus_from_future)

        @abc.abstractmethod
        async def execute(self, inst: Execute) -> Collection[StateMachineEvent]:
            raise NotImplementedError

        @abc.abstractmethod
        async def _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent]:
            raise NotImplementedError

- A definition of input/output events exists (called `StateMachineEvents` and `Instructions` in the design document)
- `Worker._handle_stimulus` implements a dispatch system that dispatches events to handlers
- The handlers follow a strict API of `method(self, input_event) -> Collection[StateMachineEvent]`
- The handler output is fed back into the state machine
- This callback mechanism has dedicated unit tests for sync and async handlers
- Event handlers are not allowed to raise exceptions but instead need to return appropriate events
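
To make the handler contract from the list above concrete, here is a minimal, runnable sketch of the round trip. It is not the proposed implementation: `ToyState`, `ToyWorker`, `ExecuteSuccess`, `ExecuteFailure` and the `fail` flag are hypothetical stand-ins invented for this example, and the toy state only records events instead of running a real state machine. It shows an async handler that never raises and instead returns a failure event, with its output fed back through a done callback.

```python
import asyncio
from collections.abc import Collection
from dataclasses import dataclass


# Hypothetical stand-ins for the events/instructions of the design document.
@dataclass
class StateMachineEvent:
    key: str


@dataclass
class ExecuteSuccess(StateMachineEvent):
    value: object


@dataclass
class ExecuteFailure(StateMachineEvent):
    exception: Exception


@dataclass
class Execute:
    key: str
    fail: bool = False  # hypothetical switch to simulate a failing task


class ToyState:
    """Toy replacement for WorkerState: records events, emits no instructions."""

    def __init__(self):
        self.log = []

    def handle_stimulus(self, stim):
        self.log.append(stim)
        return []


class ToyWorker:
    def __init__(self):
        self.state = ToyState()
        self.tasks = set()  # keep references so tasks can be awaited on close

    def handle_instruction(self, inst: Execute):
        task = asyncio.create_task(self.execute(inst))
        self.tasks.add(task)
        task.add_done_callback(self._handle_stimulus_from_future)

    def _handle_stimulus_from_future(self, fut):
        self.tasks.discard(fut)
        # Handlers never raise, so result() only yields events.
        for stim in fut.result():
            self.state.handle_stimulus(stim)

    async def execute(self, inst: Execute) -> Collection[StateMachineEvent]:
        try:
            if inst.fail:
                raise ValueError("boom")
            return [ExecuteSuccess(inst.key, 42)]
        except Exception as exc:
            # Errors are reported as events, not as exceptions.
            return [ExecuteFailure(inst.key, exc)]


async def run_demo():
    worker = ToyWorker()
    worker.handle_instruction(Execute("x"))
    worker.handle_instruction(Execute("y", fail=True))
    await asyncio.gather(*worker.tasks)
    await asyncio.sleep(0)  # give any remaining done callbacks a chance to run
    return worker.state.log
```

Calling `asyncio.run(run_demo())` returns the events recorded by the toy state, one success and one failure. A unit test for the callback mechanism could assert on exactly that log, which is one reason to keep the dispatch separate from the handlers.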
To incorporate this callback mechanism into our current code, the signatures of the transition functions can be changed to return `tuple[Recommendations, Collection[Instructions]]` (i.e. instructions instead of messages), where the scheduler messages are replaced by appropriate Instructions.
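
As a rough illustration of that signature change, the sketch below shows what a single transition function might look like once it returns instructions. Everything here is an assumption made for the example: the `Instruction` base class, the `Recommendations` alias, the function name `transition_executing_memory`, its parameters, and the payload layout are placeholders rather than the actual worker code.

```python
from __future__ import annotations

from collections.abc import Collection
from dataclasses import dataclass


# Hypothetical instruction types; the real ones are defined by the design document.
@dataclass
class Instruction:
    pass


@dataclass
class SendMsg(Instruction):
    payload: dict


# Assumed shape: task key -> recommended next state.
Recommendations = dict


def transition_executing_memory(
    key: str, nbytes: int
) -> tuple[Recommendations, Collection[Instruction]]:
    """Illustrative transition: no follow-up recommendations, one message to send."""
    recommendations: Recommendations = {}
    # Previously this would have been a raw scheduler message; now it is
    # wrapped in an instruction that the dispatch loop turns into a send.
    instructions = [SendMsg({"op": "task-finished", "key": key, "nbytes": nbytes})]
    return recommendations, instructions
```

With this shape, the transition functions stay free of I/O: they only describe what should happen, and the dispatch loop decides how each instruction is carried out.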