Async Python Functions with Celery

Celery is a great tool for scheduled function execution in Python. You can also use it to run functions in the background, asynchronously from your main process. However, it does not support Python's asyncio. This is a big limitation, because async functions are usually much more I/O efficient, and there are many libraries with great async support. It also means parallel data processing with asyncio.gather is impossible inside a celery task.

Celery Async Issues

Unfortunately, based on the current Open status of these issues, celery will not support async functions anytime soon.

But luckily there are two projects that provide async celery support.

AIO Celery

This project is an alternative independent asyncio implementation of Celery.

aio-celery “does not depend on the celery codebase”. Instead, it provides a new implementation of the Celery Message Protocol that enables asyncio tasks and workers.

It is written completely from scratch as a thin wrapper around aio-pika (an asynchronous RabbitMQ Python driver) and has no other dependencies.
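
To give a feel for it, here is a minimal sketch of defining an async task with aio-celery's celery-style API; the task name is illustrative, and broker/result-backend configuration is omitted:

import asyncio

from aio_celery import Celery

app = Celery()

@app.task(name="add-two-numbers")
async def add(a, b):
    # Any awaitable work is fine here; sleep stands in for real I/O.
    await asyncio.sleep(1)
    return a + b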

It is actively developed, and seems like a great celery alternative. But there are some downsides:

  1. “Only RabbitMQ as a message broker” means you cannot use any other broker such as Redis
  2. “Only Redis as a result backend” means you can’t store results in any other database
  3. “Complete feature parity with upstream Celery project is not the goal”, so there may be features from celery you want that are not present in aio-celery

Celery AIO Pool

celery-aio-pool provides a custom worker pool implementation that works with celery 5.3+. Unlike aio-celery, it lets you keep your existing celery implementation. All you have to do to get async task support in celery is:

  1. Start your celery worker with this environment variable: CELERY_CUSTOM_WORKER_POOL='celery_aio_pool.pool:AsyncIOPool'
  2. Run the celery worker process with --pool=custom

So your worker command will look like this (with your_app replaced by your own celery application module):

CELERY_CUSTOM_WORKER_POOL='celery_aio_pool.pool:AsyncIOPool' celery -A your_app worker --pool=custom

plus whatever other arguments or environment variables you need. Once you have this in place, you can start using async functions as celery tasks.
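
For example, a task can then be a plain async function that awaits other coroutines, including asyncio.gather. This is only a sketch; the app name and broker URL are placeholders:

import asyncio

from celery import Celery

app = Celery(__name__, broker='redis://localhost:6379/0')  # placeholder broker URL

@app.task
async def fetch_all(urls):
    # With the AsyncIOPool worker pool, async tasks run on the worker's
    # event loop, so awaiting and gathering work as usual.
    async def fetch(url):
        await asyncio.sleep(0.1)  # stand-in for real async I/O
        return url

    return await asyncio.gather(*[fetch(u) for u in urls])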

While celery-aio-pool is not as actively developed, it works, and has the following benefits:

  • Simple to install and configure with Celery >= 5.3
  • Works with any celery-supported message broker or result backend
  • Works with your existing celery setup without requiring any other changes

Python Async Gather in Batches

Python’s asyncio.gather function is great for I/O bound parallel processing. There’s a simple utility function I like to use that I call gather_in_batches:

import asyncio

async def gather_in_batches(tasks, batch_size=100, return_exceptions=False):
    # Await the tasks in fixed-size batches, so at most batch_size
    # coroutines run at once; yield each batch's results in order.
    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]
        for result in await asyncio.gather(*batch, return_exceptions=return_exceptions):
            yield result

The way you use it is

  1. Generate a list of tasks
  2. Gather your results

Here’s some simple sample code to demonstrate:

tasks = [process_async(obj) for obj in objects]
return [result async for result in gather_in_batches(tasks)]

objects could be all sorts of things:

  • records from a database
  • urls to scrape
  • filenames to read

And process_async is an async function that does whatever processing you need on that object. Assuming the work is mostly I/O bound, this is a very simple and effective way to process data in parallel, without getting into threads, multiprocessing, greenlets, or any other method.
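
For instance, if the objects were URLs to scrape, a hypothetical process_async might look like the sketch below (using aiohttp; in real code you would likely share one ClientSession across calls rather than opening one per URL):

import aiohttp

async def process_async(url):
    # Fetch one URL; a session per call keeps the example self-contained.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()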

You’ll need to experiment to figure out the optimal batch_size for your use case. And unless you want the first error to abort the whole batch, you should set return_exceptions=True, then check if isinstance(result, Exception) to do proper error handling.
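
Here is a rough sketch of that error-handling pattern, reusing the hypothetical tasks list from above:

results = [r async for r in gather_in_batches(tasks, return_exceptions=True)]
for result in results:
    if isinstance(result, Exception):
        print(f'failed: {result!r}')  # or log / retry / re-raise as appropriate
    else:
        handle(result)  # handle() is a placeholder for your own logic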