dawgz provides a lightweight and intuitive Python interface to declare, schedule and execute job workflows. It can also delegate execution to resource management backends such as Slurm, which means you can write, configure, and submit your workflows without ever leaving Python.
The dawgz package is available on PyPI and can be installed with pip.

```shell
pip install dawgz
```
Alternatively, if you need the latest features, you can install it from source.

```shell
pip install git+https://github.com/francois-rozet/dawgz
```
In dawgz, a job is a Python function. The `dawgz.job` decorator declares the resources a job requires and captures its arguments. A job's dependencies within the workflow are declared with the `dawgz.Job.after` method. Once the jobs are declared, the `dawgz.schedule` function takes care of scheduling them and their dependencies with the selected execution backend. For more information, check out the interface and the examples.
Below is a small example demonstrating how one could use dawgz to calculate π (very roughly) with the Monte Carlo method. We define two jobs, `generate` and `estimate`. Five instances of `generate` are declared and will be executed concurrently. The `estimate` job has all `generate` instances as dependencies, meaning that it will only start after they have all completed successfully.
```python
import dawgz
import glob
import numpy as np

@dawgz.job(cpus=1, ram="2GB", time="5:00")
def generate(i: int):
    print(f"Task {i + 1}")

    x = np.random.random(10000)
    y = np.random.random(10000)
    within_circle = x**2 + y**2 <= 1

    np.save(f"pi_{i}.npy", within_circle)

@dawgz.job(cpus=2, ram="4GB", time="15:00")
def estimate():
    files = glob.glob("pi_*.npy")
    stack = np.vstack([np.load(f) for f in files])
    pi_estimate = stack.mean() * 4

    print(f"π ≈ {pi_estimate}")

if __name__ == "__main__":
    generate_jobs = [generate(i) for i in range(5)]
    estimate_job = estimate().after(*generate_jobs)

    dawgz.schedule(estimate_job, backend="async")
```

```
$ python examples/pi.py
Task 1
Task 2
Task 3
Task 4
Task 5
π ≈ 3.14936
```
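For intuition, the estimator itself can be reproduced without dawgz: points drawn uniformly in the unit square fall inside the quarter circle with probability π/4, so four times the hit frequency approximates π. A standalone sketch using only the standard library (the fixed seed is an arbitrary choice for reproducibility):

```python
import random

random.seed(0)  # fixed seed so the run is reproducible

n = 100_000
hits = sum(
    1 for _ in range(n)
    if random.random() ** 2 + random.random() ** 2 <= 1
)
pi_estimate = 4 * hits / n

print(f"π ≈ {pi_estimate}")
```

With 100 000 samples, the standard error of the estimate is roughly 0.005, which matches the precision seen in the output above.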
Alternatively, on a Slurm HPC cluster, changing the backend to `"slurm"` results in the following job queue.
```
$ python examples/pi.py
$ squeue -u username
   JOBID PARTITION          NAME     USER ST  TIME NODES NODELIST(REASON)
 1868833       all 0005_estimate username PD  0:00     1 (Dependency)
 1868832       all 0004_generate username PD  0:00     1 (Resources)
 1868831       all 0003_generate username PD  0:00     1 (Resources)
 1868830       all 0002_generate username PD  0:00     1 (Resources)
 1868828       all 0000_generate username  R  0:02     1 node-x
 1868829       all 0001_generate username  R  0:01     1 node-y
```
In addition to the Python interface, dawgz provides a simple command-line interface to list the scheduled workflows, the jobs of a workflow, or the output of a job.
```
$ dawgz
╭────┬───────┬──────────────────────────┬─────────────────────┬─────────┬──────┬────────╮
│    │ Name  │ ID                       │ Date                │ Backend │ Jobs │ Errors │
├────┼───────┼──────────────────────────┼─────────────────────┼─────────┼──────┼────────┤
│  0 │ pi.py │ handsome_jicama_bfc5a3e4 │ 2022-02-28 16:37:58 │ async   │    6 │      0 │
│  1 │ pi.py │ crowded_machine_23bdd047 │ 2022-02-28 16:38:33 │ slurm   │    6 │      0 │
╰────┴───────┴──────────────────────────┴─────────────────────┴─────────┴──────┴────────╯
```
```
$ dawgz 1
╭────┬──────────┬───────────┬─────────╮
│    │ Job      │ State     │ ID      │
├────┼──────────┼───────────┼─────────┤
│  0 │ generate │ COMPLETED │ 1868828 │
│  1 │ generate │ RUNNING   │ 1868829 │
│  2 │ generate │ RUNNING   │ 1868830 │
│  3 │ generate │ PENDING   │ 1868831 │
│  4 │ generate │ PENDING   │ 1868832 │
│  5 │ estimate │ PENDING   │ 1868833 │
╰────┴──────────┴───────────┴─────────╯
```
```
$ dawgz 1 2
╭────┬──────────┬───────────┬────────╮
│    │ Job      │ State     │ Output │
├────┼──────────┼───────────┼────────┤
│  2 │ generate │ COMPLETED │ Task 3 │
╰────┴──────────┴───────────┴────────╯
```
```
$ dawgz 1 2 --input
╭────┬──────────┬───────────┬─────────────╮
│    │ Job      │ State     │ Input       │
├────┼──────────┼───────────┼─────────────┤
│  2 │ generate │ COMPLETED │ generate(2) │
╰────┴──────────┴───────────┴─────────────╯
```
```
$ dawgz 1 2 --source
╭────┬──────────┬───────────┬────────────────────────────────────────────╮
│    │ Job      │ State     │ Source                                     │
├────┼──────────┼───────────┼────────────────────────────────────────────┤
│  2 │ generate │ COMPLETED │ @dawgz.job(cpus=1, ram="2GB", time="5:00") │
│    │          │           │ def generate(i: int) -> None:              │
│    │          │           │     print(f"Task {i + 1}")                 │
│    │          │           │                                            │
│    │          │           │     x = np.random.random(10000)            │
│    │          │           │     y = np.random.random(10000)            │
│    │          │           │     within_circle = x**2 + y**2 <= 1       │
│    │          │           │                                            │
│    │          │           │     np.save(f"pi_{i}.npy", within_circle)  │
╰────┴──────────┴───────────┴────────────────────────────────────────────╯
```
- `dawgz.job` registers a function as a job, with its settings (name, resources, ...). In the following example, `a` is a job with the name `"A"`, a time limit of one hour, and running on the `tesla` or `quadro` partitions.

  ```python
  @dawgz.job(name="A", time="01:00:00", partition="tesla,quadro")
  def a(n: int, x: float):
      ...

  a_job = a(3, 0.14)
  ```
  When the decorated function is called, its context and arguments are captured in a `dawgz.Job` instance for later execution. Modifying global variables after the instance has been created will not affect its execution. However, the content of Python modules is not captured, which means that modifying a module after a job has been submitted can affect its execution. If this becomes an issue for you, you can register your module such that it is pickled by value rather than by reference.

  ```python
  import cloudpickle
  import my_module

  cloudpickle.register_pickle_by_value(my_module)

  @dawgz.job
  def a():
      my_module.my_function()
  ```
  To declare that a job must wait for another one to complete, use the `dawgz.Job.after` method. By default, a job waits for its dependencies to complete with success. The desired completion status can be set to `"success"` (default), `"failure"` or `"any"`.

  ```python
  @dawgz.job
  def b():
      ...

  b_job = b().after(a_job, status="failure")
  ```
  If a job has several dependencies, the `dawgz.Job.waitfor` method declares whether it should wait for `"all"` (default) or `"any"` of them to be satisfied before starting.

  ```python
  @dawgz.job
  def c():
      ...

  c_job = c().after(a_job, b_job).waitfor("any")
  ```
  When running the same workflow multiple times, you may want to skip jobs that have already been executed. You can mark these jobs as completed with the `dawgz.Job.mark` method, and they will be automatically pruned out of the workflow. The completion status can be set to `"pending"` (default), `"success"`, `"failure"` or `"cancelled"`.

  ```python
  @dawgz.job
  def d():
      ...

  d_job = d().mark("success")
  ```
- `dawgz.array` creates a job array from a group of independent jobs. The primary use case of job arrays is to schedule a large number of small jobs while throttling the number of simultaneously running jobs, without saturating the Slurm queue. The returned object is itself a `dawgz.Job` instance and supports the methods presented above (`after`, `waitfor`, ...).

  ```python
  @dawgz.job
  def e(i):
      ...

  e_jobs = [e(i) for i in range(42)]
  e_array = dawgz.array(*e_jobs, throttle=3)
  e_array.after(d_job)

  dawgz.schedule(e_array, backend="slurm")
  ```
- `dawgz.schedule` schedules a set of jobs, along with their dependencies. Three backends are currently supported: `async`, `dummy` and `slurm`.

  - `async` waits asynchronously for dependencies to complete before executing each job. The jobs are executed by the current Python interpreter.
  - `dummy` is equivalent to `async`, but instead of executing the jobs, it prints their name before and after a short (random) sleep time. Its main use is debugging.
  - `slurm` submits the jobs to the Slurm workload manager by automatically generating `sbatch` submission scripts.
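The dependency-driven execution described above can be pictured as visiting the jobs in a topological order of the dependency graph. A minimal standalone sketch with the standard library (illustrative only, not dawgz internals; the job names are made up):

```python
from graphlib import TopologicalSorter

def run_in_order(dependencies: dict, execute) -> None:
    # Run each job only after all of its dependencies have run.
    for job in TopologicalSorter(dependencies).static_order():
        execute(job)

order = []
run_in_order(
    {"estimate": {"generate_0", "generate_1"}, "generate_0": set(), "generate_1": set()},
    order.append,
)
print(order)  # the two generate jobs first, then 'estimate'
```

The actual `async` backend additionally runs independent jobs concurrently rather than one at a time, as in the π example above.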