PyRMQ


Python with RabbitMQ—simplified so you won’t have to.

Features

Stop worrying about boilerplate and implementing retry logic for your queues. PyRMQ already does it for you.

  • Use out-of-the-box Consumer and Publisher classes created from pika for your projects and tests.

  • Custom DLX-DLK-based retry logic for message consumption.

  • Message priorities

  • Works with Python 3.

  • Production ready

Quickstart

PyRMQ is available at PyPI.

$ pip install pyrmq

Just instantiate the feature you want with its respective settings. PyRMQ already works out of the box with RabbitMQ’s default initialization settings.

from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
)
publisher.publish({"pyrmq": "My first message"})

Publish message with priorities

To enable message prioritization, declare your queue with the queue argument x-max-priority. It takes an integer that sets the highest priority the queue supports; a higher number means a higher priority. Then publish your message with the priority argument specified. Any priority higher than the configured maximum is treated as the maximum. Read more about message priorities in the RabbitMQ documentation.

from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    queue_args={"x-max-priority": 3}
)
publisher.publish({"pyrmq": "My first message"}, priority=1)

Warning

Adding arguments on an existing queue is not possible. If you wish to add queue arguments, you will need to either delete the existing queue then recreate the queue with arguments or simply make a new queue with the arguments.
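
As a rough illustration of the capping rule described above, the sketch below clamps a requested priority to the queue's maximum. It is not PyRMQ code: effective_priority is a hypothetical helper that only mirrors how the broker treats priorities, and it assumes a queue declared with x-max-priority set to 3.

```python
from typing import Optional


def effective_priority(priority: Optional[int], max_priority: int) -> int:
    """Hypothetical helper mirroring how the broker treats message priorities."""
    if priority is None:
        # Messages published without a priority are treated as priority 0.
        return 0
    # Priorities above the queue's x-max-priority are capped at the maximum.
    return min(priority, max_priority)


# Assuming queue_args={"x-max-priority": 3}:
print(effective_priority(1, 3))     # 1
print(effective_priority(10, 3))    # 3
print(effective_priority(None, 3))  # 0
```

In other words, publishing with priority=10 to this queue behaves the same as publishing with priority=3.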

Consuming

Calling start() on a Consumer runs it in its own thread, making it non-blocking by default. When run after the publishing code above, it should receive the published data.

from pyrmq import Consumer


def callback(data):
    print(f"Received {data}!")

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback,
)

consumer.start()

DLX-DLK Retry Logic

What if you wanted to retry a failure on a consumed message? PyRMQ offers a custom solution that keeps your message in queues while retrying in an exponential backoff fashion.

This approach uses dead letter exchanges and queues to republish a message to your original queue once it has expired. PyRMQ creates this “retry” queue for you with the default naming convention of appending your original queue with .retry.

from pyrmq import Consumer

def callback(data):
    print(f"Received {data}!")
    raise Exception

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback,
    is_dlk_retry_enabled=True,
)
consumer.start()

This will start a loop of passing your message between the original queue and the retry queue until it reaches the default number of max_retries.
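
To give a feel for what "exponential backoff" means for the waits between retries, here is a minimal sketch. The function backoff_schedule, its parameter names, and the formula base_delay * factor ** n are assumptions made for illustration; PyRMQ's actual expiration formula and its default max_retries may differ.

```python
from typing import List


def backoff_schedule(base_delay: float, factor: float, max_retries: int) -> List[float]:
    """Return the wait before each retry: base_delay * factor ** n for n = 0..max_retries - 1."""
    return [base_delay * factor ** n for n in range(max_retries)]


# Illustrative values only, not PyRMQ's defaults.
schedule = backoff_schedule(base_delay=5, factor=2, max_retries=4)
print(schedule)  # [5, 10, 20, 40]
```

Each wait is double the previous one, so a message that keeps failing is retried less and less often.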

User Guide

PyRMQ Installation

There are multiple ways to install PyRMQ as well as multiple versions to choose from.

Stable Version

PyRMQ is available at PyPI.

$ pip install pyrmq

Development Version

Since PyRMQ is continuously used in a growing number of internal microservices all working with RabbitMQ, you can see or participate in its active development in its GitHub repository.

There are two ways to work or collaborate with its development version.

Git Checkout

Clone the code from GitHub and run it in a virtualenv.

$ git clone git@github.com:first-digital-finance/pyrmq.git
$ cd pyrmq
$ virtualenv venv --distribute
$ . venv/bin/activate
$ python setup.py install

This will set up PyRMQ and its dependencies on your local machine. Just fetch/pull code from the master branch to keep your copy up to date.

PyPI

$ mkdir pyrmq
$ cd pyrmq
$ virtualenv venv --distribute
$ . venv/bin/activate
$ pip install git+https://github.com/first-digital-finance/pyrmq.git

How to use PyRMQ

Publishing

Instantiate the Publisher class and plug in your application-specific settings. PyRMQ already works out of the box with RabbitMQ’s default initialization settings.

from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
)
publisher.publish({"pyrmq": "My first message"})

This publishes a message over a BlockingConnection on its own thread with default settings and provides a handler for its retries.

Retries

PyRMQ’s Publisher retries happen on two levels: connecting and publishing.

Connecting

PyRMQ instantiates a BlockingConnection when connecting. If this fails, it retries 2 more times by default, with a delay of 5 seconds, a backoff base of 2 seconds, and a backoff constant of 5 seconds. All of these settings are configurable via the Publisher class.

Publishing

PyRMQ calls pika’s basic_publish when publishing. If this fails, it retries 2 more times by default, with a delay of 5 seconds, a backoff base of 2 seconds, and a backoff constant of 5 seconds. All of these settings are configurable via the Publisher class.

Max retries reached

Once PyRMQ has exhausted its retries, it calls the error_callback you specified.
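
The retry behaviour described in this section can be sketched in plain Python. This is not PyRMQ's implementation: call_with_retries is a hypothetical helper, the (attempt, error) signature of error_callback is assumed, and re-raising after the callback fires is a design choice of this sketch. The sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable, Optional


def call_with_retries(
    operation: Callable[[], object],
    attempts: int = 3,
    retry_delay: float = 5,
    error_callback: Optional[Callable[[int, Exception], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
):
    """Run operation, retrying on failure; notify error_callback once attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as error:  # a real client would catch specific pika errors
            if attempt == attempts:
                if error_callback is not None:
                    error_callback(attempt, error)
                raise
            sleep(retry_delay)


calls = {"count": 0}


def flaky():
    """Fails twice, then succeeds, like a broker that is still starting up."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("broker not reachable yet")
    return "connected"


print(call_with_retries(flaky, sleep=lambda seconds: None))  # connected
```

With the default of 3 attempts, the first try plus 2 retries matches the "2 more times" wording above.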

Publish message with priorities

To enable message prioritization, declare your queue with the queue argument x-max-priority. It takes an integer that sets the highest priority the queue supports; a higher number means a higher priority. Then publish your message with the priority argument specified. Any priority higher than the configured maximum is treated as the maximum. Read more about message priorities in the RabbitMQ documentation.

from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    queue_args={"x-max-priority": 3}
)
publisher.publish({"pyrmq": "My first message"}, priority=1)

Warning

Adding arguments on an existing queue is not possible. If you wish to add queue arguments, you will need to either delete the existing queue then recreate the queue with arguments or simply make a new queue with the arguments.

Consuming

Instantiate the Consumer class and plug in your application-specific settings. PyRMQ already works out of the box with RabbitMQ’s default initialization settings.

from pyrmq import Consumer


def callback(data):
    print(f"Received {data}!")

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback,
)

consumer.start()

Once the Consumer class is instantiated, call start() to run pika’s start_consuming method on its own thread with default settings; the Consumer also provides a handler for its retries. After a message is consumed, PyRMQ calls basic_ack with delivery_tag set to the delivery tag from the message’s method frame.
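
The "callback first, then acknowledge, all on a background thread" flow can be simulated without a broker. The sketch below uses only the standard library: an in-memory queue.Queue stands in for RabbitMQ, consume is a hypothetical loop, and appending to acked stands in for pika's basic_ack call.

```python
import queue
import threading

acked = []     # delivery tags that were acknowledged
received = []  # payloads handed to the callback


def callback(data):
    received.append(data)


def consume(messages: "queue.Queue", on_message) -> None:
    """Hypothetical consume loop: run the callback, then acknowledge the message."""
    while True:
        item = messages.get()
        if item is None:  # sentinel used in this sketch to stop the loop
            break
        delivery_tag, data = item
        on_message(data)
        acked.append(delivery_tag)  # stands in for basic_ack(delivery_tag=...)


messages = queue.Queue()
worker = threading.Thread(target=consume, args=(messages, callback), daemon=True)
worker.start()  # returns immediately; the main thread is not blocked

messages.put((1, {"pyrmq": "My first message"}))
messages.put(None)
worker.join(timeout=5)
print(received, acked)
```

Acknowledging only after the callback returns means a message whose callback raised is never marked as handled.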

Retries

PyRMQ’s Consumer retries happen on two levels: connecting and consuming.

Connecting

PyRMQ instantiates a BlockingConnection when connecting. If this fails, it retries 2 more times by default, with a delay of 5 seconds, a backoff base of 2 seconds, and a backoff constant of 5 seconds. All of these settings are configurable via the Consumer class.

DLX-DLK Consumption Retry Logic

PyRMQ calls pika’s start_consuming when Consumer is instantiated. If your consumption callback throws an exception, PyRMQ uses dead letter exchanges and queues to republish your messages to your original queue once they have expired in the retry queue. PyRMQ creates this “retry” queue for you, named by default after your original queue with .retry appended. To enable this, set the is_dlk_retry_enabled flag on the Consumer class to True.

from pyrmq import Consumer

def callback(data):
    print(f"Received {data}!")
    raise Exception

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback,
    is_dlk_retry_enabled=True,
)
consumer.start()

This will start a loop of passing your message between the original queue and the retry queue until it reaches the default number of max_retries.
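
The loop between the two queues can be traced with an in-memory simulation. Nothing here talks to RabbitMQ: the deques stand in for the original and retry queues, the "retries" field stands in for metadata that would travel in message headers, run is a hypothetical function, and MAX_RETRIES is an illustrative value rather than PyRMQ's default.

```python
from collections import deque

MAX_RETRIES = 3  # illustrative value, not necessarily PyRMQ's default


def callback(data):
    raise Exception("processing failed")


def run(message: dict, max_retries: int = MAX_RETRIES) -> list:
    """Trace a failing message as it moves between the two queues."""
    original = deque([{"data": message, "retries": 0}])
    retry = deque()
    trace = []
    while original:
        envelope = original.popleft()
        try:
            callback(envelope["data"])
        except Exception:
            if envelope["retries"] >= max_retries:
                trace.append("gave up")
                break
            envelope["retries"] += 1
            retry.append(envelope)  # published to "queue_name.retry" with a TTL
            trace.append(f"retry {envelope['retries']}")
            # When the TTL expires, the retry queue dead-letters the message
            # back to the original queue.
            original.append(retry.popleft())
    return trace


print(run({"pyrmq": "My first message"}))
# ['retry 1', 'retry 2', 'retry 3', 'gave up']
```

In a real deployment the hop back to the original queue is done by the broker when the message's TTL runs out, which is what produces the delay between attempts.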

Max retries reached

Once PyRMQ has exhausted its retries, it calls the error callback you specified.

API Documentation

Publisher Class

class pyrmq.Publisher(exchange_name: str, queue_name: str, routing_key: str, **kwargs)

This class offers a BlockingConnection from pika that automatically handles queue declares and bindings plus retry logic built for its connection and publishing.

__create_connection() → pika.adapters.blocking_connection.BlockingConnection

Creates pika’s BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: str, routing_key: str, **kwargs)
Parameters
  • exchange_name – Your exchange name.

  • queue_name – Your queue name.

  • routing_key – Your routing key.

  • host – Your RabbitMQ host. Checks env var RABBITMQ_HOST. Default: "localhost"

  • port – Your RabbitMQ port. Checks env var RABBITMQ_PORT. Default: 5672

  • username – Your RabbitMQ username. Default: "guest"

  • password – Your RabbitMQ password. Default: "guest"

  • connection_attempts – How many times PyRMQ should try to connect. Default: 3

  • retry_delay – Seconds between retries. Default: 5

  • error_callback – Callback function to be called when connection_attempts is reached.

  • infinite_retry – Tells PyRMQ to keep on retrying to publish while firing error_callback, if any. Default: False

  • queue_args – Your queue arguments. Default: None
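
The way host and port fall back to environment variables can be sketched as follows. The helper resolve_connection_settings is hypothetical, and the precedence shown (explicit argument, then environment variable, then default) is an assumption about how these settings are resolved. The environ parameter exists only so the example does not depend on your shell's environment.

```python
import os
from typing import Mapping, Optional


def resolve_connection_settings(
    host: Optional[str] = None,
    port: Optional[int] = None,
    environ: Mapping[str, str] = os.environ,
) -> dict:
    """Hypothetical resolver: explicit argument, then environment variable, then default."""
    return {
        "host": host or environ.get("RABBITMQ_HOST", "localhost"),
        # Environment variables are strings, so the port is converted to int.
        "port": int(port or environ.get("RABBITMQ_PORT", 5672)),
    }


print(resolve_connection_settings(environ={}))
# {'host': 'localhost', 'port': 5672}
print(resolve_connection_settings(environ={"RABBITMQ_HOST": "rabbit", "RABBITMQ_PORT": "5673"}))
# {'host': 'rabbit', 'port': 5673}
print(resolve_connection_settings(host="10.0.0.5", environ={"RABBITMQ_HOST": "rabbit"}))
# {'host': '10.0.0.5', 'port': 5672}
```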

__send_reconnection_error_message(retry_count, error) → None

Send error message to your preferred location.
Parameters
  • retry_count – Number of retries the Publisher made before sending an error message.

  • error – Error that prevented the Publisher from sending the message.

__weakref__

list of weak references to the object (if defined)

connect(retry_count=1) -> (<class 'pika.adapters.blocking_connection.BlockingConnection'>, <class 'pika.adapters.blocking_connection.BlockingChannel'>)

Creates pika’s BlockingConnection and initializes queue bindings.
Parameters
  • retry_count – Number of retries the Publisher made before sending an error message.

declare_queue(channel) → None

Declare and bind a channel to a queue.
Parameters
  • channel – pika Channel

publish(data: dict, priority: Optional[int] = None, message_properties: Optional[dict] = None, attempt: int = 0, retry_count: int = 1) → None

Publishes data to RabbitMQ.
Parameters
  • data – Data to be published.

  • priority – Message priority. Only works if x-max-priority is defined as a queue argument.

  • message_properties – Message properties. Default: {"delivery_mode": 2}

  • attempt – Number of attempts made.

  • retry_count – Number of retries the Publisher made before sending an error message.
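
One way to picture how priority and message_properties could combine into the properties sent with a message is the sketch below. build_properties is a hypothetical helper, and merging caller-supplied properties over the default (rather than replacing it) is an assumption, not confirmed behaviour of publish().

```python
from typing import Optional

# delivery_mode 2 marks a message as persistent in AMQP.
DEFAULT_PROPERTIES = {"delivery_mode": 2}


def build_properties(
    priority: Optional[int] = None,
    message_properties: Optional[dict] = None,
) -> dict:
    """Hypothetical helper combining defaults, caller overrides, and priority."""
    properties = {**DEFAULT_PROPERTIES, **(message_properties or {})}
    if priority is not None:
        properties["priority"] = priority
    return properties


print(build_properties(priority=1))
# {'delivery_mode': 2, 'priority': 1}
print(build_properties(message_properties={"content_type": "application/json"}))
# {'delivery_mode': 2, 'content_type': 'application/json'}
```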

Consumer Class

class pyrmq.Consumer(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, **kwargs)

This class uses a BlockingConnection from pika that automatically handles queue declares and bindings plus retry logic built for its connection and consumption. It starts its own thread upon initialization and runs pika’s start_consuming().

__create_connection() → pika.adapters.blocking_connection.BlockingConnection

Creates a pika BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

__send_consume_error_message(retry_count: int, error: Exception) → None

Send error message to your preferred location.
Parameters
  • retry_count – Number of retries the Consumer made before sending an error message.

  • error – Error that prevented the Consumer from processing the callback.

__send_reconnection_error_message(retry_count: int, error: Union[pika.exceptions.AMQPConnectionError, ConnectionResetError, pika.exceptions.ChannelClosedByBroker]) → None

Send error message to your preferred location.
Parameters
  • retry_count – Number of retries the Consumer made before sending an error message.

  • error – Error that prevented the Consumer from connecting.

__weakref__

list of weak references to the object (if defined)

_compute_expiration(retry_count: int) → int

Computes message expiration time from the retry queue in seconds.

_consume_message(channel, method, properties, data: dict) → None

Wraps the user-provided callback, gracefully handles its errors, and calls pika’s basic_ack once successful.
Parameters
  • channel – pika’s Channel on which this message was received.

  • method – pika’s basic Return

  • properties – pika’s BasicProperties

  • data – Data received in bytes.

_publish_to_retry_queue(data: dict, properties, retry_reason: Exception = None) → None

Publishes message to retry queue with the appropriate metadata in the headers.

close() → None

Manually closes a connection to RabbitMQ. Useful for debugging and tests.

connect(retry_count=1) → None

Creates a BlockingConnection from pika and initializes queue bindings.
Parameters
  • retry_count – Number of retries the Consumer made before sending an error message.

consume(retry_count=1) → None

Wraps pika’s basic_consume() and start_consuming() with retry logic.

Testing PyRMQ

We’re not gonna lie. Testing RabbitMQ, mocks or not, is infuriating, and much harder than traditional integration testing with a database. That said, we hope you can help us expand on what we have started should you feel our current tests aren’t enough.

RabbitMQ

Since PyRMQ strives to be as complete with testing as it can be, it has several integration tests that need a running RabbitMQ to pass. Currently, PyRMQ is tested against rabbitmq:3.8.
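
Because the integration tests need a live broker, it can help to check that one is reachable before running them. The sketch below is not part of PyRMQ's test suite: is_broker_reachable is a hypothetical helper that only checks whether a TCP connection can be opened, and it assumes RabbitMQ listens on localhost:5672.

```python
import socket


def is_broker_reachable(host: str = "localhost", port: int = 5672, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("RabbitMQ reachable:", is_broker_reachable())
```

A successful TCP connection does not prove the broker is healthy, only that something is listening on the port.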

Install and run RabbitMQ locally
$ # Depending on your OS
$ # Ubuntu
$ sudo apt install rabbitmq-server
$ # Arch Linux
$ sudo pacman -S rabbitmq

Using tox

Install tox with pip install tox and run:

$ tox
$ tox -e py38  # If this is what you have installed or don't want to bother testing for other versions