Description
Checklist
- I have checked the issues list for similar or identical enhancement to an existing feature.
- I have checked the pull requests list for existing proposed enhancements.
- I have checked the commit log to find out if the same enhancement was already implemented in the master branch.
- I have included all related issues and possible duplicate issues in this issue (if there are none, check this box anyway).
Related Issues and Possible Duplicates
Related Issues
Possible Duplicates
- None
Brief Summary
Building on the work initially done by @ShaheedHaque and @kai3341 in celery-pool-asyncio, I've worked out an absolutely bare-bones implementation of a Python 3.8+ / Celery 5+ compatible worker pool class that enables the use of vanilla-Python async functions (coroutine functions) with Celery's standard app.task decorator. It does not interfere with or preclude the continued use of extant synchronous functions, nor does it force extant codebases to convert current code to async.
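To make the intended usage concrete, here is a hedged sketch of what worker configuration and task definitions might look like under such a pool. The pool's module path (`celery_aio_pool.pool:AsyncIOPool`) and the broker URL are assumptions for illustration; the point is only that a plain `async def` takes the same vanilla `app.task` decorator as a synchronous function.

```python
import asyncio

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

# Hypothetical: point the worker at the new pool class. The exact
# module path is an assumption, not part of the proposal's text.
app.conf.worker_pool = "celery_aio_pool.pool:AsyncIOPool"

@app.task
def add(x, y):
    # Existing synchronous tasks keep working unchanged.
    return x + y

@app.task
async def wait_then_add(x, y, delay=0.1):
    # Coroutine tasks use the same vanilla @app.task decorator -- no
    # conversion of existing synchronous code is required.
    await asyncio.sleep(delay)
    return x + y
```

This is a worker-configuration fragment rather than a runnable program; actually executing the tasks requires a running broker and worker.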
Design
The implementation is a heavily stripped-back version of @kai3341's existing Celery 4-compatible celery-pool-asyncio package. It does not include the automatic monkey-patching and other extras; although it does use monkey-patching, the only thing patched in that manner is celery.app.trace.build_tracer. The applied patch does not change the existing implementation of the celery.app.trace.build_tracer utility, except to replace direct calls of a task function or its associated "handler" methods (e.g. on_error) with calls through the new AsyncIOPool's run method.
The new AsyncIOPool class is (essentially) a drop-in compatible version of the built-in celery.concurrency.solo.TaskPool class with a few added tricks. First, it tries to enforce itself as a per-thread singleton, and will throw an error if another asyncio event loop is already running in the same thread as the pool. Second, it implements a run method that inspects the supplied value to see whether it is 1) a vanilla Python function, 2) a coroutine function, or 3) an already-instantiated awaitable (i.e. a coroutine object or an asyncio.Future instance). Non-callable, non-awaitable values are simply returned as-is. Vanilla Python callables are wrapped using the to_thread utility provided by asyncio (with the provided *args and **kwargs). Coroutine functions are simply called with the provided *args and **kwargs to get an awaitable coroutine. Anything that is already awaitable is handed off to the thread-bound event loop via asyncio.run_coroutine_threadsafe. The returned future is then inspected to see whether it raised an error (which is propagated); if no error was raised, its .result() is handed back through AsyncIOPool.run. This allows virtually any mix-and-match of synchronous and coroutine tasks.
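The dispatch logic described above can be sketched in plain asyncio. The class and method names follow the proposal's description, but everything else here (the per-thread guard, the background loop thread, the future-wrapping helper) is an assumption about one possible implementation, not the actual code:

```python
import asyncio
import threading

class AsyncIOPool:
    """Hedged sketch of the run() dispatch described above -- not the
    actual implementation from the proposal."""

    _local = threading.local()

    def __init__(self):
        # Per-thread singleton guard: refuse to create a second pool
        # (and thus a second event loop) in the same thread.
        if getattr(self._local, "pool", None) is not None:
            raise RuntimeError("An AsyncIOPool already exists in this thread")
        self._local.pool = self
        # Run a dedicated event loop in a background thread so that
        # run() can submit work to it from synchronous code.
        self.loop = asyncio.new_event_loop()
        threading.Thread(target=self.loop.run_forever, daemon=True).start()

    @staticmethod
    async def _await(aw):
        # Trivial wrapper: run_coroutine_threadsafe only accepts
        # coroutines, so bare futures are awaited through this.
        return await aw

    def run(self, target, *args, **kwargs):
        if asyncio.iscoroutinefunction(target):
            # Coroutine function: call it to obtain an awaitable.
            coro = target(*args, **kwargs)
        elif callable(target):
            # Vanilla sync callable: defer it to a worker thread
            # (asyncio.to_thread requires Python 3.9+).
            coro = asyncio.to_thread(target, *args, **kwargs)
        elif asyncio.iscoroutine(target):
            # Already-instantiated coroutine object: use as-is.
            coro = target
        elif asyncio.isfuture(target):
            # Bare future (must belong to this pool's loop): wrap it.
            coro = self._await(target)
        else:
            # Non-callable, non-awaitable values pass through untouched.
            return target
        # Hand the awaitable to the thread-bound loop; .result() blocks
        # until completion and re-raises any exception from the task.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()
```

A synchronous `pool.run(add, 1, 2)` and a coroutine `pool.run(fetch, url)` then go through the same entry point, which is what makes the mix-and-match of task styles possible.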
Architectural Considerations
As best I can tell, there are no real concerns with the implementation itself. However, I am concerned that it may be naive. I know offhand that I didn't check whether task revocation still works as expected. Task cancellation is currently unimplemented as well, which may or may not have implications for timeouts. Honestly, I have no idea, but I would very much appreciate input and feedback from the Celery team.
Proposed Behavior
See design above.
Proposed UI/UX
See design above.
Diagrams
None included
Alternatives
None proposed