Cooking Up Celery with RabbitMQ (Part 2 of 2)

So hi again! As you may remember from our previous post, we looked into Celery and RabbitMQ for distributing tasks among workers. Now, in this post, we'll see how we can organize our configuration parameters and look at a few more awesome features of Celery and RabbitMQ. 🙂

Configuration File: celeryconfig.py

To start off, let's first see how we can organize our configuration parameters. So far we have only used the common configuration options such as BROKER_URL and CELERY_RESULT_BACKEND, and they were all dumped directly in our tasks.py.

In the long run, once we have a lot of configuration settings, it becomes too difficult to track all of them if they are scattered across separate files, so it is very much recommended to keep all these parameters together in one file.

For this purpose, Celery allows configuration options to be loaded from a module.


from celery import Celery

celery = Celery()
celery.config_from_object('celeryconfig')

In this section, we'll see how to do that, and we'll also make a separate file to contain our "client", the one that invokes the tasks (for which we just used the Python console last time :D).

Let's first create two files that will contain our task definitions:

tasks.py


from celery.task import task

@task
def square(x):
    product = x * x
    print "[SQUARE] I GOT " + str(product)
    return "[SQUARE] RESULT: " + str(product)

tasks2.py


from celery.task import task

@task
def double(x):
    product = 2 * x
    print "[DOUBLE] I GOT " + str(product)
    return "[DOUBLE] RESULT: " + str(product)

Our first two tasks are simple: the first returns the square of its input and the second one just doubles it.

Now on to the real action! We now create our configuration file. This configuration file is commonly called celeryconfig.py, but you may name it however you wish. Just make sure that you load the correct module when you declare your Celery application. 🙂


CELERY_IGNORE_RESULT = False

BROKER_URL="amqp://"

CELERY_RESULT_BACKEND = "amqp"

CELERY_IMPORTS=("tasks", "tasks2")

In our configuration file, we simply moved our broker and result backend settings. We also declared the modules that we will be including (tasks and tasks2). As you may recall, we previously declared our Celery app as follows:

Old tasks.py


from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost:5672//')

@app.task
def hello():
    print "Hello from the other sideee ~"

With celeryconfig.py, we now have a centralized repository for all our configurables.

Meanwhile, here is our client.py, the one that will be invoking our tasks on the workers:


from celery import Celery

results = []
celery = Celery()
celery.config_from_object('celeryconfig')

# send_task dispatches tasks by name, so this client does not need to
# import the task modules themselves
for x in range(1, 5):
    results.append(celery.send_task("tasks.square", [x]))

for x in range(1, 5):
    results.append(celery.send_task("tasks2.double", [x]))

In client.py, we loaded the configuration from the celeryconfig module we made earlier and invoked tasks.square and tasks2.double.
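Since we kept CELERY_IGNORE_RESULT = False and use the amqp result backend, each send_task call returns an AsyncResult, which we collect in results. If we want to read the return values back, a minimal sketch (assuming the workers are running) would be to append something like this to client.py:


for result in results:
    # get() blocks until the worker has finished the task
    print result.get(timeout=10)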

Running it is still the same; we run client.py:

python client.py
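This assumes a worker is already running in another terminal; one way to start it, for example:

celery worker --loglevel=info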

[Screenshot: output after running client.py]

And we see the output on the worker's side. Yay!

Remote RabbitMQ Server

In line with the goal of scalability and separation of concerns, our RabbitMQ message broker may sometimes be located on another server, away from the workers or the "clients" (the actual task invokers). In these cases, our Celery application should still be able to interact with the remote RabbitMQ server.

Now that we have a separate configuration file, we can start playing around with the configuration; let's try BROKER_URL.

* Note: For this example, I fired up another EC2 instance running RabbitMQ. If you're doing the same thing, make sure to open up port 5672 so that it is accessible from your instance running Celery. Also add a new user with the right permissions (the default guest user is only allowed access from localhost; this can be changed by altering rabbitmq.config).
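If you're creating that user with rabbitmqctl, the commands look roughly like this (celeryuser and its password are placeholders; adjust the vhost and permissions to your setup):

rabbitmqctl add_user celeryuser supersecret
rabbitmqctl set_permissions -p / celeryuser ".*" ".*" ".*"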

Once your remote RabbitMQ server is ready to accept connections from the outside, let's now edit our celeryconfig.py for a more customized broker URL (we'll no longer be connecting to localhost for this one :D).


import os

CELERY_IGNORE_RESULT = False

broker_username = os.environ.get('BROKER_USERNAME')
broker_password = os.environ.get('BROKER_PASSWORD')
broker_host = os.environ.get('BROKER_HOST')
broker_port = os.environ.get('BROKER_PORT')

# Build the broker URL from the environment; fall back to the localhost
# defaults if any of the pieces are missing.
if broker_username and broker_password and broker_host and broker_port:
    BROKER_URL = "".join(["amqp://",
                          broker_username,
                          ":",
                          broker_password,
                          "@",
                          broker_host,
                          ":",
                          broker_port])
else:
    BROKER_URL = "amqp://"

print BROKER_URL

CELERY_RESULT_BACKEND = "amqp"

# periodic_tasks is added in the Celery Beat section below
CELERY_IMPORTS = ("tasks", "tasks2", "periodic_tasks")

With this celeryconfig.py, we are now getting the parts of our broker URL from our environment variables.
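For example, before starting the worker we could export something like the following (all of these values are placeholders for your own RabbitMQ server):

export BROKER_USERNAME=celeryuser
export BROKER_PASSWORD=supersecret
export BROKER_HOST=your-rabbitmq-host.example.com
export BROKER_PORT=5672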

Since we changed our celeryconfig.py, let us restart our workers.


celery worker

Now, upon starting up Celery again, we can see that we are now connected to the remote RabbitMQ server we specified in our environment variables.

[Screenshot: worker startup output showing the remote broker URL]

Also, when we check the RabbitMQ management GUI (head over to port 15672 of your RabbitMQ server), we see Celery's default queue, celery, already created and running.

[Screenshot: RabbitMQ management GUI showing the default celery queue]

Celery Beat

In some of our projects at work, certain tasks need to be done periodically, on a schedule. Cron jobs are one option, as is sidekiq-scheduler, which allows scheduled tasks to be run by workers that can reside on different servers (thus separating execution of the scheduled tasks from the main server).

For this kind of task scheduling, Celery also natively offers a scheduler, Celery Beat. It kicks off tasks according to the given schedule, and these are then executed by whichever workers are available at the time.

By default, Celery Beat looks for the tasks specified in the CELERYBEAT_SCHEDULE setting in the config file. The official Celery documentation notes, though, that you should make sure only one scheduler is running at a time, or you might end up with duplicate tasks.

To employ Celery Beat, we first declare the tasks that we want to run periodically, in a new module, periodic_tasks.py:


from celery.task import task
import time

@task
def time_ticker():
    print "Time now is " + time.strftime("%b %d %Y %H:%M:%S")

@task
def say_hi(name):
    print "Hi " + name + "!"

and add the following to our celeryconfig.py:

...

from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'say-hi-every-30-seconds': {
        'task': 'periodic_tasks.say_hi',
        'schedule': timedelta(seconds=30),
        'args': ['user']
    },

    'tell-time-every-minute': {
        'task': 'periodic_tasks.time_ticker',
        'schedule': crontab(minute='*/1')
    }
}

For this example, we have two tasks:

  1. say-hi-every-30-seconds: runs the say_hi task in the periodic_tasks module every 30 seconds. It accepts one argument, for which we passed in 'user'.
  2. tell-time-every-minute: runs time_ticker in the periodic_tasks module. Here, as you can see, we gave our schedule in crontab format.

Easy and flexible, isn’t it? 🙂

To run celery beat, we have two options,

  1. Either we run it alone:
    • celery beat
  2. Or run it as a worker:
    • celery worker --beat

And then TADAAAAAAA, our tasks are run as scheduled.

[Screenshot: the scheduled tasks being run by the worker]

Queues

For all our examples, we did not specify our preferred queue and instead just used Celery’s default queue, celery.

What if, in the long run, we used just one queue for all our tasks, from the most important and crucial ones down to those that could tolerate a bit of delay? In that case, the high-priority tasks cannot go first: they sit in the same queue, and the worker might be busy doing something else.

This is where the importance of separate queues is greatly magnified. Say we have two tasks in our online store:

  1. One that expires already sold out items on our site and
  2. One that sends daily marketing emails at 3 AM.

Clearly, task 1 is more time sensitive, as overselling may occur if it does not run on time, while task 2 is relatively less so. With separate queues, two different workers can focus on each of the tasks, so that when most of our items have been sold out by 3 AM, the sending of marketing emails does not become a bottleneck for expiring the sold-out stock.

Also, if the marketing emails become too many, we can simply spawn another worker to consume tasks from that specific queue. Scalable, isn't it? 🙂

It is also easy to use multiple queues with Celery. We just declare which tasks we want routed to a different queue. We also don't need to worry if that queue hasn't been declared yet, because Celery provides the CELERY_CREATE_MISSING_QUEUES setting (enabled by default), which auto-creates queues as needed.

*Note: The Celery documentation cautions that this is only available on brokers implementing AMQP.


# route tasks.square to its own queue; everything else stays on the default celery queue
CELERY_ROUTES = {'tasks.square': {'queue': 'square_feed'}}

[Screenshot: running three workers that consume from one or both of the queues]

We first fire up three workers (possible commands are sketched after this list):

  • The first one, on the upper left, consumes from the default queue, celery
  • The second one, bottom left, consumes from square_feed, our delegated queue for the task tasks.square.
  • And the third one, bottom right, which consumes from both celery and square_feed.
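A rough sketch of how those three workers could be started (the -Q option tells a worker which queues to consume from; multiple queues are comma-separated):

celery worker -Q celery
celery worker -Q square_feed
celery worker -Q celery,square_feed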

Tip: You may use tmux for this example so you can watch the message exchange in real time! 🙂

[Screenshot: the three workers' startup output showing their queues]

As the workers are running, we can see which queues each of them consumes from.

[Screenshot: tasks being distributed across the three workers]

And as we run our tasks, we see that the individual tasks get distributed. The worker consuming from square_feed only got [SQUARE] tasks, while the worker consuming from celery only got [DOUBLE] tasks. On the other hand, our industrious worker, the one that consumes from both, was still able to get a share of both kinds of tasks.

Specifying our desired route through CELERY_ROUTES is only one way to add routing specifications to our tasks. We could also do any of the following:

1) Specify the preferred queue during task definition:

[Screenshot: a task module specifying its queue in the task decorator]
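Roughly, the task definition would look something like this (a sketch: the queue option on the decorator becomes the task's default destination, with queue1 as the example queue name):


from celery.task import task

# queue='queue1' makes queue1 the default destination for this task
@task(queue='queue1')
def square(x):
    product = x * x
    print "[SQUARE] I GOT " + str(product)
    return "[SQUARE] RESULT: " + str(product)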

To test this method, we first spawn the worker that would be consuming from queue queue1:

celery worker -Q queue1

Then, in the Python console, we can do:

from tasks import square
square.delay(3)

2) Or we can also pass the queue name as an argument to apply_async (if a queue is set in the task definition as above, this overrides it).

Fire up the worker that will be consuming from our desired queue, queue2 this time, since the apply_async call below overrides the task-level setting:

celery worker -Q queue2

And then on our python console:

from tasks import square
a = square.apply_async(args=[100], queue='queue2')

So there! We've seen a way to organize our config files. We were also able to connect to a remote server, schedule tasks with Celery Beat, and assign tasks to separate queues.

Many as these may seem, we are still only scratching Celery's surface; Celery has more capabilities than we could ever imagine, including task chaining, chunking, and more, which I hope to write about in a future post. In the meantime, you may check out the official Celery documentation at https://media.readthedocs.org/pdf/celery/latest/celery.pdf. 🙂

Thanks for reading and have a great day!!!
