Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,31 @@ jobs:
docs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up Python
uses: actions/setup-python@v1
uses: actions/setup-python@v2
- uses: actions/checkout@v2
- name: Install binary dependencies
run: sudo apt-get install -y python3-enchant graphviz
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip setuptools
pip install sphinxcontrib-spelling
python setup.py develop
pip install -e '.[docs]'
- name: Build documentation
run: python setup.py build_sphinx -W -b spelling

tests:
dist:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-python@v2
- run: python -m pip install --upgrade pip setuptools wheel twine readme-renderer
- uses: actions/checkout@v2
- run: python setup.py sdist bdist_wheel
- run: python -m twine check dist/*

pytest:
needs:
- dist
runs-on: ubuntu-latest
strategy:
matrix:
Expand All @@ -31,26 +42,49 @@ jobs:
django-version:
- 2.2.*
- 3.0.*
extras:
- test
- test,reversion
steps:
- uses: actions/checkout@v1
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- uses: actions/checkout@v2
- name: Install binary dependencies
run: |
sudo apt-get update
sudo apt-get install -y graphviz redis-server
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip setuptools wheel codecov
python -m pip install -e .[${{ matrix.extras }}]
python -m pip install -e .[dramatiq]
python -m pip install django==${{ matrix.django-version }}
- name: Run tests
run: PATH=$PATH:$(pwd)/bin py.test
run: python setup.py test
- run: codecov
env:
CODECOV_TOKEN: ${{secrets.CODECOV_TOKEN}}

extras:
runs-on: ubuntu-latest
strategy:
matrix:
extras:
- dramatiq
- celery
- dramatiq,reversion
steps:
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
- uses: actions/checkout@v2
- name: Install binary dependencies
run: |
sudo apt-get update
sudo apt-get install -y graphviz redis-server
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip setuptools wheel codecov
python -m pip install -e .[${{ matrix.extras }}]
- name: Run tests
run: python setup.py test
- run: codecov
env:
CODECOV_TOKEN: ${{secrets.CODECOV_TOKEN}}
2 changes: 1 addition & 1 deletion docs/tutorial/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ the PyPi package…

.. code:: shell

pip install joeflow[reversion]
python3 -m pip install "joeflow[reversion,dramatiq,celery]"

…and add ``joeflow`` to the ``INSTALLED_APP`` setting. You will also need to have
celery setup.
Expand Down
3 changes: 2 additions & 1 deletion docs/urls.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ pattern consisting of the process name (lowercase) and task name, e.g.:

.. code-block:: python

>>> reverse(process_name:task_name, args=[task.pk])
>>> from django.urls import reverse
>>> reverse("process_name:task_name", args=[task.pk])
'/url/to/process/task/1'

All task URLs need the `.Task` primary key as an argument. There are some
Expand Down
Empty file added joeflow/celery/__init__.py
Empty file.
17 changes: 16 additions & 1 deletion joeflow/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,22 @@ class JoeflowAppConfig(AppConf):
process state.
"""

JOEFLOW_CELERY_QUEUE_NAME = 'celery'
JOEFLOW_TASK_RUNNER = 'joeflow.runner.dramatiq.task_runner'
"""
Task runner is used to execute machine tasks.

JoeFlow supports two different asynchronous task runners – Dramatiq_ and Celery_.

To use either of the task runners change this setting to:

* ``joeflow.runner.dramatiq.task_runner``
* ``joeflow.runner.celery.task_runner``

.. _Dramatiq: https://dramatiq.io/
.. _Celery: http://www.celeryproject.org/
"""

JOEFLOW_CELERY_QUEUE_NAME = 'joeflow'
"""
Queue name in which all machine tasks will be queued.
"""
3 changes: 2 additions & 1 deletion joeflow/locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def _lock(process_pk):
The lock is not blocking to free CPU time for other tasks on celery
workers.
"""
connection = redis.StrictRedis.from_url(settings.JOEFLOW_REDIS_LOCK_URL)
connection = redis.Redis.from_url(settings.JOEFLOW_REDIS_LOCK_URL)
__lock = connection.lock('joeflow_process_{}'.format(process_pk), timeout=settings.JOEFLOW_REDIS_LOCK_TIMEOUT)
successful = __lock.acquire(blocking=False)
try:
Expand All @@ -33,6 +33,7 @@ def _lock(process_pk):
finally:
if successful:
__lock.release()
connection.close()


# little hack for testing purposes
Expand Down
11 changes: 6 additions & 5 deletions joeflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from django.db.models.functions import Now
from django.urls import NoReverseMatch, path, reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.safestring import SafeString
from django.utils.translation import ugettext_lazy as t
from django.views.generic.edit import BaseCreateView

from joeflow import celery, tasks, utils, views

from . import tasks, utils, views
from .conf import settings

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -527,11 +527,12 @@ def enqueue(self, countdown=None, eta=None):
'exception',
'stacktrace',
])
transaction.on_commit(lambda: celery.task_wrapper.apply_async(
args=(self.pk, self._process_id),
task_runner = import_string(settings.JOEFLOW_TASK_RUNNER)
transaction.on_commit(lambda: task_runner(
task_pk=self.pk,
process_pk=self._process_id,
countdown=countdown,
eta=eta,
queue=settings.JOEFLOW_CELERY_QUEUE_NAME,
))

@transaction.atomic()
Expand Down
Empty file added joeflow/runner/__init__.py
Empty file.
29 changes: 16 additions & 13 deletions joeflow/celery.py → joeflow/runner/celery.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,24 @@
import logging
import random

from celery import shared_task
from django.apps import apps
from django.db import transaction

from joeflow.conf import settings
from joeflow.contrib.reversion import with_reversion

from . import locking
from .. import locking, utils

logger = logging.getLogger('joeflow')
logger = logging.getLogger(__name__)


def jitter():
"""Return a random number between 0 and 1."""
return random.randrange(2) # nosec


def backoff(retries):
"""Return an exponentially growing number limited to 600 plus a random jitter."""
return min(600, 2 ** retries) + jitter()
__all__ = ['task_runner']


@shared_task(bind=True, ignore_results=True, max_retries=None)
def task_wrapper(self, task_pk, process_pk):
def _celery_task_runner(self, task_pk, process_pk):
with locking.lock(process_pk) as lock_result:
countdown = backoff(self.request.retries)
countdown = utils.backoff(self.request.retries)
if not lock_result:
logger.info("Process is locked, retrying in %s seconds", countdown)
self.retry(countdown=countdown)
Expand Down Expand Up @@ -55,3 +48,13 @@ def task_wrapper(self, task_pk, process_pk):
logger.info("Task completed successful, starting next tasks: %s", result)
task.start_next_tasks(next_nodes=result)
task.finish()


def task_runner(*, task_pk, process_pk, countdown, eta):
"""Schedule asynchronous machine task using celery."""
_celery_task_runner.apply_async(
args=(task_pk, process_pk),
countdown=countdown,
eta=eta,
queue=settings.JOEFLOW_CELERY_QUEUE_NAME,
)
61 changes: 61 additions & 0 deletions joeflow/runner/dramatiq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging

import dramatiq
from django.apps import apps
from django.db import OperationalError, transaction

from .. import locking, utils
from ..conf import settings
from ..contrib.reversion import with_reversion

logger = logging.getLogger(__name__)


def task_runner(*, task_pk, process_pk, countdown=None, eta=None, retries=0):
"""Schedule asynchronous machine task using celery."""
_dramatiq_task_runner.send_with_options(
args=(task_pk, process_pk),
delay=countdown,
retries=retries,
)


@dramatiq.actor(queue_name=settings.JOEFLOW_CELERY_QUEUE_NAME, retry_when=lambda a, b: isinstance(b, OperationalError))
def _dramatiq_task_runner(task_pk, process_pk, retries=0):
with locking.lock(process_pk) as lock_result:
countdown = utils.backoff(retries)
if not lock_result:
logger.info("Process is locked, retrying in %s seconds", countdown)
task_runner(task_pk=task_pk, process_pk=process_pk, countdown=countdown, retries=retries + 1)
return
Task = apps.get_model('joeflow', 'Task')
task = Task.objects.get(pk=task_pk, completed=None)
process = task.process

try:
logger.info("Executing %r", task)
node = getattr(type(process), task.name)
with_task = getattr(node, 'with_task', False)
kwargs = {}
if with_task:
kwargs['task'] = task
with with_reversion(task):
result = node(process, **kwargs)
except: # NoQA
task.fail()
logger.exception("Execution of %r failed", task)
else:
if result is False:
logger.info("Task returned False, retrying in %s seconds", countdown)
transaction.on_commit(lambda: task_runner(
task_pk=task_pk,
process_pk=process_pk,
countdown=countdown,
retries=retries + 1,
))
return
elif result is True:
result = None
logger.info("Task completed successful, starting next tasks: %s", result)
task.start_next_tasks(next_nodes=result)
task.finish()
7 changes: 7 additions & 0 deletions joeflow/tasks/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
'Wait',
)

try:
# We need to import the Dramatiq task,to ensure
# it is deteced by django-dramatiq rundramatiq command.
from joeflow.runner.dramatiq import _dramatiq_task_runner # NoQA
except ImportError:
pass


class Start:
"""
Expand Down
11 changes: 11 additions & 0 deletions joeflow/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import secrets
import types


Expand All @@ -10,3 +11,13 @@ def get_processes() -> types.GeneratorType:
for model in apps.get_models():
if issubclass(model, Process) and model is not Process:
yield model


def jitter():
"""Return a random number between 0 and 1."""
return secrets.randbelow(5)


def backoff(retries):
"""Return an exponentially growing number limited to 600 plus a random jitter."""
return min(600, 2 ** retries) + jitter()
2 changes: 1 addition & 1 deletion joeflow/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.db import transaction
from django.shortcuts import get_object_or_404
from django.utils.translation import ugettext_lazy as t
from django.utils.translation import gettext_lazy as t
from django.views import generic

from . import models
Expand Down
Loading