
StateMachine event dispatch mechanism #5894

@fjetter

Description


This task is part of a bigger epic, #5736.

The WorkerState interface, as described in #5736 (comment), will accept events and emit events that the Worker (server) needs to listen to. Acting on these events may require us to send a message, schedule a coroutine, schedule a task on the threadpool, etc.
These event handlers in turn emit a new set of events that must be fed back into the WorkerState. This callback system works similarly to our RPC implementation (pseudocode below):

```python
from __future__ import annotations

import abc
import asyncio
from collections.abc import Collection

# StateMachineEvent, WorkerState, BatchedSend and the instruction classes
# (GatherDep, Execute, SendMsg) are assumed to be defined elsewhere.


class WorkerBase(abc.ABC):
    batched_stream: BatchedSend
    state: WorkerState
    instruction_history: list[StateMachineEvent]

    def _handle_stimulus_from_future(self, fut: asyncio.Future) -> None:
        try:
            stims = fut.result()
        except Exception:
            # This must never happen; the handlers must implement exception handling.
            # If we implement error handling here, it should raise an exception that
            # can be filed immediately as a bug report.
            raise
        for stim in stims:
            self._handle_stimulus(stim)

    def _handle_stimulus(self, stim: StateMachineEvent) -> None:
        self.instruction_history.append(stim)
        instructions = self.state.handle_stimulus(stim)
        for inst in instructions:
            task = None
            # TODO: collect all futures and await/cancel when closing?
            # TODO: This dispatch should be easily modifiable to allow for easier testing
            if isinstance(inst, GatherDep):
                task = asyncio.create_task(self._gather_data(inst))
            elif isinstance(inst, Execute):
                task = asyncio.create_task(self.execute(inst))
            elif isinstance(inst, SendMsg):
                # Sending a message is synchronous and produces no new events
                self.batched_stream.send(inst.payload)
            else:
                raise RuntimeError(f"Unknown instruction: {inst!r}")
            if task is not None:
                # Once the handler finishes, feed its output events back into the state machine
                task.add_done_callback(self._handle_stimulus_from_future)

    @abc.abstractmethod
    async def execute(self, inst: Execute) -> Collection[StateMachineEvent]:
        raise NotImplementedError

    @abc.abstractmethod
    async def _gather_data(self, inst: GatherDep) -> Collection[StateMachineEvent]:
        raise NotImplementedError
```
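The pseudocode references StateMachineEvent and the instruction classes GatherDep, Execute and SendMsg without defining them. One possible shape is sketched below, assuming frozen dataclasses that all carry a stimulus_id. The field names, the Instruction base class and the ExecuteSuccessEvent/ExecuteFailureEvent events are hypothetical and not taken from the design document.

```python
from dataclasses import dataclass
from typing import Any


# Hypothetical layouts: the issue names these classes but not their fields.
@dataclass(frozen=True)
class StateMachineEvent:
    """Base class for stimuli fed *into* WorkerState.handle_stimulus."""

    stimulus_id: str


@dataclass(frozen=True)
class Instruction:
    """Base class for instructions emitted *by* WorkerState for the Worker to act on."""

    stimulus_id: str


@dataclass(frozen=True)
class GatherDep(Instruction):
    worker: str  # address of the peer worker to fetch from
    keys: frozenset  # keys to fetch from that worker


@dataclass(frozen=True)
class Execute(Instruction):
    key: str


@dataclass(frozen=True)
class SendMsg(Instruction):
    payload: dict  # message to put on the batched stream


# Hypothetical events an Execute handler could return instead of raising.
@dataclass(frozen=True)
class ExecuteSuccessEvent(StateMachineEvent):
    key: str
    value: Any


@dataclass(frozen=True)
class ExecuteFailureEvent(StateMachineEvent):
    key: str
    exception_text: str
```

Keeping events and instructions in two separate class hierarchies lets both the state machine and the worker's dispatch rely on plain isinstance checks without ambiguity, and freezing them discourages handlers from mutating an event after it has been recorded in the history.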
  • A definition of the input/output events exists (called StateMachineEvents and Instructions in the design document)
  • Worker._handle_stimulus implements a dispatch system that dispatches events to handlers
  • The handlers follow a strict API: method(self, input_event) -> Collection[StateMachineEvent]
  • The handler output is fed back into the state machine
  • This callback mechanism has dedicated unit tests for sync and async handlers
  • Event handlers are not allowed to raise exceptions; instead, they must return appropriate events
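The requirements above can be sketched as a small, self-contained dispatcher, assuming a handler table keyed by instruction type (one possible answer to the TODO about making the dispatch easy to modify for testing). A handler may be a plain function returning events or a coroutine function; in both cases its output is fed back into the state machine, and the async handler turns exceptions into a "failed" event instead of raising. Dispatcher, register, drain, toy_state_machine and the simplified Event/Execute/SendMsg classes are hypothetical stand-ins, not existing distributed APIs.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Set


# Simplified, hypothetical stand-ins for StateMachineEvent / Execute / SendMsg.
@dataclass(frozen=True)
class Event:
    key: str
    kind: str  # "start", "done" or "failed" in this demo


@dataclass(frozen=True)
class Execute:
    key: str


@dataclass(frozen=True)
class SendMsg:
    payload: str


class Dispatcher:
    """Feed events to a state machine and route instructions to handlers by type."""

    def __init__(self, handle_stimulus: Callable[[Event], List[Any]]):
        self.handle_stimulus = handle_stimulus
        self.handlers: Dict[type, Callable[[Any], Any]] = {}
        self.history: List[Event] = []
        self._tasks: Set["asyncio.Task[Any]"] = set()  # strong refs to running handlers

    def register(self, inst_type: type, handler: Callable[[Any], Any]) -> None:
        self.handlers[inst_type] = handler

    def stimulus(self, event: Event) -> None:
        self.history.append(event)
        for inst in self.handle_stimulus(event):
            handler = self.handlers.get(type(inst))
            if handler is None:
                raise RuntimeError(f"Unknown instruction: {inst!r}")
            result = handler(inst)
            if inspect.isawaitable(result):
                # Async handler: feed its events back once it finishes.
                task = asyncio.ensure_future(result)
                self._tasks.add(task)
                task.add_done_callback(self._on_done)
            else:
                # Sync handler: feed its events back immediately.
                for ev in result:
                    self.stimulus(ev)

    def _on_done(self, task: "asyncio.Task[Any]") -> None:
        self._tasks.discard(task)
        # Handlers must not raise; if one did, result() re-raises here (a bug).
        for ev in task.result():
            self.stimulus(ev)

    async def drain(self) -> None:
        """Wait until no handler is in flight, including ones spawned meanwhile."""
        while self._tasks:
            await asyncio.gather(*self._tasks)


def toy_state_machine(ev: Event) -> List[Any]:
    """Hypothetical state machine: execute on start, report the outcome otherwise."""
    if ev.kind == "start":
        return [Execute(ev.key)]
    return [SendMsg(f"{ev.key}:{ev.kind}")]


sent: List[str] = []


def send(inst: SendMsg) -> List[Event]:
    sent.append(inst.payload)
    return []  # sync handler that emits no new events


async def execute(inst: Execute) -> List[Event]:
    await asyncio.sleep(0)  # pretend to do some work
    try:
        if inst.key == "bad":
            raise ValueError("boom")
        return [Event(inst.key, "done")]
    except Exception:
        # Never raise out of a handler: turn the error into an event.
        return [Event(inst.key, "failed")]


async def main() -> Dispatcher:
    d = Dispatcher(toy_state_machine)
    d.register(SendMsg, send)
    d.register(Execute, execute)
    d.stimulus(Event("x", "start"))
    d.stimulus(Event("bad", "start"))
    await d.drain()
    return d


d = asyncio.run(main())
print(sorted(sent))  # ['bad:failed', 'x:done']
```

In a unit test, real handlers can be replaced by fakes through register, and drain waits for every outstanding handler before assertions run. Keeping the tasks in a set is one way to address the "collect all futures" TODO: asyncio only keeps weak references to tasks, so holding them explicitly also keeps them from being garbage-collected mid-flight.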

To incorporate this callback mechanism into our current code, the transition functions can be changed to return tuple[Recommendations, Collection[Instructions]] (i.e. instructions instead of messages), with the scheduler messages replaced by the appropriate Instructions.
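A sketch of transition functions with the proposed return type follows, assuming (for brevity) that transitions are looked up by target state only and that recommendations map a key to its next state. Instead of writing a scheduler message directly, to_memory returns a SendMsg instruction, and the transitions driver applies recommendations until none remain while collecting every instruction for the dispatcher. All names here (TRANSITIONS, to_ready, to_executing, to_memory, transitions) and the message payload are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

# Hypothetical stand-in: maps a key to the state it should transition to next.
Recommendations = Dict[str, str]


@dataclass(frozen=True)
class Instruction:
    stimulus_id: str


@dataclass(frozen=True)
class Execute(Instruction):
    key: str


@dataclass(frozen=True)
class SendMsg(Instruction):
    payload: dict


# Proposed shape: recommendations plus instructions, instead of messages.
TransitionResult = Tuple[Recommendations, List[Instruction]]


def to_ready(key: str, *, stimulus_id: str) -> TransitionResult:
    # No side effects; just recommend executing the task next.
    return {key: "executing"}, []


def to_executing(key: str, *, stimulus_id: str) -> TransitionResult:
    # Ask the worker to run the task instead of scheduling it here.
    return {}, [Execute(stimulus_id=stimulus_id, key=key)]


def to_memory(key: str, *, stimulus_id: str) -> TransitionResult:
    # Formerly a message for the scheduler; now an instruction (hypothetical payload).
    msg = {"op": "task-finished", "key": key}
    return {}, [SendMsg(stimulus_id=stimulus_id, payload=msg)]


TRANSITIONS: Dict[str, Callable[..., TransitionResult]] = {
    "ready": to_ready,
    "executing": to_executing,
    "memory": to_memory,
}


def transitions(recs: Recommendations, *, stimulus_id: str) -> List[Instruction]:
    """Apply recommendations until none remain; return all collected instructions."""
    remaining = dict(recs)
    instructions: List[Instruction] = []
    while remaining:
        key, finish = remaining.popitem()
        new_recs, new_insts = TRANSITIONS[finish](key, stimulus_id=stimulus_id)
        remaining.update(new_recs)
        instructions.extend(new_insts)
    return instructions


insts = transitions({"x": "ready", "y": "memory"}, stimulus_id="s1")
print(insts)
```

Because the transition functions no longer touch the batched stream, they can be tested as pure functions: the test only inspects the returned recommendations and instructions.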
