Delayed message delivery in RabbitMQ
UPD June 01, 2015: there is a plugin for this now.
A lot of developers use RabbitMQ message broker. It is quite mature but still lacks for some features that one may need. One of them is delayed message delivery: there is no way to send a message that will be delivered after a specified delay (it’s a limitation of AMQP protocol). Hopefully, there is a hack for this.
Let’s start from dead letters. A message can become “dead” by several reasons, such as rejection or TTL (time to live) expiration. RabbitMQ can deal with such messages by redirecting them to a particular exchange and routing key. We can use this ability to implement delayed delivery. We will create a special queue for holding delayed messages. This queue will not have any subscribers in order for messages to expire. After the expiration, messages will be passed to a destination exchange and routing key, just as planned.
There is a difficulty: the message TTL expiration mechanism works only with the message at the end of a queue, so all messages behind the last message will wait at least as long as the last. However, this can be solved by declaring a separate queue per delay duration.
A destination exchange and routing key are set as parameters of a hold queue. So, it is necessary to create a separate hold queue for each exchange-routing key as well.
Now, you see the problem: a lot of temporary queues (one per duration + exchange/routing key). There is a way to get rid of them. Not only messages have the concept of TTL, but queues also. The TTL can be set for a queue, and the queue will be deleted when it is unused for this time (disregarding the presence of messages in it). This “using” includes message consuming and queue redeclaration. Hold queues have no consumers, but we can redeclare them on each message posting. If we specify the queue TTL greater than the message TTL, the queue will surely outlive all messages put in it.
We will need several queue arguments, which will be set on its declaration:
x-message-ttl
- the number of milliseconds for a message to stay consumed in a queue.x-dead-letter-exchange
andx-dead-letter-routing-key
- an exchange and routing key where message will be send after the expiration.x-expires
- the number of milliseconds for a queue to be unused before it will be deleted.
Talk is cheap. Show me the code. – L. Torvalds
Let's come to the code.#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
RABBITMQ_HOST = '<your_host>'
RABBITMQ_USER = '<user>'
RABBITMQ_USER_PASSWORD = '<password>'
def post_delayed(connection, message, delay_duration, exchange, routing_key):
"""Post message to RabbitMQ with a delay
:param connection: RabbitMQ connection.
:param message: a message to send.
:param delay_duration: a duration of the delay.
:param exchange: a exchange where message should be sent after the delay.
:param routing_key: a routing key which should be used resending
the message after the delay.
"""
channel = connection.channel()
hold_queue = "delay.{0}.{1}.{2}".format(
delay_duration, exchange, routing_key)
hold_queue_arguments = {
# Exchange where to send messages after TTL expiration.
"x-dead-letter-exchange": exchange,
# Routing key which use when resending expired messages.
"x-dead-letter-routing-key": routing_key,
# Time in milliseconds
# after that message will expire and be sent to destination.
"x-message-ttl": delay_duration,
# Time after that the queue will be deleted.
"x-expires": delay_duration * 2
}
# It's necessary to redeclare the queue each time
# (to zero its TTL timer).
channel.queue_declare(queue=hold_queue,
durable=True,
exclusive=False,
arguments=hold_queue_arguments)
channel.basic_publish(
exchange='', # Publish to the default exchange.
routing_key=hold_queue, body=message,
# Make the message persistent.
properties=pika.BasicProperties(delivery_mode=2,)
)
# The channel is expendable.
channel.close()
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=RABBITMQ_HOST,
credentials=pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_USER_PASSWORD)))
post_delayed(connection, "oh hi", 2000, 'my_exchange', 'my_queue')
connection.close()
As you can see, quite simple.
Dear readers, do you use RabbitMQ or other message brokers? What are your use cases? Share, please.