<center>
# ArchiveBox States & Lifecycles Specification
**Draft Version 1.0**
</center>
## Abstract
This document specifies the process management and actor model implementation for ArchiveBox, defining the behavior of `Orchestrators`, `Actors`, and `Objects` within the system.
## Table of Contents
A. [Glossary](#a-glossary)
**Core Types:**
B. [`Object`](#b-object)
C. [`Orchestrator`](#c-orchestrator)
D. [`Actor`](#d-actors)
**Helper Methods:**
E. [`Tick`](#e-tick) (`ActorType.tick(self, object)` methods)
F. [`Transition`](#f-transition) (`ActorType.transition_from_abx_to_xyz(self, object)` methods)
G. [`Read`](#g-read) (`archivebox.core.reads.read_<objecttype>_xyz(object)` funcs)
H. [`Write`](#h-write) (`archivebox.core.writes.write_<objecttype>_xyz(object, **kwargs)` funcs)
I. [`Side Effect`](#i-side-effects) (`archivebox.core.sideeffects.sideeffect_xyz(object, **kwargs)` funcs)
<br/>
## A. Glossary
The following terms are used throughout this specification:
- **startup**: When a new process is spawned
- **shutdown**: When a process is exiting
- **start**: At the beginning of some Python code block
- **end**: At the end of some Python code block
- **queue**: A Django queryset of `Objects` of a single type that are waiting to be processed
- **actor**: A long-running daemon process that wakes up and processes a single `Object` from a `queue` at a time
- **plugin**: A Python package that defines hookimpls based on hookspecs exposed by ABX
- **object**: An instance of a Django model that represents a single row in the database
<br/>
---
<br/>
## B. `Object`
> An `object` is a DB row, an instance of a Django `django.db.models.Model` class.
### B.1. `Object` Requirements
- MUST correspond to a single row in database table, defined by a Django `Model`
- MUST have a finite set of possible states (aka be a finite state machine)
- MUST have a `status` field holding the current state name (e.g., `status="queued"`)
- MUST have a `retry_at` field holding the next check timestamp as a `datetime`
- MUST have `__init__()`, `clean()`, and `save()` methods defined
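A minimal sketch of a model satisfying the requirements above. The field names, state names, and hook calls beyond `status` and `retry_at` are illustrative only; the hooks shown are the lifecycle hooks listed in B.2 below:
```python
import abx
from django.db import models

class ArchiveResult(models.Model):
    # illustrative state names; each Object type defines its own finite set of states
    STATUS_CHOICES = [('queued', 'queued'), ('started', 'started'), ('succeeded', 'succeeded'), ('failed', 'failed')]

    status = models.CharField(max_length=16, choices=STATUS_CHOICES, default='queued')
    retry_at = models.DateTimeField(null=True, blank=True, db_index=True)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        abx.pm.hook.on_archiveresult_init(self)     # in-memory initialization hook

    def clean(self):
        super().clean()
        abx.pm.hook.on_archiveresult_clean(self)    # form field validation hook

    def save(self, *args, **kwargs):
        abx.pm.hook.on_archiveresult_save(self)     # pre-save hook
        super().save(*args, **kwargs)
```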
### B.2. `Object` lifecycle hooks Exposed for plugins
```python
abx.pm.hook.on_<objecttype>_init(object) # In-memory initialization
abx.pm.hook.on_<objecttype>_clean(object) # Form field validation
abx.pm.hook.on_<objecttype>_save(object) # Pre-save operations
```
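For example, a plugin could implement one of these hooks to run custom logic before every save. A minimal sketch, assuming ABX exposes a standard pluggy `HookimplMarker` as `abx.hookimpl` (the normalization logic is purely illustrative):
```python
import abx

@abx.hookimpl
def on_snapshot_save(object):
    # runs before every Snapshot is saved, e.g. to normalize its URL
    object.url = object.url.strip().rstrip('/')
```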
<br/>
## C. `Orchestrator`
> An [`Orchestrator`](#c-orchestrator) is a single long-running daemon process for the entire collection. It manages spawning [`Actors`](#d-actors) to process queues of [`Objects`](#b-object) to their next states.
**Example `Orchestrator` implementation:**
```python
import os
import time
from typing import Dict
from multiprocessing import Process

import abx

class Orchestrator:
    pid: int = None

    @classmethod
    def spawn_orchestrator(cls):
        orchestrator = cls()
        orchestrator_bg_proc = Process(target=orchestrator.runloop)
        orchestrator_bg_proc.start()
        return orchestrator_bg_proc.pid

    @classmethod
    def get_all_actor_types(cls) -> Dict[str, 'ActorType']:
        # returns a Dict of all discovered {actor_type_id: ActorType}
        return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType}  # ... etc.

    def runloop(self):
        self.pid = os.getpid()
        abx.pm.hook.on_orchestrator_startup(self)
        try:
            while True:
                actor_types = self.get_all_actor_types()
                all_queues = {
                    actor_type: actor_type.get_queue()
                    for actor_type in actor_types.values()
                }
                abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues)

                for actor_type, queue in all_queues.items():
                    running_actors = actor_type.get_running_actors()
                    actors_to_spawn = actor_type.get_actors_to_spawn(queue, running_actors)
                    for launch_kwargs in actors_to_spawn:
                        actor_type.spawn_actor(**launch_kwargs)

                orphaned_objects = self.get_orphaned_objects(all_queues)
                if orphaned_objects:
                    print('WARNING: some objects will never be processed', orphaned_objects)

                if not any(len(queue) for queue in all_queues.values()):
                    # we are idle
                    abx.pm.hook.on_orchestrator_idle(self, actor_types)

                # sleep briefly before repeating the loop to reduce CPU load
                time.sleep(0.1)
        except (KeyboardInterrupt, SystemExit, BrokenPipeError):
            abx.pm.hook.on_orchestrator_shutdown(self)
```
### C.1. `Orchestrator` Lifecycle
- An [`Orchestrator`](#c-orchestrator) MUST start when ArchiveBox starts and stop when ArchiveBox is killed
- Only a maximum of ONE [`Orchestrator`](#c-orchestrator) process MAY run per collection, per machine
- An [`Orchestrator`](#c-orchestrator) MUST be aware of *all* `ActorTypes` defined in the system
### C.2. `Orchestrator` Runloop
The [`Orchestrator`](#c-orchestrator) MUST run a single runloop that continues looping forever until the parent ArchiveBox process is killed. On each loop iteration, the [`Orchestrator`](#c-orchestrator):
1. loops through each `ActorType` defined in the system:
    - fetches the queue of pending [`Objects`](#b-object) via: `ActorType.get_queue()`
    - gets the list of currently running [`Actors`](#d-actors) via: `ActorType.get_running_actors()`
    - determines required new [`Actors`](#d-actors) via:
      `ActorType.get_actors_to_spawn(queue, running_actors)`
    - spawns a new [`Actor`](#d-actors) subprocess for each entry, via: `ActorType.spawn_actor(actors_to_spawn[0], block=False, double_fork=False)`
2. checks for orphaned [`Objects`](#b-object) not claimed by any `ActorType` (see the `get_orphaned_objects()` sketch below):
    - MUST collect all [`Objects`](#b-object) in the DB with a `retry_at` time set but not in any `ActorType` queue
    - MUST raise a warning that these orphaned [`Objects`](#b-object) will never be processed
3. sleeps for 0.1s before repeating the loop to reduce CPU load
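The `get_orphaned_objects()` helper called by the example `Orchestrator` above is not pinned down by this spec; a minimal sketch, assuming `ActorType.get_queue()` returns Django querysets:
```python
class Orchestrator:
    ...

    def get_orphaned_objects(self, all_queues) -> list:
        orphans = []
        # several ActorTypes can draw from the same Model, so group queues by Model first
        models = {queue.model for queue in all_queues.values()}
        for Model in models:
            claimed_pks = set()
            for queue in all_queues.values():
                if queue.model is Model:
                    claimed_pks.update(queue.values_list('pk', flat=True))
            # anything with retry_at set that no ActorType queue claims will never be processed
            orphans += list(Model.objects.filter(retry_at__isnull=False).exclude(pk__in=claimed_pks))
        return orphans
```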
### C.3. `Orchestrator` logic for spawning `Actor` subprocesses
`ActorType.get_actors_to_spawn(queue, running_actors)` determines how many [`Actors`](#d-actors) to spawn and what their launch args should be, based on a given queue of pending objects.
A few examples of how this could work:
- If a small number of `ArchiveResult` [`Objects`](#b-object) are in the queue, and all have the same persona + extractor ➡️ spawn a single [`Actor`](#d-actors) to handle them all
- If a large number of `ArchiveResult` [`Objects`](#b-object) are pending ➡️ spawn one [`Actor`](#d-actors) per `persona`:`extractor` combination, up to `ActorType.MAX_CONCURRENT_ACTORS` total
- If `len(running_actors) >= ActorType.MAX_CONCURRENT_ACTORS` ➡️ don't spawn any more
### C.4. `Orchestrator` Lifecycle hooks exposed for plugins
The following `hookspec`s are defined for plugins to hook into the `Orchestrator` lifecycle.
```python
abx.pm.hook.on_orchestrator_startup(all_actor_types)
abx.pm.hook.on_orchestrator_tick_started(all_actor_types, all_queues, all_running_actors)
abx.pm.hook.on_orchestrator_idle(all_actor_types) # only when no pending objects
abx.pm.hook.on_orchestrator_shutdown(all_actor_types)
```
<br/>
## D. `Actor`
> An `Actor` is a semi-long-lived process (an instance of an `ActorType` class). It calls a `tick()` method for each [`Object`](#b-object) in its queue, and exits when its queue is empty.
**Example `ActorType` definition:**
```python
import os
import subprocess
from typing import ClassVar
from multiprocessing import Process

import abx
import psutil

class ChromeActor(ActorType):
    pid: int = None
    persona: Persona = None
    chrome_pid: int = None

    MAX_CONCURRENT_ACTORS: ClassVar[int] = 4
    MAX_TICK_TIME: ClassVar[int] = 60

    def __init__(self, persona=None):
        self.persona = persona

    @classmethod
    def spawn_actor(cls, **launch_kwargs):
        actor = cls(**launch_kwargs)
        bg_actor_process = Process(target=actor.runloop)
        bg_actor_process.start()
        return bg_actor_process.pid

    @classmethod
    def get_queue(cls):
        return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot'))

    @classmethod
    def get_actors_to_spawn(cls, queue, running_actors):
        if not queue.exists():
            return
        personas_needed = queue.values_list('persona', flat=True).distinct()
        for persona in personas_needed:
            existing_actors = [actor for actor in running_actors if actor.persona == persona]
            max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(existing_actors)
            if len(queue) > 10:
                yield from [{'persona': persona}] * max_spawnable
            elif len(queue) > 5:
                yield from [{'persona': persona}] * min(2, max_spawnable)
            else:
                yield from [{'persona': persona}] * min(1, max_spawnable)

    def on_startup(self):
        self.pid = os.getpid()
        # spawn an instance of chrome in the background and save its PID
        self.chrome_pid = subprocess.Popen(['chrome', '--remote-debugging-port=9222', f'--user-data-dir={self.persona.user_data_dir}']).pid

    def on_shutdown(self):
        # kill the chrome worker and clean up the lockfile it sometimes leaves behind
        psutil.Process(self.chrome_pid).terminate()
        chrome_lockfile = self.persona.user_data_dir / 'Default' / 'SingletonLock'
        chrome_lockfile.unlink(missing_ok=True)

    def runloop(self):
        abx.pm.hook.on_actor_startup(self)
        self.on_startup()

        # loops until the queue is empty
        while True:
            queue = self.get_queue()
            next_archiveresult_to_process = queue.last()
            if next_archiveresult_to_process is None:
                # stop looping and exit if the queue is empty
                break

            abx.pm.hook.on_actor_tick_start(self, next_archiveresult_to_process)
            try:
                with TimedProgress(timeout=self.MAX_TICK_TIME):
                    # run the tick function on the object
                    self.tick(next_archiveresult_to_process)
            except Exception as err:
                abx.pm.hook.on_actor_tick_exception(self, next_archiveresult_to_process, err)
            abx.pm.hook.on_actor_tick_end(self, next_archiveresult_to_process)

        self.on_shutdown()
        abx.pm.hook.on_actor_shutdown(self)

    def tick(self, archiveresult):
        ...  # see the Tick section below
```
### D.1. `Actor` Requirements
- MUST be an instance of an `ActorType` class + some launch `kwargs`
- MUST be a child process of an [`Orchestrator`](#c-orchestrator), and run a `runloop` to consume [`Objects`](#b-object)
- MUST be started lazily by the [`Orchestrator`](#c-orchestrator) only when pending [`Objects`](#b-object) exist in its queue
- MUST have the following methods on its `ActorType` class definition:
- `ActorType.get_queue() -> List[Object]`
- `ActorType.get_running_actors(filter_kwargs: Dict | None=None) -> List[Actor]`
- `ActorType.get_actors_to_spawn(queue: List[Obj], running_actors: List[Act])`
- `ActorType.spawn_actor(launch_kwargs: Dict, fork=True, double_fork=False)`
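`get_running_actors()` is not specified in detail; a minimal sketch under the assumption that each `Actor` writes a small JSON pidfile on startup and removes it on shutdown. The pidfile location and record format are hypothetical, and this version returns lightweight dict records rather than live `Actor` instances:
```python
import json
from pathlib import Path
from typing import Dict, List

import psutil

ACTOR_PIDFILE_DIR = Path('.archivebox/actors')    # hypothetical location, not part of the spec

class ActorType:
    ...

    @classmethod
    def get_running_actors(cls, filter_kwargs: Dict | None = None) -> List[Dict]:
        running = []
        for pidfile in ACTOR_PIDFILE_DIR.glob(f'{cls.__name__}.*.json'):
            info = json.loads(pidfile.read_text())     # e.g. {'pid': 123, 'persona': 'Default'}
            if not psutil.pid_exists(info['pid']):
                pidfile.unlink(missing_ok=True)        # clean up records left behind by dead actors
                continue
            if filter_kwargs and any(info.get(key) != value for key, value in filter_kwargs.items()):
                continue
            running.append(info)
        return running
```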
### D.2. `Actor` Lifecycle
- MAY initialize shared resources on startup (e.g. spawn background worker processes)
- MUST exit when its queue is empty
- MAY release shared resources on exit
### D.3. `Actor` Runloop
- MUST get its queue at the start of each `runloop` iteration and pop one object off the top
- MUST first bump `object.retry_at` by `MAX_TICK_TIME` to avoid concurrent actors clashing
- MUST call only one [`ActorType.tick(object)`](#e-tick) per `runloop` iteration
- MUST enforce [`ActorType.tick(object)`](#e-tick) takes no longer than `ActorType.MAX_TICK_TIME`
- MUST catch and log exceptions from [`ActorType.tick(object)`](#e-tick)
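The `retry_at` bump doubles as an optimistic claim on the Object. A minimal sketch of a hypothetical `claim_next_object()` helper (not part of the spec) that a `runloop` could use to pop and claim one Object atomically:
```python
from datetime import timedelta
from django.utils import timezone

class ActorType:
    ...

    def claim_next_object(self):
        # pop one Object off the top of the queue
        obj = self.get_queue().last()
        if obj is None:
            return None
        # compare-and-swap on retry_at: only one concurrent Actor can win this update
        claimed = type(obj).objects.filter(pk=obj.pk, retry_at=obj.retry_at).update(
            retry_at=timezone.now() + timedelta(seconds=self.MAX_TICK_TIME),
        )
        return obj if claimed else None    # claimed == 0 means another Actor got there first
```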
### D.4. `Actor` Lifecycle hooks exposed for plugins
```python
abx.pm.hook.on_actor_startup(actor, queue)
abx.pm.hook.on_actor_tick_start(actor, object)
abx.pm.hook.on_actor_tick_end(actor, object)
abx.pm.hook.on_actor_tick_exception(actor, object, exception)
abx.pm.hook.on_actor_shutdown(actor)
```
<br/>
## E. `Tick`
> An `ActorType.tick(object)` method is a function that takes an [`Object`](#b-object), checks whether it needs to be moved to a new state, and executes any [`Transition`](#f-transition) needed to do so.
**Example `Tick` method:**
```python
from datetime import timedelta
from django.utils import timezone

class SnapshotActor(ActorType):
    ...

    def tick(self, snapshot):
        # [-> queued] -> started -> sealed

        # SEALED (final state, do nothing)
        if snapshot.status in SNAPSHOT_FINAL_STATES:
            assert snapshot.retry_at is None
            return None
        else:
            assert snapshot.retry_at is not None

        # QUEUED -> STARTED
        if snapshot.status == 'queued':
            transition_snapshot_to_started(snapshot, config, cwd)

        # STARTED -> SEALED
        elif snapshot.status == 'started':
            if snapshot_has_pending_archiveresults(snapshot, config, cwd):
                # tasks still in-progress, check back again in another 5s
                snapshot.retry_at = timezone.now() + timedelta(seconds=5)
                snapshot.save()
            else:
                # everything is finished, seal the snapshot
                transition_snapshot_to_sealed(snapshot, config, cwd)
```
### E.1. `Tick` Requirements
- MUST NOT return any value (it either succeeds and returns `None` or raises an `Exception`)
- MUST NOT call other [`tick()`](#e-tick) methods directly
- MUST NOT attempt to spawn other [`Orchestrator`](#c-orchestrator) or [`Actor`](#d-actors) processes
- SHOULD NOT lock [`Object`](#b-object)s or open long-running DB transactions
- SHOULD NOT enforce its own timeouts; they are enforced by the [`Actor`](#d-actors) `runloop` code
### E.2. `Tick` Lifecycle
- A tick starts by [`Read`](#g-read)ing relevant state from the db, filesystem, or external APIs
- Then it performs any checks or branching to determine which next state to transition to
- Finally, it executes a single [`transition_from_abx_to_xyz()`](#f-transition) method to move its [`Object`](#b-object) to that state
### E.3. `Tick` Lifecycle hooks exposed for Plugins
*The [D.4. `Actor` Lifecycle hooks](#d4-actor-lifecycle-hooks-exposed-for-plugins) cover all of the `Tick` lifecycle events.*
<br/>
## F. `Transition`
> One `ActorType.transition_<objecttype>_from_<startstate>_to_<endstate>` call is made at the end of [`ActorType.tick(object)`](#e-tick) to transition its [`Object`](#b-object)'s state.
**Example `Transition` function:**
```python
from datetime import timedelta
from django.utils import timezone

def transition_snapshot_to_started(snapshot, config, cwd):
    # queued [-> started] -> sealed
    retry_at = timezone.now() + timedelta(seconds=10)
    retries = snapshot.retries + 1

    # atomic compare-and-swap: only update the row if it's still in the expected previous state
    snapshot_to_update = {'pk': snapshot.pk, 'status': 'queued'}
    fields_to_update = {'status': 'started', 'retry_at': retry_at, 'retries': retries, 'start_ts': timezone.now(), 'end_ts': None}
    snapshot = abx.archivebox.writes.update_snapshot(filter_kwargs=snapshot_to_update, update_kwargs=fields_to_update)

    # trigger side effects on state transition (these just emit an event to a separate queue that's then processed by a huey worker)
    cleanup_snapshot_dir(snapshot, config, cwd)
    create_snapshot_pending_archiveresults(snapshot, config, cwd)
    update_snapshot_index_json(snapshot, config, cwd)
    update_snapshot_index_html(snapshot, config, cwd)
```
### F.1. `Transition` Requirements
- MUST NOT lock [`Objects`](#b-object) or open long-lived DB transactions
- MUST NOT have any branching logic
- MUST be idempotent (it should fail silently/do nothing if run multiple times or concurrently)
- MUST be atomic (`.filter(previous_state).update(next_state)` compare-and-swap)
### F.2. `Transition` Lifecycle
- It should first perform a SINGLE [`Write`](#h-write) to the underlying object to update its state
- In the same write it must update the `object.retry_at` time, or set it to `None` if the object is in a final state
- It should then trigger any [`Side Effects`](#i-side-effects) for the transition, which can:
- perform filesystem operations, spawn external processes, or make other system calls
- make external python library calls, 3rd party network API requests, webhooks, etc.
- update `retry_at` time on *other* [`Objects`](#b-object) so that they are enqueued to be rechecked
- MUST NOT directly mutate other [`Objects`](#b-object)' state via [`Write`](#h-write)s (other than `retry_at`)
- MUST NOT directly call other [`Transition`](#f-transition)s in the same object or other objects
- MUST NOT directly call [`Tick`](#e-tick) methods or spawn [`Actor`](#d-actors) or [`Orchestrator`](#c-orchestrator) processes
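For example, the rule allowing `retry_at` updates on *other* Objects lets a child's transition nudge its parent to be rechecked. A hypothetical sketch (assuming an `update_archiveresult()` write helper analogous to the `update_snapshot()` used above), not part of the spec:
```python
from datetime import timedelta
from django.utils import timezone

def transition_archiveresult_to_succeeded(archiveresult, config, cwd):
    # single atomic compare-and-swap to move the ArchiveResult to its final state
    abx.archivebox.writes.update_archiveresult(
        filter_kwargs={'pk': archiveresult.pk, 'status': 'started'},
        update_kwargs={'status': 'succeeded', 'retry_at': None, 'end_ts': timezone.now()},
    )
    # bump retry_at on the parent Snapshot (allowed) so its Actor rechecks it soon,
    # without mutating any of the Snapshot's other state (not allowed)
    Snapshot.objects.filter(pk=archiveresult.snapshot_id).update(
        retry_at=timezone.now() + timedelta(seconds=5),
    )
```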
### F.3. `Transition` Lifecycle hooks Exposed for Plugins
```python
abx.pm.hook.on_transition_<objecttype>_from_abx_to_xyz_start(object)
abx.pm.hook.on_transition_<objecttype>_from_abx_to_xyz_end(object)
```
<br/>
---
<br/>
## G. `Read`
> An `archivebox.core.reads.read_<objecttype>_xyz(object, **kwargs)` method is a getter that abstracts a DB query, cache query, filesystem read, etc.
**Example `Read` function:**
```python
from django.db.models import Q

def get_outlink_parents(url, crawl=None, config=None):
    scope = Q(dst=url)
    if crawl:
        scope = scope | Q(via__snapshot__crawl=crawl)

    parent = list(Outlink.objects.filter(scope))
    if not parent:
        # base case: we reached the top of the chain, no more parents left
        return

    # recursive case: there is another parent above us, get its parents
    yield parent[0]
    yield from get_outlink_parents(parent[0].src, crawl=crawl, config=config)
```
### G.1. `Read` Requirements
- MAY read from DB tables, Django cache, filesystem, in-memory caches, etc.
- MAY accept either an `instance`/`pk`/`uuid`/`abid` or `filter_kwargs`
- MAY return an `instance`, `benedict`, `TypedDict`, Pydantic model, or raw value
- MUST NOT have any side effects like DB writes, state changes, transitions, etc.
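For example, the `snapshot_has_pending_archiveresults()` helper used by the `Tick` example above could be implemented as a pure read; a minimal sketch, assuming the final `ArchiveResult` state names are collected in an `ARCHIVERESULT_FINAL_STATES` constant and that `ArchiveResult` keeps the default `archiveresult_set` reverse relation:
```python
def snapshot_has_pending_archiveresults(snapshot, config=None, cwd=None) -> bool:
    # a Snapshot still has work pending if any of its ArchiveResults has not reached a final state
    return snapshot.archiveresult_set.exclude(status__in=ARCHIVERESULT_FINAL_STATES).exists()
```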
### G.2. `Read` Lifecycle hooks exposed for Plugins
*Aside from the [`Object` Lifecycle hooks](#b2-object-lifecycle-hooks-exposed-for-plugins), no other read-hooks are currently exposed by ABX.*
<br/>
## H. `Write`
> An `archivebox.core.writes.write_<objecttype>_xyz(object, **kwargs)` method is a setter that abstracts a DB write, cache update, filesystem write, etc.
**Example `Write` function:**
```python
def create_root_snapshot_from_seed(crawl):
    # create a snapshot for the seed URI which kicks off the crawl
    # only a single extractor will run on it, which will produce outlinks which get added back to the crawl
    root_snapshot, created = abx.archivebox.writes.get_or_create_snapshot(
        crawl=crawl,
        url=crawl.seed.uri,
        config={
            'extractors': (
                abx.archivebox.reads.get_extractors_that_produce_outlinks()
                if crawl.seed.extractor == 'auto' else
                [crawl.seed.extractor]
            ),
            **crawl.seed.config,
        },
    )
```
### H.1. `Write` Requirements
- MAY perform [`Read`](#g-read) calls during its execution
- MAY perform other [`Write`](#h-write) calls during its execution
- SHOULD perform any DB changes as a single atomic compare-and-swap operation
- SHOULD NOT lock [`Objects`](#b-object) or open long-lived DB transactions
- SHOULD NOT enforce its own timeouts or ratelimits
- SHOULD NOT have branching logic
- MUST NOT trigger other [`Side Effects`](#i-side-effects) or spawn any long-lived subprocesses
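A minimal sketch of the `update_snapshot()` write used by the `Transition` example above, assuming it performs its DB change as a single `filter(...).update(...)` compare-and-swap and fires the corresponding update hook:
```python
def update_snapshot(filter_kwargs: dict, update_kwargs: dict):
    # single atomic compare-and-swap: the UPDATE only applies while the row still matches filter_kwargs
    matched = Snapshot.objects.filter(**filter_kwargs).update(**update_kwargs)
    if not matched:
        # another Actor already transitioned this Snapshot, do nothing (idempotent)
        return None
    snapshot = Snapshot.objects.get(pk=filter_kwargs['pk'])
    abx.pm.hook.on_snapshot_updated(snapshot)
    return snapshot
```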
### H.2. `Write` Lifecycle hooks exposed for Plugins
```python
abx.pm.hook.on_<objecttype>_created(object)
abx.pm.hook.on_<objecttype>_updated(object)
abx.pm.hook.on_<objecttype>_deleted(object)
```
<br/>
## I. `Side Effects`
> An `archivebox.core.sideeffects.sideeffect_xyz(object, **kwargs)` method performs a side effect like spawning a subprocess, making external API calls, etc.
**Example `Side Effect` function:**
```python
def exec_extractor(url, extractor, credentials, config):
    # run the extractor binary in a background process, asynchronously
    Extractor = load_extractor(extractor, credentials, config)
    cmd_args = Extractor.get_cmd(url)
    bg_proc = Extractor.exec(cmd_args, fork=True, double_fork=False, cwd=os.getcwd())
```
### I.1. `Side Effect` Requirements
- MAY spawn processes, make API calls, write to filesystem
- SHOULD execute quickly and return early
- SHOULD NOT block for slow operations
- SHOULD spawn a separate background process or enqueue a background task for long-running operations (see the sketch below)
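One way to satisfy the last requirement is to enqueue the slow work for a background worker, as the huey queue mentioned in the `Transition` example above does. A minimal sketch; the task name, thumbnail logic, and huey instance configuration are illustrative assumptions, not part of the spec:
```python
from huey import SqliteHuey

huey = SqliteHuey(filename='sideeffects.sqlite3')   # illustrative; ArchiveBox would configure its own instance

@huey.task()
def render_thumbnail_task(snapshot_pk: str):
    # the slow work runs in the huey worker process, not inside the Actor's tick
    snapshot = Snapshot.objects.get(pk=snapshot_pk)
    ...  # e.g. run headless chrome and write a thumbnail into the snapshot dir

def sideeffect_generate_snapshot_thumbnail(snapshot, **kwargs):
    # the side effect itself just enqueues the task and returns immediately
    render_thumbnail_task(str(snapshot.pk))
```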
### I.2. `Side Effect` Hooks Exposed for Plugins
```python
abx.pm.hook.on_sideeffect_xyz_started(object)
abx.pm.hook.on_sideeffect_xyz_succeeded(object)
abx.pm.hook.on_sideeffect_xyz_failed(object)
```
---
### Further Reading
- https://github.com/fgmacedo/python-statemachine ⭐️
- https://github.com/Borderless360/django-logic ⭐️
- https://github.com/pytransitions/transitions
- https://github.com/django-commons/django-fsm-2
- https://github.com/woile/pyfsm
- https://github.com/hlovatt/statmach
- https://github.com/opsani/statesman
- https://stately.ai/docs/actor-model
- https://zagjs.com/
- https://shopify.engineering/17488160-why-developers-should-be-force-fed-state-machines