<center>

# ArchiveBox States & Lifecycles Specification

**Draft Version 1.0**

</center>

## Abstract

This document specifies the process management and actor model implementation for ArchiveBox, defining the behavior of `Orchestrators`, `Actors`, and `Objects` within the system.

## Table of Contents

A. [Glossary](#a-glossary)

**Core Types:**

B. [`Object`](#b-object)
C. [`Orchestrator`](#c-orchestrator)
D. [`Actor`](#d-actors)

**Helper Methods:**

E. [`Tick`](#e-tick) (`ActorType.tick(self, object)` methods)
F. [`Transition`](#f-transition) (`ActorType.transition_from_abx_to_xyz(self, object)` methods)
G. [`Read`](#g-read) (`archivebox.core.reads.read_<objecttype>_xyz(object)` funcs)
H. [`Write`](#h-write) (`archivebox.core.writes.write_<objecttype>_xyz(object, **kwargs)` funcs)
I. &nbsp;[`Side Effect`](#i-side-effects) (`archivebox.core.sideeffects.sideeffect_xyz(object, **kwargs)` funcs)

<br/>

## A. Glossary

The following terms are used throughout this specification:

- **startup**: When a new process is spawned
- **shutdown**: When a process is exiting
- **start**: At the beginning of some Python code block
- **end**: At the end of some Python code block
- **queue**: A Django queryset of `Objects` of a single type that are waiting to be processed
- **actor**: A long-running daemon process that wakes up and processes a single `Object` from a `queue` at a time
- **plugin**: A Python package that defines hookimpls based on hookspecs exposed by ABX
- **object**: An instance of a Django model that represents a single row in the database

<br/>

---

<br/>

## B. `Object`

> An `object` is a DB row, an instance of a Django `django.db.Model` class.

### B.1. `Object` Requirements

- MUST correspond to a single row in a database table, defined by a Django `Model`
- MUST have a finite set of possible states (aka be a finite state machine)
- MUST have a `status` field holding the current state name (e.g., `status="queued"`)
- MUST have a `retry_at` field holding the next check timestamp as a `datetime`
- MUST have `__init__()`, `clean()`, and `save()` methods defined

See the sketch at the end of this section for a minimal model satisfying these requirements.

### B.2. `Object` lifecycle hooks exposed for plugins

```python
abx.pm.hook.on_<objecttype>_init(object)    # In-memory initialization
abx.pm.hook.on_<objecttype>_clean(object)   # Form field validation
abx.pm.hook.on_<objecttype>_save(object)    # Pre-save operations
```
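For illustration, here is a minimal sketch of what an `Object` model satisfying the B.1 requirements could look like. The state names, field defaults, and exact hook-call placement below are assumptions for the example, not prescribed by this spec:

```python
from django.db import models
from django.utils import timezone

import abx


class Snapshot(models.Model):
    # minimal hypothetical Object: one DB row with a finite set of states
    status = models.CharField(max_length=15, default='queued')          # current state name
    retry_at = models.DateTimeField(default=timezone.now, null=True)    # next check time, None in final states

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        abx.pm.hook.on_snapshot_init(object=self)     # in-memory initialization hook

    def clean(self, *args, **kwargs):
        super().clean(*args, **kwargs)
        abx.pm.hook.on_snapshot_clean(object=self)    # form field validation hook

    def save(self, *args, **kwargs):
        abx.pm.hook.on_snapshot_save(object=self)     # pre-save hook
        super().save(*args, **kwargs)
```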
<br/>

## C. `Orchestrator`

> An [`Orchestrator`](#c-orchestrator) is a single long-running daemon process for the entire collection. It manages spawning [`Actors`](#d-actors) to process queues of [`Objects`](#b-object) to their next states.

**Example `Orchestrator` implementation:**

```python
import os
import time
from typing import Dict
from multiprocessing import Process

import abx


class Orchestrator:
    pid: int = None

    @classmethod
    def spawn_orchestrator(cls):
        orchestrator = cls()
        orchestrator_bg_proc = Process(target=orchestrator.runloop)
        orchestrator_bg_proc.start()
        return orchestrator_bg_proc.pid

    @classmethod
    def get_all_actor_types(cls) -> Dict[str, ActorType]:
        # returns a Dict of all discovered {actor_type_id: ActorType}, e.g.:
        # {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...}
        ...

    def runloop(self):
        self.pid = os.getpid()
        abx.pm.hook.on_orchestrator_startup(self)
        try:
            while True:
                actor_types = self.get_all_actor_types()
                all_queues = {
                    actor_type: actor_type.get_queue()
                    for actor_type in actor_types.values()
                }
                abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues)

                for actor_type, queue in all_queues.items():
                    running_actors = actor_type.get_running_actors()
                    actors_to_spawn = actor_type.get_actors_to_spawn(queue, running_actors)
                    for launch_kwargs in actors_to_spawn:
                        actor_type.spawn_actor(**launch_kwargs)

                orphaned_objects = self.get_orphaned_objects(all_queues)
                if orphaned_objects:
                    print('WARNING: some objects will never be processed', orphaned_objects)

                if not any(len(queue) for queue in all_queues.values()):
                    # we are idle
                    abx.pm.hook.on_orchestrator_idle(self, actor_types)

                time.sleep(0.250)
        except (KeyboardInterrupt, SystemExit, BrokenPipeError):
            abx.pm.hook.on_orchestrator_shutdown(self)
```

### C.1. `Orchestrator` Lifecycle

- An [`Orchestrator`](#c-orchestrator) MUST start when ArchiveBox starts and stop when ArchiveBox is killed
- Only a maximum of ONE [`Orchestrator`](#c-orchestrator) process MAY run per collection, per machine
- An [`Orchestrator`](#c-orchestrator) MUST be aware of *all* `ActorTypes` defined in the system

### C.2. `Orchestrator` Runloop

The [`Orchestrator`](#c-orchestrator) MUST run a single runloop that continues looping forever until the parent ArchiveBox process is killed. On each loop iteration, the [`Orchestrator`](#c-orchestrator):

1. loops through each `ActorType` defined in the system:
   - fetch the queue of pending [`Objects`](#b-object) via: `ActorType.get_queue()`
   - get the list of currently running [`Actors`](#d-actors) via: `ActorType.get_running_actors()`
   - determine required new [`Actors`](#d-actors) via: `ActorType.get_actors_to_spawn(queue, current_actors)`
   - spawn a new [`Actor`](#d-actors) as a subprocess for each, via: `ActorType.spawn_actor(actors_to_spawn[0], block=False, double_fork=False)`
2. check for orphaned [`Objects`](#b-object) not claimed by any `ActorType`:
   - MUST collect all [`Objects`](#b-object) in the DB with a `retry_at` time set but not in any `ActorType` queue
   - MUST raise a warning that these orphaned [`Objects`](#b-object) will never be processed
3. sleep for 0.1s before repeating the loop to reduce CPU load

### C.3. `Orchestrator` logic for spawning `Actor` subprocesses

`ActorType.get_actors_to_spawn(queue, current_actors)` determines how many [`Actors`](#d-actors) to spawn and what their launch args should be, based on a given queue of pending objects.

A few examples of how this could work:

- If a small number of `ArchiveResult` [`Objects`](#b-object) are in the queue, and all have the same persona + extractor ➡️ spawn a single [`Actor`](#d-actors) to handle them all
- If a large number of `ArchiveResult` [`Objects`](#b-object) are pending ➡️ spawn one [`Actor`](#d-actors) per `persona`:`extractor` combination, up to `ActorType.MAX_CONCURRENT_ACTORS` total
- If `len(current_actors) >= ActorType.MAX_CONCURRENT_ACTORS` ➡️ don't spawn any more

### C.4. `Orchestrator` Lifecycle hooks exposed for plugins

The following `hookspec`s are defined for plugins to hook into the `Orchestrator` lifecycle.

```python
abx.pm.hook.on_orchestrator_startup(all_actor_types)
abx.pm.hook.on_orchestrator_tick_started(all_actor_types, all_queues, all_running_actors)
abx.pm.hook.on_orchestrator_idle(all_actor_types)        # only when no pending objects
abx.pm.hook.on_orchestrator_shutdown(all_actor_types)
```
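For illustration, a plugin could subscribe to these hookspecs by defining matching hookimpls. This is a minimal sketch assuming ABX exposes a pluggy-style `@abx.hookimpl` decorator; the decorator name and the logging behavior shown here are assumptions, not part of this spec:

```python
import abx


@abx.hookimpl
def on_orchestrator_startup(all_actor_types):
    # hypothetical plugin hookimpl: runs once when the Orchestrator process starts
    print(f'[myplugin] orchestrator started with {len(all_actor_types)} ActorTypes')


@abx.hookimpl
def on_orchestrator_idle(all_actor_types):
    # hypothetical plugin hookimpl: runs on ticks where every queue is empty
    print('[myplugin] orchestrator is idle, no pending objects')
```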
<br/>

## D. `Actor`

> An `Actor` is a semi-long-lived process (an instance of an `ActorType` class). It calls a `tick()` method for each [`Object`](#b-object) in its queue, and exits when its queue is empty.

**Example `ActorType` definition:**

```python
import os
import subprocess
from typing import ClassVar
from multiprocessing import Process

import abx
import psutil


class ChromeActor(ActorType):
    pid: int = None
    persona: Persona = None
    chrome_pid: int = None

    MAX_CONCURRENT_ACTORS: ClassVar[int] = 4
    MAX_TICK_TIME: ClassVar[int] = 60

    def __init__(self, persona=None):
        self.persona = persona

    @classmethod
    def spawn_actor(cls, **launch_kwargs):
        actor = cls(**launch_kwargs)
        bg_actor_process = Process(target=actor.runloop)
        bg_actor_process.start()
        return bg_actor_process.pid

    @classmethod
    def get_queue(cls):
        return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot'))

    @classmethod
    def get_actors_to_spawn(cls, queue, running_actors):
        if not queue.exists():
            return
        personas_needed = queue.values_list('persona', flat=True).distinct()
        for persona in personas_needed:
            existing_actors = [a for a in running_actors if a.persona == persona]
            max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(existing_actors)
            if len(queue) > 10:
                yield from [{'persona': persona}] * max_spawnable
            elif len(queue) > 5:
                yield from [{'persona': persona}] * min(2, max_spawnable)
            else:
                yield from [{'persona': persona}] * min(1, max_spawnable)

    def on_startup(self):
        self.pid = os.getpid()
        # spawn an instance of chrome in the background and save its PID
        self.chrome_pid = subprocess.Popen([
            'chrome',
            '--remote-debugging-port=9222',
            f'--user-data-dir={self.persona.user_data_dir}',
        ]).pid

    def on_shutdown(self):
        # kill chrome worker and cleanup lockfile it sometimes leaves behind
        psutil.Process(self.chrome_pid).kill()
        chrome_lockfile = self.persona.user_data_dir / 'Default' / 'SingletonLock'
        chrome_lockfile.unlink(missing_ok=True)

    def runloop(self):
        abx.pm.hook.on_actor_startup(self)
        self.on_startup()

        # loops until queue is empty
        while True:
            queue = self.get_queue()
            next_archiveresult_to_process = queue.last()
            if next_archiveresult_to_process is None:
                # stop looping and exit if queue is empty
                break

            abx.pm.hook.on_actor_tick_start(self, next_archiveresult_to_process)
            try:
                with TimedProgress(timeout=self.MAX_TICK_TIME):
                    # run the tick function on the object
                    self.tick(next_archiveresult_to_process)
            except Exception as err:
                abx.pm.hook.on_actor_tick_exception(self, next_archiveresult_to_process, err)
            abx.pm.hook.on_actor_tick_end(self, next_archiveresult_to_process)

        self.on_shutdown()
        abx.pm.hook.on_actor_shutdown(self)

    def tick(self, archiveresult):
        ...  # see Tick section below
```
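The `spawn_actor()` example above forks a single child with `multiprocessing.Process`, while the `spawn_actor()` signatures elsewhere in this spec (C.2, D.1) also accept a `double_fork` flag. As a rough sketch only, a double-forking variant could use the standard Unix double-fork pattern to fully detach the `Actor` from the `Orchestrator`; the helper below is an illustration under that assumption, not the actual ArchiveBox implementation:

```python
import os


def spawn_detached(target, **launch_kwargs):
    # hypothetical helper: standard Unix double-fork so the grandchild is
    # re-parented to init and never becomes a zombie of the Orchestrator
    pid = os.fork()
    if pid > 0:
        os.waitpid(pid, 0)   # reap the short-lived intermediate child
        return

    # intermediate child: start a new session, fork again, then exit immediately
    os.setsid()
    if os.fork() > 0:
        os._exit(0)

    # grandchild: fully detached, run the actor runloop
    target(**launch_kwargs)
    os._exit(0)
```

e.g. `spawn_detached(actor.runloop, **launch_kwargs)` could replace `Process(target=actor.runloop)` when `double_fork=True`.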
### D.1. `Actor` Requirements

- MUST be an instance of an `ActorType` class + some launch `kwargs`
- MUST be a child process of an [`Orchestrator`](#c-orchestrator), and run a `runloop` to consume [`Objects`](#b-object)
- MUST be started lazily by the [`Orchestrator`](#c-orchestrator) only when pending [`Objects`](#b-object) exist in its queue
- MUST have the following methods on its `ActorType` class definition:
  - `ActorType.get_queue() -> List[Object]`
  - `ActorType.get_running_actors(filter_kwargs: Dict | None=None) -> List[Actor]`
  - `ActorType.get_actors_to_spawn(queue: List[Obj], running_actors: List[Act])`
  - `ActorType.spawn_actor(launch_kwargs: Dict, fork=True, double_fork=False)`

### D.2. `Actor` Lifecycle

- MAY initialize shared resources on startup (e.g. spawn background worker processes)
- MUST exit when its queue is empty
- MAY release shared resources on exit

### D.3. `Actor` Runloop

- MUST get its queue at the start of each `runloop` iteration and pop one object off the top
- MUST first bump `object.retry_at` by `MAX_TICK_TIME` to avoid concurrent actors clashing (see the sketch after this section)
- MUST call only one [`ActorType.tick(object)`](#e-tick) per `runloop` iteration
- MUST enforce that [`ActorType.tick(object)`](#e-tick) takes no longer than `ActorType.MAX_TICK_TIME`
- MUST catch and log exceptions from [`ActorType.tick(object)`](#e-tick)

### D.4. `Actor` Lifecycle hooks exposed for plugins

```python
abx.pm.hook.on_actor_startup(actor, queue)
abx.pm.hook.on_actor_tick_start(actor, object)
abx.pm.hook.on_actor_tick_end(actor, object)
abx.pm.hook.on_actor_tick_exception(actor, object, exception)
abx.pm.hook.on_actor_shutdown(actor)
```
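D.3 requires the `runloop` to bump `object.retry_at` before ticking so that two `Actors` never process the same [`Object`](#b-object). A minimal sketch of that claim step as an atomic compare-and-swap, using a hypothetical `claim_object()` helper that is not part of this spec:

```python
from datetime import timedelta

from django.utils import timezone


def claim_object(obj, max_tick_time: int) -> bool:
    # hypothetical helper: atomically push retry_at into the future so that
    # concurrent Actors polling the same queue skip this object until we finish
    claimed = type(obj).objects.filter(
        pk=obj.pk,
        retry_at=obj.retry_at,     # only matches if nobody else claimed it first
    ).update(
        retry_at=timezone.now() + timedelta(seconds=max_tick_time),
    )
    return claimed == 1
```

The `runloop` would then only call `tick()` when `claim_object(next_obj, self.MAX_TICK_TIME)` returns `True`.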
<br/>

## E. `Tick`

> An `ActorType.tick(object)` method is a function that takes an [`Object`](#b-object), checks if it needs to be moved to a new state, and executes any [`Transition`](#f-transition) needed to do so.

**Example `Tick` method:**

```python
from datetime import timedelta

from django.utils import timezone


class SnapshotActor(ActorType):
    ...

    def tick(self, snapshot):
        # [-> queued] -> started -> sealed

        # SEALED (final state, do nothing)
        if snapshot.status in SNAPSHOT_FINAL_STATES:
            assert snapshot.retry_at is None
            return None

        assert snapshot.retry_at is not None

        # QUEUED -> STARTED
        if snapshot.status == 'queued':
            transition_snapshot_to_started(snapshot, config, cwd)

        # STARTED -> SEALED
        elif snapshot.status == 'started':
            if snapshot_has_pending_archiveresults(snapshot, config, cwd):
                # tasks still in-progress, check back again in another 5s
                snapshot.retry_at = timezone.now() + timedelta(seconds=5)
                snapshot.save()
            else:
                # everything is finished, seal the snapshot
                transition_snapshot_to_sealed(snapshot, config, cwd)
```

### E.1. `Tick` Requirements

- MUST NOT return any value (it either succeeds and returns `None` or raises an `Exception`)
- MUST NOT call other [`tick()`](#e-tick) methods directly
- MUST NOT attempt to spawn other [`Orchestrator`](#c-orchestrator) or [`Actor`](#d-actors) processes
- SHOULD NOT lock [`Object`](#b-object)s or open long-running DB transactions
- SHOULD NOT enforce its own timeouts, they will be enforced by the [`Actor`](#d-actors) `runloop` code

### E.2. `Tick` Lifecycle

- A tick starts by [`Read`](#g-read)ing relevant state from the db, filesystem, or external APIs
- Then it performs any checks or branching to determine which next state to transition to
- Then it executes a single [`transition_from_abx_to_xyz()`](#f-transition) method to move its [`Object`](#b-object) to that state

### E.3. `Tick` Lifecycle hooks exposed for Plugins

*The [D.4. `Actor` Lifecycle hooks](#d4-actor-lifecycle-hooks-exposed-for-plugins) cover all of the `Tick` lifecycle events.*

<br/>

## F. `Transition`

> One `ActorType.transition_<objecttype>_from_<startstate>_to_<endstate>` call is made at the end of [`ActorType.tick(object)`](#e-tick) to transition its [`Object`](#b-object)'s state.

**Example `Transition` function:**

```python
from datetime import timedelta

from django.utils import timezone

import abx


def transition_snapshot_to_started(snapshot, config, cwd):
    # queued [-> started] -> sealed
    retry_at = timezone.now() + timedelta(seconds=10)
    retries = snapshot.retries + 1

    snapshot_to_update = {'pk': snapshot.pk, 'status': 'queued'}
    fields_to_update = {'status': 'started', 'retry_at': retry_at, 'retries': retries, 'start_ts': timezone.now(), 'end_ts': None}
    snapshot = abx.archivebox.writes.update_snapshot(filter_kwargs=snapshot_to_update, update_kwargs=fields_to_update)

    # trigger side effects on state transition (these just emit an event to a
    # separate queue that's then processed by a huey worker)
    cleanup_snapshot_dir(snapshot, config, cwd)
    create_snapshot_pending_archiveresults(snapshot, config, cwd)
    update_snapshot_index_json(snapshot, config, cwd)
    update_snapshot_index_html(snapshot, config, cwd)
```

### F.1. `Transition` Requirements

- MUST NOT lock [`Objects`](#b-object) or open long-lived DB transactions
- MUST NOT have any branching logic
- MUST be idempotent (it should fail silently/do nothing if run multiple times/concurrently)
- MUST be atomic (a `.filter(previous_state).update(next_state)` compare-and-swap)

### F.2. `Transition` Lifecycle

- It should first perform a SINGLE [`Write`](#h-write) to the underlying object to update its state
- In the same write it must update the `object.retry_at` time, or set it to `None` if in a final state
- It should then trigger any [`Side Effects`](#i-side-effects) for the transition, which can:
  - perform filesystem operations, spawn external processes, or make other system calls
  - make external python library calls, 3rd party network API requests, webhooks, etc.
  - update the `retry_at` time on *other* [`Objects`](#b-object) so that they are enqueued to be rechecked
- MUST NOT directly mutate other [`Objects`](#b-object)' state via [`Write`](#h-write)s (other than `retry_at`)
- MUST NOT directly call other [`Transition`](#f-transition)s on the same object or other objects
- MUST NOT directly call [`Tick`](#e-tick) methods or spawn [`Actor`](#d-actors) or [`Orchestrator`](#c-orchestrator) processes

### F.3. `Transition` Lifecycle hooks Exposed for Plugins

```python
abx.pm.hook.on_transition_<objecttype>_from_abx_to_xyz_start(object)
abx.pm.hook.on_transition_<objecttype>_from_abx_to_xyz_end(object)
```

<br/>

---

<br/>

## G. `Read`

> An `archivebox.core.reads.read_<objecttype>_xyz(object, **kwargs)` method is a getter that abstracts making a DB query, cache query, filesystem read, etc.

**Example `Read` function:**

```python
from django.db.models import Q


def get_outlink_parents(url, crawl=None, config=None):
    scope = Q(dst=url)
    if crawl:
        scope = scope | Q(via__snapshot__crawl=crawl)

    parents = list(Outlink.objects.filter(scope))
    if not parents:
        # base case: we reached the top of the chain, no more parents left
        return

    # recursive case: there is another parent above us, get its parents
    yield parents[0]
    yield from get_outlink_parents(parents[0].src, crawl=crawl, config=config)
```

### G.1. `Read` Requirements

- MAY read from DB tables, Django cache, filesystem, in-memory caches, etc.
- MAY accept either an `instance`/`pk`/`uuid`/`abid` or `filter_kwargs`
- MAY return an `instance`, `benedict`, `TypedDict`, Pydantic model, or raw value
- MUST NOT have any side effects like DB writes, state changes, transitions, etc.

### G.2. `Read` Lifecycle hooks exposed for Plugins

*Aside from the [`Object` Lifecycle hooks](#b2-object-lifecycle-hooks-exposed-for-plugins), no other read-hooks are currently exposed by ABX.*
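The `Tick` example in section E calls `snapshot_has_pending_archiveresults(snapshot, config, cwd)`, which is exactly the kind of helper this section describes. A minimal sketch of how such a `Read` could be written; the reverse accessor `archiveresult_set` and the final-state names below are assumptions for the example:

```python
ARCHIVERESULT_FINAL_STATES = ('succeeded', 'failed', 'skipped')   # assumed final state names


def snapshot_has_pending_archiveresults(snapshot, config=None, cwd=None) -> bool:
    # pure read: True if any ArchiveResult for this snapshot is not yet in a final state
    return snapshot.archiveresult_set.exclude(
        status__in=ARCHIVERESULT_FINAL_STATES,
    ).exists()
```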
<br/>

## H. `Write`

> An `archivebox.core.writes.<objecttype>_update(object, **kwargs)` method is a setter that abstracts making a DB write, cache update, filesystem write, etc.

**Example `Write` function:**

```python
import abx


def create_root_snapshot_from_seed(crawl):
    # create a snapshot for the seed URI which kicks off the crawl
    # only a single extractor will run on it, which will produce outlinks that get added back to the crawl
    root_snapshot, created = abx.archivebox.writes.get_or_create_snapshot(crawl=crawl, url=crawl.seed.uri, config={
        'extractors': (
            abx.archivebox.reads.get_extractors_that_produce_outlinks()
            if crawl.seed.extractor == 'auto' else
            [crawl.seed.extractor]
        ),
        **crawl.seed.config,
    })
```

### H.1. `Write` Requirements

- MAY perform [`Read`](#g-read) calls during its execution
- MAY perform other [`Write`](#h-write) calls during its execution
- SHOULD perform any DB changes as a single atomic compare-and-swap operation (see the sketch after this section)
- SHOULD NOT lock [`Objects`](#b-object) or open long-lived DB transactions
- SHOULD NOT enforce its own timeouts or ratelimits
- SHOULD NOT have branching logic
- MUST NOT trigger other [`Side Effects`](#i-side-effects) or spawn any long-lived subprocesses

### H.2. `Write` Lifecycle hooks exposed for Plugins

```python
abx.pm.hook.on_<objecttype>_created(object)
abx.pm.hook.on_<objecttype>_updated(object)
abx.pm.hook.on_<objecttype>_deleted(object)
```
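Both F.1 and H.1 require state changes to be atomic compare-and-swap updates. A minimal sketch of what the `update_snapshot()` write used in the `Transition` example could look like, assuming a Django `.filter(...).update(...)` under the hood; the exact signature, import path, and return value are illustrative, not the actual ArchiveBox API:

```python
from core.models import Snapshot   # assumed model import path


def update_snapshot(filter_kwargs: dict, update_kwargs: dict):
    # compare-and-swap: the UPDATE only applies if the row still matches
    # filter_kwargs (e.g. {'pk': ..., 'status': 'queued'}), so a concurrent
    # Actor that already performed this transition is silently ignored
    rows_updated = Snapshot.objects.filter(**filter_kwargs).update(**update_kwargs)
    if rows_updated:
        abx.pm.hook.on_snapshot_updated(object=Snapshot.objects.get(pk=filter_kwargs['pk']))
    return Snapshot.objects.get(pk=filter_kwargs['pk'])
```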
<br/>

## I. `Side Effects`

> An `archivebox.core.sideeffects.do_xyz_task(object, **kwargs)` method is used to perform a side effect like spawning a subprocess, making external API calls, etc.

**Example `Side Effect` function:**

```python
import os


def exec_extractor(url, extractor, credentials, config):
    # run the extractor binary in a background process asynchronously
    Extractor = load_extractor(extractor, credentials, config)
    cmd_args = Extractor.get_cmd(url)
    bg_proc = Extractor.exec(cmd_args, fork=True, double_fork=False, cwd=os.getcwd())
```

### I.1. `Side Effect` Requirements

- MAY spawn processes, make API calls, write to filesystem
- SHOULD execute quickly and return early
- SHOULD NOT block for slow operations
- SHOULD spawn a separate background process for long-running operations

### I.2. `Side Effect` Hooks Exposed for Plugins

```python
abx.pm.hook.on_sideeffect_xyz_started(object)
abx.pm.hook.on_sideeffect_xyz_succeeded(object)
abx.pm.hook.on_sideeffect_xyz_failed(object)
```

---

### Further Reading

- https://github.com/fgmacedo/python-statemachine ⭐️
- https://github.com/Borderless360/django-logic ⭐️
- https://github.com/pytransitions/transitions
- https://github.com/django-commons/django-fsm-2
- https://github.com/woile/pyfsm
- https://github.com/hlovatt/statmach
- https://github.com/opsani/statesman
- https://stately.ai/docs/actor-model
- https://zagjs.com/
- https://shopify.engineering/17488160-why-developers-should-be-force-fed-state-machines