Cooking Up Celery with RabbitMQ (Part 1 of 2)

In the post Let’s go RabbitMQ!, we met a message broker named RabbitMQ. Message brokers route your messages. Think of them like your post office: the post office accepts your mail and delivers it to your recipients, similar to how message brokers receive, route, and send your messages to their intended recipients.

In this post we shall see Celery, a distributed task queue built on top of a message-passing system.


Oooh CELERYYYYY! :3

Introduction

Programs nowadays process a myriad of tasks, ranging from small, fast ones to huge ones that take a significant amount of time to finish. When a program has only one main flow and issues a long, slow task, waiting for that task to complete makes the program unusable (as it cannot process other commands) until the task finishes or a result is available.
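To make the problem concrete, here’s a minimal plain-Python illustration (no Celery yet) of how a single-flow program gets stuck behind a slow task:

import time

def slow_task():
    # Simulate a long-running job
    time.sleep(10)
    print "slow_task finished!"

slow_task()  # the program is stuck here for ~10 seconds
print "We only reach this line after slow_task() is done"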

Asynchronous, or non-blocking, processing is a method where we separate the execution of some tasks from the main flow of a program by delegating them to other processes (i.e. workers), allowing the main program to move on to its next tasks.

Celery and RabbitMQ

Celery is an asynchronous task queue built on a distributed message-passing system. With it, a program can continue executing seamlessly: it simply hands off to Celery any tasks that may take some time, carries on with its next tasks, and polls Celery later to check whether they are ready and to fetch results if there are any.

RabbitMQ is the default and most recommended message broker for Celery. Another widely used and stable Celery broker is Redis, though it may be more susceptible to data loss, especially in an abrupt power failure. Other brokers can be used as well (IronMQ, SQLAlchemy, MongoDB, CouchDB), but as the Celery documentation notes, they are experimental (i.e. functional but without dedicated maintainers). Using a database as your message broker may also be less time efficient, as reads and writes can take more time.
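For illustration, switching brokers is mostly a matter of changing the broker URL. A minimal sketch, assuming a Redis server running locally on its default port 6379, database 0:

from celery import Celery

# Redis as the broker instead of RabbitMQ
app = Celery('tasks', broker='redis://localhost:6379/0')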

In this post, we shall be using RabbitMQ as the broker for our Celery application. For installation instructions for RabbitMQ, please refer to the Let’s go RabbitMQ! post! 🙂

Installation Preparation: Digging Up the Soil for Celery

To install Celery, we’ll be using the Python package manager pip. If it’s not yet installed, let’s install it by issuing the following command in your terminal:

sudo apt-get install python-pip

For our Celery installation, we’ll also be using Python virtual environments. Virtual environments allow multiple Python projects (with different and sometimes conflicting dependencies) to coexist on the same computer by isolating each project in its own environment. To install Python virtual environments, we do:

sudo apt-get install python-virtualenv

*Note: In case you encounter an IndexOutOfBounds error when installing with pip on Ubuntu 14.04, you can fix it with the following command:

sudo pip install -U --no-use-wheel pip

Now that our virtual environment tooling is installed, let’s create and activate an environment:

  1. Create your project folder and go inside it:
    • cd my_project_folder
  2. Create your virtual environment. venv in this command is the name of the virtual environment we’ll be creating:
    • virtualenv venv
  3. Once you have created your virtual environment, you’ll notice a venv folder inside your project folder. This folder contains the requirements and dependencies your project needs to run.
  4. Now activate your virtual environment:
    • source venv/bin/activate
  5. Yay, done! In case you want your virtual environment off, you can deactivate it by typing the following:
    • deactivate

Installation Guide: Planting Celery

Now that our preparations are done, we can install Celery:

pip install celery

Our First Celery Sprouts

Now, to get Celery up and running, we first create a file, tasks.py, to contain our first task:

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost:5672//')

@app.task
def hello():
    print "Hello from the other sideee ~"

We first import Celery and then instantiate our Celery application. We pass two arguments to Celery: the first is our module name, and the second is the broker keyword argument. Here we specify that our broker is RabbitMQ running on localhost at port 5672, which is also the default.
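For reference, the broker URL follows the amqp://user:password@host:port/vhost format. A sketch with hypothetical credentials and hostname, just to show the shape:

from celery import Celery

# All credential and host values below are made up for illustration
app = Celery('tasks', broker='amqp://myuser:mypassword@rabbit.example.com:5672/myvhost')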

Next, we declare our task by decorating it with @app.task, which lets Celery know that it’s a task that can be added to the queue later. Our first task is very simple: it just prints Adele’s famous line, “Hello from the other sideee ~”.

Now we’re ready to run our Celery worker server. In the command line, issue:
celery worker -A tasks

You will be able to see something like the following:

[Screenshot: the Celery worker startup banner, listing the broker, the app name, and the available queues]

Here we can see details such as the available queues; in this example there is only celery, the default, since we didn’t specify a new one. And as we remember from our previous RabbitMQ post, Celery uses a direct exchange with the routing key “celery”.
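In case you’re curious, tasks can also be routed to a custom queue instead of the default celery one. A minimal sketch, assuming the Celery 3.x setting names in use at the time of writing, with a hypothetical “greetings” queue:

# In tasks.py: route our hello task to the "greetings" queue
app.conf.CELERY_ROUTES = {
    'tasks.hello': {'queue': 'greetings'},
}

A worker then needs to be told to consume from that queue with the -Q option:

celery worker -A tasks -Q greetings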

Now, let’s fire up python and start sending tasks to our worker! #wapish

[Screenshot: a Python console session sending tasks to our worker]

Let’s see what we did here:


from tasks import hello

# Simple method invocation; Will be printing on console
hello()

# Make the call to hello an asynchronous task by assigning it to the worker.
hello.delay()

On our python console, we first imported hello, the task we’ll be calling. We then called hello by doing hello() and saw Adele’s famous line printed on the console, without it ever reaching our worker.

We then do hello.delay() and get back an AsyncResult object as the return value, with Adele’s famous line printed on the screen of our Celery worker server. delay() is a wrapper around apply_async(), which we can also use to issue worker tasks.
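To make the relationship concrete, here’s a small sketch; countdown is one of the extra scheduling options apply_async() exposes:

hello.delay()
# ... is shorthand for:
hello.apply_async()

# apply_async() also accepts extra options, e.g. a delayed start:
hello.apply_async(countdown=10)  # run roughly 10 seconds from now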

Let’s try adding a return value to our hello() method:


@app.task
def hello():
    print "Hello from the other sideee ~"
    return 5

Since we redefined our hello method, we need to restart our worker.

First, kill the celery worker:

ps auxww | grep '[c]elery worker' | awk '{print $2}' | xargs kill

*Note: This is fine for development only; in production, you’ll want scripts that terminate workers gracefully.
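For instance, Celery ships with a celery multi helper that can stop workers more gracefully; a sketch, assuming a worker named w1:

celery multi start w1 -A tasks
celery multi stopwait w1   # waits for running tasks to finish before shutting down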

And now restart your worker. Open up your python console again, import tasks, and issue the commands above:

[Screenshot: the Python console session, with hello() printing locally and hello.delay() returning an AsyncResult]

When we issued hello(), Adele’s famous line was printed on screen along with our return value of 5. Then, when we assigned our worker to run hello via hello.delay(), Adele’s famous line was printed on the worker server, but our 5... is nowhere to be found? We try to get the result by issuing the get() method on the AsyncResult, but we get an error:

...
NotImplementedError: No result backend configured...
...

Why is that so? :O

As we saw, calling .delay() on a task returns an AsyncResult object. This can be used to check the state of the task and its return value (or traceback if unsuccessful), if there is any. But by default, this isn’t how AsyncResult behaves. To enable this, we have to configure Celery to use a result backend.

For this purpose, similar to the broker option, we have a myriad of choices. We can use Redis, MongoDB, CouchDB, and other databases as a result backend. We can use RabbitMQ as well! But bear in mind that since RabbitMQ employs the messaging style, your results may not be persistent by default, and they may also expire by default. Don’t worry though, as you can alter these behaviours in the Celery configuration.
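To give a flavour of those configuration knobs, here’s a sketch using Redis as the result backend and extending how long results are kept, assuming the Celery 3.x setting name CELERY_TASK_RESULT_EXPIRES:

from celery import Celery

app = Celery('tasks',
             backend='redis://localhost:6379/0',
             broker='amqp://guest@localhost:5672//')

# Keep task results for an hour before they expire
app.conf.CELERY_TASK_RESULT_EXPIRES = 3600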

We now edit tasks.py and configure our result backend:


import time
from celery import Celery

# Default broker: amqp://guest:**@localhost:5672//
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost:5672//')

@app.task
def hello():
    print "Hello from the other sideee ~"
    return 5

We set RabbitMQ as our result backend. Now we can restart our worker and check for the return value:

[Screenshot: the console session showing ready() returning True and get() returning 5]

Now, with our result backend configured, we’re able to get the return value via get() and also query our task’s state via ready().
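In console form, the session looks roughly like this:

from tasks import hello

result = hello.delay()
result.ready()   # True once the worker has finished the task
result.get()     # 5, our task's return value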

Now let’s move on to a more time-consuming task. Say we want to compute the summation from 1 to n.


@app.task
def get_summation(n):
    print "Computing summation from 1 to " + str(n)
    total = 0
    for i in xrange(1, n+1):
        total += i
    return total

Fire up python:

python

Then import the task and call it directly:

from tasks import get_summation

get_summation(10)

get_summation(50000000000)

We can see that our second summation command takes longer, causing our command line to hang and be unusable for a while. Let us now see the power of asynchronous processing by assigning the summation task to our worker:

async1 = get_summation.delay(10)
async2 = get_summation.delay(5000000000)

For async1 and async2, we see that upon entering the command, we can immediately enter another command while our worker processes them in the background.

We can then query whether our task is already done by:

async1.ready()
async2.ready()

And finally get the result:

async1.get()
async2.get()

If we query the result using get() and the result is not yet ready, get() blocks and waits, which we may not want. So we can pass a timeout to configure how long it waits for the result (so that it doesn’t wait forever):

async1.get(timeout=1)
async2.get(timeout=5)

In case a result is not available after the timeout period, an exception is raised.

[Screenshot: get() raising a timeout exception because the result was not ready within the timeout period]

On the other hand, in case our task encountered an error and raised an exception, get() just re-raises the exception by default, but we can change this behaviour by passing a propagate=False argument to get().

@app.task
def exception_raiser(n):
    time.sleep(5)
    raise ValueError('An exception was raised!!!')
    return "WOKE UP!"  # never reached, since the exception is raised first


[Screenshot: the worker logging the ValueError traceback while get() re-raises it in the console]
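A quick sketch of propagate=False in action (the task argument value here is arbitrary):

from tasks import exception_raiser

result = exception_raiser.delay(1)
result.get(propagate=False)  # returns the ValueError instance instead of raising it
result.failed()              # True, since the task raised an exception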

 

Scalability

What if you have many time-consuming tasks and you just dump them all on Celery? With only one worker to process them all, it may still take quite some time. This is where the power of Celery and RabbitMQ comes in: they are highly scalable! You can spawn multiple workers, and Celery distributes tasks to them in a round-robin fashion.

Let’s try it! Start two Celery worker servers. You may want to use tmux for this so you can watch the worker servers side by side in real time. We name our workers “one.<hostname>” and “two.<hostname>” with the -n flag (%h expands to the hostname):

celery worker -A tasks -n one.%h
celery worker -A tasks -n two.%h

You may run them as background tasks by appending an ampersand (&) to the commands:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

When we simultaneously ran the following tasks:

async1 = get_summation.delay(10)
async2 = get_summation.delay(5000000000)

They got assigned to worker one and worker two respectively.
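If you want to confirm which worker picked up which task, Celery’s inspect command can list the tasks each worker is currently executing:

celery -A tasks inspect active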

And when we issued three more asynchronous tasks:

async3 = get_summation.delay(5)
async4 = get_summation.delay(8)
async5 = get_summation.delay(9)

[Screenshot: the two worker consoles side by side, showing the tasks distributed between them]

Task async5 got executed before async4 because async4 was still waiting for its assigned worker (via round robin) to process it, and that worker was still busy processing async2.

So yay! In this post, we got an introduction to Celery running with RabbitMQ. We saw what result backends are and how to configure one. We also looked into inspecting a task’s state and fetching its results. And we saw how to scale up our Celery application by spawning more workers to consume our tasks.

In the next post, we’ll be looking more closely at configuration and parameter organization. 🙂

Thanks for reading!
