Let’s go RabbitMQ!

Woohoo! Our CS 255 class for this sem is almost over :’D One more paper and it’s done. So gladdd that I took this class; definitely learned a lot! I also never thought that I’d be able to read that many technical papers in one semester LOL.

Anyhoo, in this blog post, we’ll be talking about a special kind of rabbit.

[Image: a fluffy rabbit]

Ain’t I fluffeeeeh?

Cuteness overloaaaad!!! But hehe nope, not that kind of rabbit, but today we’ll be talking about a message broker with the name RabbitMQ.

[Image: RabbitMQ logo]

So first, what’s a message broker?

Technically, a message broker is like a router for your messages: with the rules you set, messages are routed to their corresponding recipients. It may seem simple, but message brokers prove to be super useful.

Practical applications include freeing up your machine by handing messages off to a broker (from where they are picked up by consumers) so that it can continue processing other requests.

RabbitMQ’s Brief History

RabbitMQ’s development started from the need for a standard messaging protocol that anyone could adopt. From the early 1980s to the late 1990s, messaging systems were proprietary, and one of the most common problems was vendor lock-in, which made it difficult for companies already using one piece of software to incorporate another into their operations.

The development in the early 2000s of the Advanced Message Queuing Protocol (AMQP), which aimed to create a standardized protocol, plus the vision, dedication, and expertise of Alexis Richardson and Matthias Radestock, gave birth to RabbitMQ, an Erlang-based message broker.

Client-Server vs Consumer-Producer

In our day-to-day work, we are very much familiar with the client-server model, where a client sends a request to the server and waits for its response before it continues to do other tasks. With message brokers, the mindset of having a producer and a consumer (or a publisher and a subscriber) is more appropriate.

In a producer-consumer model, the interaction can be non-blocking. Once the producer has produced a message, it doesn’t wait for a consumer to consume it. It moves on to the next task in line (i.e. serving other requests).

As Alvaro Videla and Jason Williams describe in their book RabbitMQ in Action, it’s like the Coffee Bean model (from the Coffee Bean and Tea Leaf coffee shops). Order takers produce orders to be processed while the baristas process the orders/requests. Customers don’t need to wait at the order taker’s counter for their finished orders; instead they move to a waiting counter so the next customer can be served by the order taker.

This model also shows us how it scales. In case there are already so many orders produced by the order taker that customers are complaining their orders take too long, the owner can just hire additional baristas to process orders (while keeping the same number of order takers). This way, the order taker is not a bottleneck in taking orders and completing purchases (thus creating revenue for the company).

Now that we have a brief background of message brokers, the producer-consumer model, and RabbitMQ, let’s now install it in our systems.

Installation

These installation steps were run on Ubuntu Server 14.04:

  1. We first update our package lists and upgrade the system using the following commands:
    sudo apt-get update
    sudo apt-get -y upgrade
    
  2. We then add the repository to the list of our known sources:
    sudo su -c "echo 'deb http://www.rabbitmq.com/debian testing main' >> /etc/apt/sources.list"
    
  3. We then fetch and store RabbitMQ’s public key. We need to install curl first in case it’s not yet present in our system.
    sudo apt-get install curl
    curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
    
  4. And finally we install the RabbitMQ server:
    sudo apt-get install rabbitmq-server
    

So now that we have RabbitMQ installed, let’s double-check that it’s up and running. Easy! We just issue the following on the command line:


sudo rabbitmqctl status

In the case of a successful installation, the server is automatically started and you will see something like the following:

[Screenshot: output of sudo rabbitmqctl status]

The running applications include rabbit, the RabbitMQ application, and mnesia, the database RabbitMQ uses to store exchange and queue metadata (what are exchanges and queues? We’ll see them in a bit 😉 ), plus several other Erlang and system-related applications.

Graphical User Interface

Oh yes, RabbitMQ has its own Graphical User Interface to make navigation and administration easier. We just need to enable the management plugin:


sudo rabbitmq-plugins enable rabbitmq_management

And then we need to restart the RabbitMQ server to reflect our changes:


sudo service rabbitmq-server restart

Now grab your server’s IP and go to the following:

server-ip-address:15672

Note: If you’re using AWS instances, make sure you have added the Custom TCP Rule that allows access to port 15672.

You should be able to see something like this:

 

[Screenshot: the RabbitMQ management interface login page]

  • Another way to test whether your RabbitMQ GUI is working is to use port forwarding (this way you won’t need to change your security group’s rules to enable access to port 15672):

ssh  -L 15672:localhost:15672 username@remote-server-ip  -f -N

This command just forwards connections from your local port 15672 (the first 15672 in the command) to port 15672 (the second one) on your remote server’s localhost. Replace remote-server-ip with your remote server’s IP address.

User Access and Permissions

Before we start with the basic configurations for RabbitMQ, let’s talk about one detail on the defaults that come with it.

On installation, RabbitMQ comes with default settings just like many other pieces of software. These include a default user “guest” with a default password “guest” as well. This username-password pair comes with every installation and is widely known to be RabbitMQ’s default. Thus, if left unchanged, it could be used by anyone to maliciously gain access to your setup (with administrator privileges), even if by default the “guest”/“guest” credentials can only access the broker on localhost.

You can check this by logging in as “guest” with password “guest” on our RabbitMQ GUI. On the GUI that we accessed via remote-server-ip:15672, we cannot successfully log in as guest, but on the one where we used port forwarding to the server’s localhost port 15672, logging in as guest succeeds.

So in order to secure this aspect, we could either change the password for guest and allow remote access, or delete the default user and create a different user altogether with administrative privileges.

We will discuss here how to create a new user and grant administrative privileges:


sudo rabbitmqctl add_user adelen password

sudo rabbitmqctl set_user_tags adelen administrator

sudo rabbitmqctl set_permissions -p / adelen ".*" ".*" ".*"

Simple, isn’t it? 🙂 The first line just adds the user adelen with the password “password” (change this in your own setups :D). Then we set the user adelen as an administrator and grant configure (first .*), write (next .*), and read (last .*) permissions on all resources in the default virtual host (/).
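
If you would rather go with the other route and drop the default account entirely, the matching command is below; just make sure your new administrator account works first:

sudo rabbitmqctl delete_user guest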

Exchanges, Queues, and Messages

In this section, we will see the basic configurations for exchanges, queues, and messages. Messages are the actual data published to exchanges, to which queues are bound through binding rules.

Exchanges:


channel.exchange_declare(exchange="direct-exchange-auto-delete-durable",
                         type="direct",
                         passive=False,
                         durable=True,
                         auto_delete=True)

In creating an exchange, here are some of the parameters that we can set:

  1. durable – The exchange definition persists through server restarts.
  2. auto-delete – When there are no more queues bound to an auto-delete exchange, it is automatically deleted.
  3. passive – When set to True, the call only checks whether the exchange exists. If it exists, the declaration succeeds and returns the existing exchange; otherwise the exchange is not created and an error is raised instead.
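
As a quick illustration of the passive flag, here is a minimal sketch written in the same pika style used later in this post. Treat the exception handling as an assumption: the exact exception class can vary across pika versions.

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = conn.channel()

try:
    # passive=True only checks for the exchange; it will never be created here
    channel.exchange_declare(exchange="direct-exchange-auto-delete-durable",
                             passive=True)
    print "Exchange exists"
except pika.exceptions.ChannelClosed:
    # the broker closes the channel with a 404 when the exchange is missing
    print "Exchange does not exist"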

Queues:


channel.queue_declare(queue="queue-auto-delete-durable",
                      passive=False,
                      durable=True,
                      auto_delete=True)

  1. durable – When set to True, a durable queue will survive server stops and starts, even if there are no messages enqueued.
  2. auto-delete – When the last consumer unsubscribes from the queue, an auto-delete queue is automatically deleted.
  3. passive – Same as with passive exchanges: when set to True, if the queue does not exist it is not created and an error is raised; if it exists, the existing queue serves as the return value.

Messages:


channel.basic_publish(body=msg,
                      exchange="direct-exchange-auto-delete-durable",
                      properties=pika.BasicProperties(delivery_mode=2),
                      routing_key="hello")

  1. Persistent (delivery_mode = 2) – The message survives server reboots as long as it was sent through a durable exchange and placed in a durable queue. If only the message is persistent but the queue it sits in is not durable, the message is still lost on restart.

Types of Exchanges

We have said before that queues are bound to exchanges. But how does an exchange know which message to send where? To answer this question, let’s look at a few different kinds of exchanges:

  1. Direct: If the routing key of the message matches that of the queue binding, the message is delivered to the corresponding queue.
    channel.exchange_declare(exchange="company-direct-exchange",
                             type="direct")
    
  2. Fanout: All the queues bound to a fanout exchange will receive all the messages that are sent through the exchange.

    channel.exchange_declare(exchange='logs-fanout', type='fanout')
    

  3. Topic: A message can be delivered to multiple queues depending on whether its routing key matches their binding patterns. The wildcard * matches exactly one word, while # matches zero or more words. For example, see the sketch right below.
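
    A sketch using the exchange and queue names from the example in the next section (assuming the queue has already been declared); the binding pattern stock.* will match stock.info, stock.critical, and so on:

    channel.exchange_declare(exchange="company-topic-exchange",
                             type="topic")
    channel.queue_bind(queue="stock-manager-queue",
                       exchange="company-topic-exchange",
                       routing_key="stock.*")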

Real Life Example:

Say we have company X that has three main departments: the marketing department, stocks department, and the software development department.

For organized forwarding of issues, they have all agreed to have their incoming messages keyword-coded such that:

  1. Messages’ titles should be composed of: <department name>.<severity level>. So for example,
    • For info messages to the marketing department,
      marketing.info
    • For critical messages to the stock department,
      stock.critical
  2. All critical messages should be sent to the development team as well for urgent action.
  3. Departments can reach the development team directly by setting the title to URGENT.

Sample Messaging Scenario Between the Departments:

[Diagram: sample messaging scenario between the departments]

To do this, we create three consumers representing our three departments (marketing, stocks, development). We then declare the respective exchanges that we’ll be needing: one topic exchange, and one direct exchange for the URGENT messages.


import pika

print "** Stock Department **"

credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters("localhost",
                                        credentials=credentials)

conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

channel.exchange_declare(exchange="company-topic-exchange",
                         type="topic")

channel.queue_declare(queue="stock-manager-queue")
channel.queue_bind(queue='stock-manager-queue',
                   exchange='company-topic-exchange',
                   routing_key='stock.*')

def msg_consumer(channel, method, header, body):
    channel.basic_ack(delivery_tag=method.delivery_tag)
    if body == "quit":
        channel.basic_cancel(consumer_tag="hello-stock")
        channel.stop_consuming()
    else:
        print body
    return

channel.basic_consume(msg_consumer,
                      queue="stock-manager-queue",
                      consumer_tag="hello-stock")
channel.start_consuming()

 

Let’s take this consumer code step by step:

  1. We first got hold of our channel:
    credentials = pika.PlainCredentials("guest", "guest")
    conn_params = pika.ConnectionParameters("localhost",
                                            credentials=credentials)
    conn_broker = pika.BlockingConnection(conn_params)
    channel = conn_broker.channel()
    
  2. We then declared our topic exchange, queue, and bindings. Redeclaring queues and exchanges is alright as long as the parameters are all the same; RabbitMQ just returns the existing one. But in case the parameters don’t match, RabbitMQ raises an error informing you that one with different settings already exists (see the sketch right after this list).
    channel.exchange_declare(exchange="company-topic-exchange",
                             type="topic")
    channel.queue_declare(queue="stock-manager-queue")
    channel.queue_bind(queue='stock-manager-queue',
                       exchange='company-topic-exchange',
                       routing_key='stock.*')
    
  3. We then let our consumer consume from its own assigned queue (i.e. stock-manager-queue):
    channel.basic_consume(msg_consumer,
                          queue="stock-manager-queue",
                          consumer_tag="hello-stock")
    channel.start_consuming()
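
As mentioned in step 2, redeclaring with different parameters fails. Here is a small sketch of what that looks like; the queue name reuses the one from our example, and the exact exception class depends on your pika version, so treat that part as an assumption:

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = conn.channel()

# stock-manager-queue was declared above without durable=True, so asking for a
# durable queue now is a parameter mismatch and the broker refuses the declare
try:
    channel.queue_declare(queue="stock-manager-queue", durable=True)
except pika.exceptions.ChannelClosed as err:
    print "Redeclaration failed:", err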
    

Now we can look at our producer code:


import pika, sys

print "** Message Producer **"

credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters("localhost",
                                        credentials=credentials)

conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

# Declare Exchanges
channel.exchange_declare(exchange="company-fanout-exchange",
                         type="fanout")

channel.exchange_declare(exchange="company-direct-exchange",
                         type="direct")

channel.exchange_declare(exchange="company-topic-exchange",
                         type="topic")

# Declare Corresponding Queues
channel.queue_declare(queue="marketing-dept-queue")

channel.queue_declare(queue="stock-manager-queue")

channel.queue_declare(queue="devops-queue")

msg = sys.argv[1]
msg_props = pika.BasicProperties(delivery_mode=2)
msg_props.content_type = "text/plain"

if msg == "URGENT":
    channel.basic_publish(body=msg,
                          exchange="company-direct-exchange",
                          properties=msg_props,
                          routing_key=msg)
else:
    channel.basic_publish(body=msg,
                          exchange="company-topic-exchange",
                          properties=msg_props,
                          routing_key=msg)

It’s almost the same: we declare the exchanges and queues (no harm if these already exist), then publish the message given as the command-line argument. When the message is URGENT, we use the direct exchange to send it straight to the development department.
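
The full code (linked at the end) also sets up the development team’s consumer. As a sketch (just an illustration, not necessarily how the linked code does it), its bindings could follow the rules we agreed on above: every *.critical message from the topic exchange, plus anything routed with the key URGENT on the direct exchange:

channel.queue_bind(queue="devops-queue",
                   exchange="company-topic-exchange",
                   routing_key="*.critical")
channel.queue_bind(queue="devops-queue",
                   exchange="company-direct-exchange",
                   routing_key="URGENT")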

What if, one day, we want an announcement channel where management can broadcast messages to all the departments? For this purpose, we can use the fanout exchange.

[Diagram: broadcast scenario using the fanout exchange]

To accomplish this, we just need to add a few lines:

  1. In our producer, we just add a branch that publishes to the fanout exchange:
    if "ALL:" in msg:
        channel.basic_publish(body=msg,
                              exchange="company-fanout-exchange",
                              properties=msg_props,
                              routing_key=msg)
    ...
    
  2. In each of our consumers, we just bind their respective queues to the company’s fanout exchange:
    channel.queue_bind(queue="marketing-dept-queue",
                       exchange='company-fanout-exchange')
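
With both changes in place, broadcasting is just a matter of prefixing the message. For example (the script name producer.py is hypothetical; use whatever you named your producer file):

python producer.py "ALL: Team lunch this Friday"

Every queue bound to company-fanout-exchange gets a copy, regardless of the routing key.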

 

So that’s it! A working sample message-exchange scenario to help us get a feel for RabbitMQ. 🙂 In this blog post, we’ve seen a brief overview of RabbitMQ, the different configurations for its exchanges, messages, and queues, and a simulation of how RabbitMQ can help in scenarios like the one above. 🙂

 

Full code for this producer-consumer scenario can be found here.

 

Thank you so much and ’til the next post! :’D

 

 

 
