In the post Let’s go RabbitMQ!, we saw a message broker named RabbitMQ. Message brokers provide routing for your messages. Think of them like your post office: the post office accepts your mail and delivers it to your recipients, similar to how a message broker receives, routes, and delivers your messages to their intended recipients.
In this post we shall see Celery, a distributed task queue built on top of a message passing system.

Oooh CELERYYYYY! :3
Introduction
Programs nowadays process a myriad of tasks, ranging from the smallest and fastest ones to huge ones that take a significant amount of time to finish. When we issue long, slow tasks with only one main flow in the program, waiting for a task to complete makes the program unusable (as we cannot process other commands) until the task finishes or a result is available.
Asynchronous, or non-blocking, processing is a method where we separate the execution of some tasks from the main flow of a program by delegating them to other processes (i.e. workers), allowing the main program to move on to its next tasks.
Celery and RabbitMQ
Celery is an asynchronous task queue that is built on a distributed message passing system. With it, a program can continue executing seamlessly, as it simply hands over to Celery any tasks that may take some time. While it continues with its next tasks, it can poll Celery at a later time to check whether a task is ready and fetch its result, if there is any.
RabbitMQ is the default and most recommended message broker for Celery. Another widely used and stable Celery broker is Redis, though it may be more susceptible to data loss, especially in an abrupt power failure. Other brokers can be used as well (IronMQ, SQLAlchemy, MongoDB, CouchDB), but as the Celery documentation notes, they are still experimental (i.e. functional but without dedicated maintainers). Using a database as your message broker may also not be time-efficient, as reads and writes can take more time.
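As a small peek ahead, the broker is just a URL you hand to Celery when the app is created. A minimal sketch of the two common choices (the Redis line assumes a local Redis server on its default port):

from celery import Celery

# RabbitMQ (AMQP) running locally on its default port
app = Celery('tasks', broker='amqp://guest@localhost:5672//')

# The same app pointed at Redis instead (assumes a local Redis server)
# app = Celery('tasks', broker='redis://localhost:6379/0')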
In this post, we shall be using RabbitMQ as the broker for our Celery application. For installation instructions for RabbitMQ, please refer to the Let’s go RabbitMQ! post! 🙂
Installation Preparation: Digging Up the Soil for Celery
To install Celery, we’ll be using the Python package manager pip. If it’s not yet installed, let’s go install it by issuing the following command in your terminal:
sudo apt-get install python-pip
For our Celery installation, we’ll also be using Python virtual environments. Virtual environments allow multiple Python projects (with different and sometimes conflicting dependencies) to coexist on the same computer by isolating each project’s dependencies from the others. To install virtualenv, we do:
sudo apt-get install python-virtualenv
*Note: In case you encounter an IndexOutOfBounds error when installing with pip on Ubuntu 14.04, you can fix it with the following line:
sudo pip install -U --no-use-wheel pip
Now that virtualenv is installed, let’s set up and activate a virtual environment:
- Create your project folder and move into it:

cd my_project_folder

- Create your virtual environment. venv in this command is the name of the virtual environment that we’ll be creating:

virtualenv venv

- Once you have created your virtual environment, you’ll notice a venv folder inside your project folder. This folder contains your project’s requirements and the dependencies it needs to run.
- Now we can activate the virtual environment:

source venv/bin/activate

- Yay, done! In case you want to switch your virtual environment off, just deactivate it by typing the following:

deactivate
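To double-check that the virtual environment is really the one in charge, these two commands are handy (the paths they print should point inside my_project_folder/venv):

which python
pip --version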
Installation Guide: Planting Celery
Now that our preparations are done, we can now install Celery:
pip install celery
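As a quick sanity check that the installation worked (the exact version printed depends on what pip fetched):

celery --version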
Our First Celery Sprouts
Now, to have Celery up and running, we first create a file, tasks.py, to contain our first task:
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost:5672//')

@app.task
def hello():
    print "Hello from the other sideee ~"
In line 1, we first import Celery, and in line 2, we instantiate our Celery application. We pass two arguments to Celery: the first is our module name, and the second is the broker keyword argument. Here, we specify that our broker is RabbitMQ running on localhost at port 5672, which is also the default option.
Next, we declare our task by decorating it with @app.task to let Celery know that it’s a task and can be added to the queue later. Our first task is very simple: it just prints Adele’s famous line, “Hello from the other sideee ~”.
Now we’re ready to run our celery worker server. In the command line issue:
celery worker -A tasks
You will be able to see something like the following:
Here we can see details such as the available queues. In this example there is only celery, the default, since we didn’t specify a new one. And as we remember from our previous RabbitMQ post, Celery uses a direct exchange with the routing key “celery”.
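If you want to confirm this from the RabbitMQ side, rabbitmqctl can list the queues the broker currently holds (this assumes the local RabbitMQ setup from the previous post):

sudo rabbitmqctl list_queues name messages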
Now, let’s fire up python and start sending tasks to our worker! #wapish
Let’s see what we did here:
from tasks import hello

# Simple method invocation; will be printed on this console
hello()

# Make the call to hello an asynchronous task by assigning it to the worker
hello.delay()
On our Python console, we first imported hello, the task that we’ll be calling. We then called hello by doing hello() and saw Adele’s famous line printed on the console, without it ever reaching our worker.
We then do hello.delay() and get back an object of class AsyncResult, with Adele’s famous line printed on the screen of our Celery worker server. #delay() is also a wrapper around #apply_async, which we can likewise use to issue worker tasks.
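To get a feel for how the two call styles line up, here’s a small sketch (countdown is just one example of the extra options apply_async accepts):

from tasks import hello

hello.delay()                    # shorthand, no extra options
hello.apply_async()              # the same call, spelled out
hello.apply_async(countdown=10)  # e.g. execute no sooner than 10 seconds from now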
Let’s try adding a return value to our hello() method:
@app.task
def hello():
    print "Hello from the other sideee ~"
    return 5
Since we redefined our hello method, we need to restart our worker.
Kill first the celery worker:
ps auxww | grep '[c]elery worker' | awk '{print $2}' | xargs kill
*Note: This is for development only; in production, use proper init scripts so workers terminate gracefully.
Now restart your worker, open up your Python console again, import tasks, and issue the commands above:
When we issued hello(), Adele’s famous line was printed on screen, as well as our return value of 5. Then, when we assigned our worker to run hello via hello.delay(), Adele’s famous line was printed on the worker server, but our 5 is... nowhere to be found? We try to get the result by issuing the get() method on the AsyncResult, but then we get an error:
...
NotImplementedError: No result backend configured
...
Why is that so? :O
As we saw, calling .delay on a task returns an AsyncResult object. This can be used to check the state of the task and its return value (or traceback if unsuccessful), if there is any. But by default, this isn’t the behaviour of AsyncResult. To enable it, we have to configure Celery to use a result backend.
For this purpose, similar to the broker option, we have a myriad of choices. We can use Redis, MongoDB, CouchDB, and other databases as the result backend. We can use RabbitMQ as well! But bear in mind that since RabbitMQ employs the messaging style, the state of your results may not be persistent by default, and results may also expire by default. Don’t worry though, as you can alter these behaviours in the Celery configuration.
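For instance, result expiration and persistence are controlled through configuration settings; a minimal sketch in the Celery 3.x setting style, with example values (assuming app is the Celery instance we create below):

# Keep results around for an hour instead of the default (example value)
app.conf.CELERY_TASK_RESULT_EXPIRES = 3600
# Have the AMQP result backend persist results across a broker restart
app.conf.CELERY_RESULT_PERSISTENT = True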
We now edit tasks.py and configure our results backend:
import time
from celery import Celery

# Default broker: amqp://guest:**@localhost:5672//
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost:5672//')

@app.task
def hello():
    print "Hello from the other sideee ~"
    return 5
We set RabbitMQ as our result backend. Now, we can restart our servers and check for the return value:
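On the Python console, the session would go something like this (assuming the worker was restarted after the edit):

from tasks import hello

result = hello.delay()
result.ready()  # True once the worker has finished
result.get()    # 5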
Now, with our result backend configured, we are able to get the return value via #get() and also query our task’s state via #ready().
Now let’s move on to a more time-consuming task. Say we want to compute the summation from 1 to n:
@app.task
def get_summation(n):
    print "Computing summation from 1 to " + str(n)
    sum = 0
    for i in xrange(1, n+1):
        sum += i
    return sum
Fire up python:
python
from tasks import get_summation
get_summation(10)
get_summation(50000000000)
We can see that our second summation command takes longer and causes our command line to hang, unusable, for a while. Let us now see the power of asynchronous processing by assigning the summation task to our worker:
async1 = get_summation.delay(10)
async2 = get_summation.delay(5000000000)
For async1 and async2, we see that upon entering the command, we can immediately enter another command while our worker processes them in the background.
We can then check whether our task is already done:
async1.ready()
async2.ready()
And finally get the result:
async1.get()
async2.get()
If we query the result using get() and the result is not yet ready, get() blocks and waits, which we don’t want. So we can set a timeout to configure how long it will wait for the result (so that it doesn’t wait forever):
async1.get(timeout=1)
async2.get(timeout=5)
In case a result is not available after the timeout period, an exception is raised.
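The exception raised in that case is celery.exceptions.TimeoutError, so we can catch it and poll again later; a minimal sketch:

from celery.exceptions import TimeoutError

try:
    print async2.get(timeout=5)
except TimeoutError:
    print "Still computing; try again later"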
On the other hand, in case our task encountered some error and raised an exception, get() just re-raises that exception by default, but we can change this behaviour by passing a propagate=False argument to get(). Let’s try it with a task that always fails:
@app.task
def exception_raiser(n):
    time.sleep(5)
    raise ValueError('An exception was raised!!!')
    return "WOKE UP!"  # never reached; the exception above always fires
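After restarting the worker, we can watch both behaviours from the console (a small sketch using the task we just defined):

from tasks import exception_raiser

result = exception_raiser.delay(1)
result.get(propagate=False)  # returns the ValueError instance instead of raising it
result.failed()              # True, since the task raised an exception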
Scalability
What if you have many time-consuming tasks and you just dump them all on Celery? With only one worker to process them, things may still take a while. This is where the power of Celery and RabbitMQ comes in: they are highly scalable! You can spawn multiple workers, and Celery distributes tasks to them in a round-robin fashion.
Let’s try it! Start two Celery worker servers. You may want to use tmux for this one so we can watch the worker servers side by side in real time. We name our workers “one.<hostname>” and “two.<hostname>” with the -n flag:
celery worker -A tasks -n one.%h
celery worker -A tasks -n two.%h
You may run them as background tasks by appending an ampersand (&) to each command:
celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &
When we simultaneously ran the following tasks:
async1 = get_summation.delay(10)
async2 = get_summation.delay(5000000000)
They got assigned to worker one and worker two respectively.
And when we issued three more asynchronous tasks:
async3 = get_summation.delay(5)
async4 = get_summation.delay(8)
async5 = get_summation.delay(9)
Task async5 got executed before async4, because async4 was still waiting for its assigned worker (via round robin) to process it, and that worker was still busy with async2.
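To peek at which worker is busy with which task, Celery ships an inspect subcommand (assuming both workers are still running):

celery -A tasks inspect active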
So yay! In this post, we got an introduction to Celery running with RabbitMQ. We saw what result backends are and how to configure one. We also looked into inspecting a task’s state and fetching its results, and saw how to scale up our Celery application by spawning more workers to consume our tasks.
In the next post, we’ll be looking more closely at configuration and parameter organization. 🙂
Thanks for reading!