Bare-Bones Python 3.10+ Asyncio Coroutine Support For Celery 6+ #7874

@the-wondersmith

Description

Checklist

  • I have checked the issues list
    for similar or identical enhancement to an existing feature.
  • I have checked the pull requests list
    for existing proposed enhancements.
  • I have checked the commit log
    to find out if the same enhancement was already implemented in the
    master branch.
  • I have included all related issues and possible duplicate issues in this issue
    (If there are none, check this box anyway).

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

  • None

Brief Summary

Building on the work initially done by @ShaheedHaque and @kai3341 in celery-pool-asyncio, I've worked out an absolutely bare-bones implementation of a Python 3.8+ / Celery 5+ compatible worker pool class that enables the use of vanilla-Python async functions (coroutine functions) with Celery's standard app.task decorator. It does not interfere with or preclude the continued use of extant synchronous functions, nor does it force extant codebases to convert current code to async.

Design

The implementation is a heavily stripped-back version of @kai3341's existing Celery 4 compatible celery-pool-asyncio package. It does not include that package's automatic monkey-patching and other extras; although it does utilize monkey-patching, the only thing patched in that manner is celery.app.trace.build_tracer. The applied patch does not change the existing implementation of the celery.app.trace.build_tracer utility, except to replace direct calls of a task function or its associated "handler" methods (i.e. on_error) with calls to the new AsyncIOPool's run method instead.
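To illustrate the shape of that patch in isolation: a minimal, hypothetical sketch of wrapping a build_tracer-style factory so the callables it sees are routed through a pool's run method, without touching the factory's own logic. The name route_through_pool and the stubs are mine, not part of the actual implementation.

```python
import functools


def route_through_pool(build_tracer, pool):
    """Return a patched factory whose task function goes through pool.run.

    Hypothetical sketch: ``build_tracer`` stands in for
    ``celery.app.trace.build_tracer``; ``pool`` is any object with a
    ``run(fn, *args, **kwargs)`` method, such as the proposed AsyncIOPool.
    """

    @functools.wraps(build_tracer)
    def patched(fun, *args, **kwargs):
        # Direct calls of the task function become pool.run(fun, ...);
        # everything else about the tracer is left untouched.
        wrapped = functools.wraps(fun)(
            lambda *a, **kw: pool.run(fun, *a, **kw)
        )
        return build_tracer(wrapped, *args, **kwargs)

    return patched
```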

The new AsyncIOPool class is (essentially) a drop-in compatible version of the built-in celery.concurrency.solo.TaskPool class with a few added tricks. First, it tries to enforce itself as a per-thread singleton, and will throw an error if another asyncio event loop is already running in the same thread as the pool. Second, it implements a run method that inspects the supplied values to determine whether they are:

  • vanilla Python functions,
  • coroutine functions, or
  • already-instantiated awaitables (i.e. a coroutine object, or an asyncio.Future instance).

Non-callable, non-awaitable values are simply returned as-is. Vanilla Python callables are wrapped using the to_thread utility provided by asyncio (with the provided *args and **kwargs). Coroutine functions are simply called with the provided *args and **kwargs to get an awaitable coroutine. Anything that's already awaitable is handed off to the thread-bound event loop using asyncio.run_coroutine_threadsafe. The result of the returned future is then inspected to see if the task raised an error (which is propagated); otherwise, the value from .result() is handed back through AsyncIOPool.run. This allows virtually any mix-and-match of synchronous and coroutine tasks.
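The dispatch logic described above can be sketched roughly as follows. This is a simplified, hypothetical rendering (the class name AsyncIOPoolSketch and the _await helper are mine), which drives the loop from a plain daemon thread rather than binding it to a Celery worker, and omits the singleton enforcement and error handling of the real pool.

```python
import asyncio
import inspect
import threading


class AsyncIOPoolSketch:
    """Hypothetical sketch of the run() dispatch described above."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        # The real pool binds the loop to the worker thread; here a
        # daemon thread simply keeps it running.
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, target, *args, **kwargs):
        if not callable(target) and not inspect.isawaitable(target):
            # Non-callable, non-awaitable values are returned as-is.
            return target
        if inspect.isawaitable(target):
            # Already-instantiated awaitables are used directly.
            awaitable = target
        elif asyncio.iscoroutinefunction(target):
            # Calling a coroutine function yields an awaitable coroutine.
            awaitable = target(*args, **kwargs)
        else:
            # Vanilla callables are pushed onto a worker thread (3.9+).
            awaitable = asyncio.to_thread(target, *args, **kwargs)
        # Hand the awaitable to the thread-bound event loop; .result()
        # blocks for completion and re-raises any error, propagating it.
        future = asyncio.run_coroutine_threadsafe(self._await(awaitable), self.loop)
        return future.result()

    @staticmethod
    async def _await(awaitable):
        # run_coroutine_threadsafe requires a coroutine proper, so bare
        # awaitables (e.g. futures) are wrapped in one.
        return await awaitable
```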

Architectural Considerations

As best I can tell, there are no real concerns with the implementation itself. However, I am concerned that it's naive. I know offhand that I didn't bother to check whether task revocation still works as expected. Task cancellation is currently unimplemented as well, which may or may not have implications for timeouts. Honestly I have no idea, but I would very much appreciate some input and feedback from the Celery team.

Proposed Behavior

See design above.

Proposed UI/UX

See design above.

Diagrams

None included

Alternatives

None proposed
