How to use PyRMQ

Publishing

Instantiate the Publisher class and plug in your application-specific settings. PyRMQ works out of the box with RabbitMQ’s default initialization settings.

from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
)
publisher.publish({"pyrmq": "My first message"})

This publishes a message using a BlockingConnection on its own thread with default settings, and provides a handler for its retries.

Retries

PyRMQ’s Publisher retries happen on two levels: connecting and publishing.

Connecting

PyRMQ instantiates a BlockingConnection when connecting. If this fails, it retries 2 more times by default, with a delay of 5 seconds, a backoff base of 2 seconds, and a backoff constant of 5 seconds. All of these settings are configurable via the Publisher class.
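To make the retry settings concrete, here is a minimal sketch that computes how long a client might wait before each retry. It is not PyRMQ's implementation: the function name retry_waits, its parameter names, and the formula (the delay plus the backoff constant multiplied by the backoff base raised to the attempt number) are assumptions chosen for illustration. Check the Publisher source for the exact schedule.

```python
def retry_waits(max_retries=2, delay=5, backoff_base=2, backoff_constant=5):
    """Return the assumed wait (in seconds) before each retry.

    Hypothetical helper: the formula is an illustrative assumption,
    not necessarily the one PyRMQ uses.
    """
    waits = []
    for attempt in range(max_retries):
        # Fixed delay plus an exponentially growing backoff term.
        waits.append(delay + backoff_constant * backoff_base ** attempt)
    return waits


if __name__ == "__main__":
    # With the default values quoted above, print the assumed schedule.
    print(retry_waits())
```

Under this assumed formula the wait grows with every failed attempt, which keeps a struggling broker from being hit at a constant rate.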

Publishing

PyRMQ calls pika’s basic_publish when publishing. If this fails, it retries 2 more times by default, with a delay of 5 seconds, a backoff base of 2 seconds, and a backoff constant of 5 seconds. All of these settings are configurable via the Publisher class.

Max retries reached

When PyRMQ has exhausted its retries, it calls the callback you specified.
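The pattern of retrying and then handing the final error to a callback can be sketched in plain Python. Everything here is hypothetical and only illustrates the idea: publish_with_retries, the error_callback parameter name, and the fixed wait between attempts are assumptions, not PyRMQ's API.

```python
import time


def publish_with_retries(publish, message, error_callback, max_retries=2,
                         wait_seconds=5, sleep=time.sleep):
    """Try publish(message); after the last failed retry, call error_callback.

    Hypothetical helper that illustrates the pattern, not PyRMQ's code.
    """
    for attempt in range(max_retries + 1):
        try:
            publish(message)
            return True
        except Exception as error:  # broad on purpose: this is only a sketch
            if attempt == max_retries:
                # Retries exhausted: hand the last error to the caller.
                error_callback(error)
                return False
            sleep(wait_seconds)
```

Passing the sleep function as a parameter makes the sketch easy to exercise without actually waiting.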

Publish message with priorities

To enable message prioritization, declare your queue with the queue argument x-max-priority. It takes an integer that sets the number of possible priority values, with a higher number commanding more priority. Then publish your message with the priority argument specified. Any priority higher than the configured maximum is treated as the maximum. Read more about message priorities in the RabbitMQ documentation on priority queues.

from pyrmq import Publisher
publisher = Publisher(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    queue_args={"x-max-priority": 3}
)
publisher.publish({"pyrmq": "My first message"}, priority=1)
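As a rough mental model of how priorities behave, the following sketch caps each priority at the queue maximum and delivers higher priorities first. It is a simplified in-memory model, not RabbitMQ or PyRMQ code, and the class name PriorityQueueSketch is made up for illustration. A real broker's delivery order can also depend on consumer prefetch and timing.

```python
import heapq
from itertools import count


class PriorityQueueSketch:
    """In-memory model of a queue declared with x-max-priority (illustrative)."""

    def __init__(self, max_priority):
        self.max_priority = max_priority
        self._heap = []
        self._order = count()  # keeps FIFO order within one priority level

    def publish(self, message, priority=0):
        # Priorities above the maximum are treated as the maximum.
        effective = min(priority, self.max_priority)
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-effective, next(self._order), message))

    def consume(self):
        return heapq.heappop(self._heap)[2]
```

For example, with a maximum of 3, a message published with priority 10 is delivered as if it had priority 3, ahead of one published with priority 1.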

Warning

Adding arguments to an existing queue is not possible. To add queue arguments, either delete the existing queue and recreate it with the arguments, or create a new queue with the arguments.

Consuming

Instantiate the Consumer class and plug in your application-specific settings. PyRMQ works out of the box with RabbitMQ’s default initialization settings.

from pyrmq import Consumer


def callback(data):
    print(f"Received {data}!")

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback,
)

consumer.start()

Once the Consumer class is instantiated, run start() to start a thread that targets pika’s start_consuming method with default settings and provides a handler for its retries. After consuming a message, PyRMQ calls basic_ack with delivery_tag set to the delivery tag of the message’s method frame.
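The acknowledgement step can be sketched as a handler with the argument order pika passes to an on_message_callback (channel, method, properties, body). This is an illustration, not PyRMQ's source: make_on_message and FakeChannel are hypothetical names, and decoding the body as JSON is an assumption. The fake channel stands in for a real pika channel so the sketch runs without a broker.

```python
import json
from types import SimpleNamespace


def make_on_message(callback):
    """Wrap a user callback in a handler shaped like pika's on_message_callback."""
    def on_message(channel, method, properties, body):
        # Assumption: message bodies are JSON-encoded.
        callback(json.loads(body))
        # Acknowledge using the delivery tag carried by the method frame.
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return on_message


class FakeChannel:
    """Stand-in for a pika channel; records which delivery tags were acked."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


if __name__ == "__main__":
    channel = FakeChannel()
    handler = make_on_message(lambda data: print(f"Received {data}!"))
    handler(channel, SimpleNamespace(delivery_tag=7), None, b'{"pyrmq": "hi"}')
    print(channel.acked)
```

In this sketch the ack happens only after the callback returns, so a callback that raises leaves the message unacknowledged.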

Retries

PyRMQ’s Consumer retries happen on two levels: connecting and consuming.

Connecting

PyRMQ instantiates a BlockingConnection when connecting. If this fails, it retries 2 more times by default, with a delay of 5 seconds, a backoff base of 2 seconds, and a backoff constant of 5 seconds. All of these settings are configurable via the Consumer class.

DLX-DLK Consumption Retry Logic

PyRMQ calls pika’s start_consuming when the Consumer starts. If your consumption callback throws an exception, PyRMQ uses dead letter exchanges and queues to republish the message to your original queue once it has expired in the retry queue. PyRMQ creates this “retry” queue for you, named by default by appending .retry to your original queue name. To enable this behavior, set the is_dlk_retry_enabled flag on the Consumer class to True.

from pyrmq import Consumer

def callback(data):
    print(f"Received {data}!")
    raise Exception

consumer = Consumer(
    exchange_name="exchange_name",
    queue_name="queue_name",
    routing_key="routing_key",
    callback=callback,
    is_dlk_retry_enabled=True,
)
consumer.start()

This starts a loop that passes your message between the original queue and the retry queue until it reaches the default number of max_retries.
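The back-and-forth between the two queues can be modelled in memory. The sketch below is not PyRMQ code and involves no broker: simulate_dlk_retries, its parameters, and the hop labels are hypothetical, and the expiry wait in the retry queue is skipped entirely. It only shows the control flow of bouncing a failing message until a retry limit is hit and then invoking an error callback.

```python
from collections import deque


def simulate_dlk_retries(message, callback, max_retries, error_callback):
    """Bounce a message between an original and a retry queue (in-memory model)."""
    original, retry = deque([(message, 0)]), deque()
    hops = []
    while original:
        body, attempts = original.popleft()
        try:
            callback(body)
            hops.append("consumed")
            return hops
        except Exception as error:
            if attempts >= max_retries:
                # Retry limit reached: report the error and stop.
                error_callback(error)
                hops.append("gave up")
                return hops
            # On a broker the message would wait in the retry queue until it expires...
            retry.append((body, attempts + 1))
            hops.append("original -> retry")
            # ...and would then be dead-lettered back to the original queue.
            original.append(retry.popleft())
            hops.append("retry -> original")
    return hops
```

With a callback that always raises and max_retries set to 2, the callback runs three times in total: the first delivery plus two retries.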

Max retries reached

When PyRMQ has exhausted its retries, it calls the callback you specified.