Directed Acyclic Workflow Graph Scheduling

dawgz provides a lightweight and intuitive Python interface to declare, schedule and execute job workflows. It can also delegate execution to resource management backends such as Slurm, which means you can write, configure, and submit your workflows without ever leaving Python.

Installation

The dawgz package is available on PyPI and can be installed with pip.

pip install dawgz

Alternatively, if you need the latest features, you can install it from source.

pip install git+https://github.com/francois-rozet/dawgz

Getting started

In dawgz, a job is a Python function. The dawgz.job decorator declares the resources a job requires and captures its arguments. A job's dependencies within the workflow are declared with the dawgz.Job.after method. After declaration, the dawgz.schedule function schedules the jobs and their dependencies on a selected execution backend. For more information, check out the interface and the examples.

The following small example demonstrates how one could use dawgz to estimate π (very roughly) using the Monte Carlo method. We define two jobs, generate and estimate. Five instances of generate are declared and will be executed concurrently. The estimate job has all generate instances as dependencies, meaning that it only starts after they have all completed successfully.

import dawgz
import glob
import numpy as np

@dawgz.job(cpus=1, ram="2GB", time="5:00")
def generate(i: int) -> None:
    print(f"Task {i + 1}")

    x = np.random.random(10000)
    y = np.random.random(10000)
    within_circle = x**2 + y**2 <= 1

    np.save(f"pi_{i}.npy", within_circle)

@dawgz.job(cpus=2, ram="4GB", time="15:00")
def estimate():
    files = glob.glob("pi_*.npy")
    stack = np.vstack([np.load(f) for f in files])
    pi_estimate = stack.mean() * 4

    print(f"π ≈ {pi_estimate}")

if __name__ == "__main__":
    generate_jobs = [generate(i) for i in range(5)]
    estimate_job = estimate().after(*generate_jobs)

    dawgz.schedule(estimate_job, backend="async")

$ python examples/pi.py
Task 1
Task 2
Task 3
Task 4
Task 5
π ≈ 3.14936
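For intuition, the estimator works because points drawn uniformly in the unit square fall inside the quarter circle x² + y² ≤ 1 with probability π/4, so four times the observed fraction approximates π. A minimal standalone check of this, independent of dawgz:

```python
import numpy as np

# Points drawn uniformly in the unit square land inside the quarter
# circle x^2 + y^2 <= 1 with probability pi / 4, so four times the
# observed fraction converges to pi.
rng = np.random.default_rng(0)
x = rng.random(1_000_000)
y = rng.random(1_000_000)
pi_estimate = 4 * np.mean(x**2 + y**2 <= 1)

print(pi_estimate)  # close to 3.14159
```

With a million samples, the estimate is typically within a few thousandths of π.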

Alternatively, on a Slurm HPC cluster, changing the backend to "slurm" results in the following job queue.

$ python examples/pi.py
$ squeue -u username
             JOBID PARTITION          NAME     USER ST       TIME  NODES NODELIST(REASON)
           1868833       all 0005_estimate username PD       0:00      1 (Dependency)
           1868832       all 0004_generate username PD       0:00      1 (Resources)
           1868831       all 0003_generate username PD       0:00      1 (Resources)
           1868830       all 0002_generate username PD       0:00      1 (Resources)
           1868828       all 0000_generate username  R       0:02      1 node-x
           1868829       all 0001_generate username  R       0:01      1 node-y

In addition to the Python interface, dawgz provides a simple command-line interface to list the scheduled workflows, the jobs of a workflow, or the output of a job.

$ dawgz
╭────┬───────┬──────────────────────────┬─────────────────────┬─────────┬──────┬────────╮
│    │ Name  │ ID                       │ Date                │ Backend │ Jobs │ Errors │
├────┼───────┼──────────────────────────┼─────────────────────┼─────────┼──────┼────────┤
│  0 │ pi.py │ handsome_jicama_bfc5a3e4 │ 2022-02-28 16:37:58 │ async   │    6 │      0 │
│  1 │ pi.py │ crowded_machine_23bdd047 │ 2022-02-28 16:38:33 │ slurm   │    6 │      0 │
╰────┴───────┴──────────────────────────┴─────────────────────┴─────────┴──────┴────────╯
$ dawgz 1
╭────┬──────────┬───────────┬─────────╮
│    │ Job      │ State     │ ID      │
├────┼──────────┼───────────┼─────────┤
│  0 │ generate │ COMPLETED │ 1868828 │
│  1 │ generate │ RUNNING   │ 1868829 │
│  2 │ generate │ RUNNING   │ 1868830 │
│  3 │ generate │ PENDING   │ 1868831 │
│  4 │ generate │ PENDING   │ 1868832 │
│  5 │ estimate │ PENDING   │ 1868833 │
╰────┴──────────┴───────────┴─────────╯
$ dawgz 1 2
╭────┬──────────┬───────────┬────────╮
│    │ Job      │ State     │ Output │
├────┼──────────┼───────────┼────────┤
│  2 │ generate │ COMPLETED │ Task 3 │
╰────┴──────────┴───────────┴────────╯
$ dawgz 1 2 --input
╭────┬──────────┬───────────┬─────────────╮
│    │ Job      │ State     │ Input       │
├────┼──────────┼───────────┼─────────────┤
│  2 │ generate │ COMPLETED │ generate(2) │
╰────┴──────────┴───────────┴─────────────╯
$ dawgz 1 2 --source
╭────┬──────────┬───────────┬────────────────────────────────────────────╮
│    │ Job      │ State     │ Source                                     │
├────┼──────────┼───────────┼────────────────────────────────────────────┤
│  2 │ generate │ COMPLETED │ @dawgz.job(cpus=1, ram="2GB", time="5:00") │
│    │          │           │ def generate(i: int) -> None:              │
│    │          │           │     print(f"Task {i + 1}")                 │
│    │          │           │                                            │
│    │          │           │     x = np.random.random(10000)            │
│    │          │           │     y = np.random.random(10000)            │
│    │          │           │     within_circle = x**2 + y**2 <= 1       │
│    │          │           │                                            │
│    │          │           │     np.save(f"pi_{i}.npy", within_circle)  │
╰────┴──────────┴───────────┴────────────────────────────────────────────╯

Interface

  • dawgz.job registers a function as a job, along with its settings (name, resources, ...). In the following example, a is a job named "A", with a time limit of one hour, running on the tesla or quadro partitions.

    @dawgz.job(name="A", time="01:00:00", partition="tesla,quadro")
    def a(n: int, x: float):
        ...
    a_job = a(3, 0.14)

    When the decorated function is called, its context and arguments are captured in a dawgz.Job instance for later execution. Modifying global variables after it has been created will not affect its execution. However, the content of Python modules is not captured, which means that modifying a module after a job has been submitted can affect its execution. If this becomes an issue for you, you can register your module such that it is pickled by value rather than by reference.

    import cloudpickle
    import my_module
    
    cloudpickle.register_pickle_by_value(my_module)
    
    @dawgz.job
    def a():
        my_module.my_function()

    To declare that a job must wait for another one to complete, you can use the dawgz.Job.after method. By default, the job waits for its dependencies to complete with success. The desired completion status can be set to "success" (default), "failure" or "any".

    @dawgz.job
    def b():
        ...
    b_job = b().after(a_job, status="failure")

    If a job has several dependencies, the dawgz.Job.waitfor method can be used to declare whether it should wait for "all" (default) or "any" of them to be satisfied before starting.

    @dawgz.job
    def c():
        ...
    c_job = c().after(a_job, b_job).waitfor("any")

    When running the same workflow multiple times, you may want to skip jobs that have already been executed. You can mark these jobs as completed with the dawgz.Job.mark method, and they will be automatically pruned out of the workflow. The completion status can be set to "pending" (default), "success", "failure" or "cancelled".

    @dawgz.job
    def d():
        ...
    d_job = d().mark("success")
  • dawgz.array creates a job array from a group of independent jobs. The primary use case of job arrays is to schedule a large number of small jobs without saturating the Slurm queue, while throttling the number of simultaneously running jobs. The returned object is itself a dawgz.Job instance and supports the methods presented above (after, waitfor, ...).

    @dawgz.job
    def e(i):
        ...
    
    e_jobs = [e(i) for i in range(42)]
    e_array = dawgz.array(*e_jobs, throttle=3)
    e_array.after(d_job)
    
    dawgz.schedule(e_array, backend="slurm")
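    Conceptually, throttle=3 caps the number of array elements running at once. That effect can be sketched with a plain asyncio semaphore (a simplified illustration, not dawgz's actual implementation):

```python
import asyncio

async def run_throttled(jobs, throttle):
    # Allow at most `throttle` jobs to run concurrently, mirroring
    # what a job array's throttle does for a Slurm queue.
    semaphore = asyncio.Semaphore(throttle)
    running = 0
    peak = 0

    async def guarded(job):
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await job()
            running -= 1

    await asyncio.gather(*(guarded(job) for job in jobs))
    return peak

async def small_job():
    await asyncio.sleep(0.01)

# Ten jobs are scheduled, but no more than three ever run at once.
peak = asyncio.run(run_throttled([small_job] * 10, throttle=3))
print(f"peak concurrency: {peak}")  # at most 3
```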
  • dawgz.schedule schedules a set of jobs, along with their dependencies. Three backends are currently supported: async, dummy and slurm.

    1. async waits asynchronously for dependencies to complete before executing each job. The jobs are executed by the current Python interpreter.
    2. dummy is equivalent to async, but instead of executing the jobs, it prints their names before and after a short (random) sleep. Its main use is debugging.
    3. slurm submits the jobs to the Slurm workload manager by automatically generating sbatch submission scripts.
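    As an illustration of what the async backend does, each job can be modelled as a coroutine that awaits its dependencies before running. A simplified sketch, not dawgz's actual machinery:

```python
import asyncio

async def run(name, deps, order):
    # Wait for all dependencies to finish, then "execute" the job
    # by recording its name in the completion order.
    await asyncio.gather(*deps)
    order.append(name)

async def main():
    order = []
    # a and b are independent; c depends on both of them.
    a = asyncio.create_task(run("a", [], order))
    b = asyncio.create_task(run("b", [], order))
    c = asyncio.create_task(run("c", [a, b], order))
    await c
    return order

order = asyncio.run(main())
print(order)  # "c" always comes last
```

Here a and b may complete in either order, but c is guaranteed to run only after both, which is exactly the guarantee the async backend provides for job dependencies.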
