API Documentation

Publisher Class

class pyrmq.Publisher(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)

This class wraps pika’s BlockingConnection and automatically handles queue declaration and binding, with built-in retry logic for connecting and publishing.

__create_connection() → pika.adapters.blocking_connection.BlockingConnection

Creates pika’s BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)
Parameters
  • exchange_name – Your exchange name.

  • queue_name – Your queue name.

  • routing_key – Your routing key.

  • exchange_type – Exchange type to declare. Default: "direct"

  • host – Your RabbitMQ host. Checks env var RABBITMQ_HOST. Default: "localhost"

  • port – Your RabbitMQ port. Checks env var RABBITMQ_PORT. Default: 5672

  • username – Your RabbitMQ username. Default: "guest"

  • password – Your RabbitMQ password. Default: "guest"

  • connection_attempts – How many times should PyRMQ try? Default: 3

  • retry_delay – Seconds between retries. Default: 5

  • error_callback – Callback function to be called when connection_attempts is reached.

  • infinite_retry – Tells PyRMQ to keep on retrying to publish while firing error_callback, if any. Default: False

  • exchange_args – Your exchange arguments. Default: None

  • queue_args – Your queue arguments. Default: None
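
As a minimal sketch of how the parameters above fit together, the following collects them into keyword arguments and passes them to Publisher. The exchange, queue, and routing-key names are hypothetical, and reading RABBITMQ_HOST and RABBITMQ_PORT by hand only mirrors the documented defaults; it is not necessarily how PyRMQ resolves them internally.

```python
import os


def publisher_kwargs() -> dict:
    """Collect keyword arguments documented for Publisher.__init__."""
    return {
        "exchange_name": "orders",        # hypothetical name
        "queue_name": "orders.created",   # hypothetical name
        "routing_key": "orders.created",  # hypothetical name
        "exchange_type": "direct",
        # Mirrors the documented env vars and defaults (assumption: done by hand here).
        "host": os.environ.get("RABBITMQ_HOST", "localhost"),
        "port": int(os.environ.get("RABBITMQ_PORT", "5672")),
        "connection_attempts": 3,
        "retry_delay": 5,
    }


def build_publisher():
    """Create a Publisher; needs pyrmq installed and, when used, a reachable broker."""
    # Imported lazily so the helper above can be loaded without pyrmq.
    from pyrmq import Publisher

    return Publisher(**publisher_kwargs())
```

Keeping the settings in a plain dict makes them easy to inspect or override in tests before any connection is attempted.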

__send_reconnection_error_message(error, retry_count) → None

Sends an error message to your preferred location.
Parameters
  • error – Error that prevented the Publisher from sending the message.

  • retry_count – Number of retries the Publisher attempted before sending an error message.

__weakref__

list of weak references to the object (if defined)

connect(retry_count=1) → pika.adapters.blocking_connection.BlockingChannel

Creates pika’s BlockingConnection and initializes queue bindings.
Parameters
  • retry_count – Number of retries the Publisher attempted before sending an error message.
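
The interplay of connection_attempts, retry_delay, error_callback, and infinite_retry can be pictured with a small stand-alone loop. This is an illustrative sketch, not PyRMQ's source: call_with_retry, the operation argument, the injected sleep function, and the error_callback signature are all assumptions made for the example.

```python
import time


def call_with_retry(
    operation,
    connection_attempts=3,
    retry_delay=5,
    error_callback=None,
    infinite_retry=False,
    sleep=time.sleep,
):
    """Run operation(), retrying on ConnectionError (hypothetical helper)."""
    retry_count = 1
    while True:
        try:
            return operation()
        except ConnectionError as error:
            if retry_count >= connection_attempts:
                # The attempt limit has been reached: notify the caller, if asked to.
                if error_callback is not None:
                    error_callback(str(error), error=error, error_type="CONNECT_ERROR")
                # Without infinite_retry the failure is surfaced to the caller.
                if not infinite_retry:
                    raise
            sleep(retry_delay)
            retry_count += 1
```

Injecting sleep keeps the sketch testable without real delays; production code would normally leave the default.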

declare_queue(channel) → None

Declares the queue and binds it to the exchange through the given channel.
Parameters
  • channel – pika Channel

publish(data: dict, priority: Optional[int] = None, message_properties: Optional[dict] = None, attempt: int = 0, retry_count: int = 1) → None

Publishes data to RabbitMQ.
Parameters
  • data – Data to be published.

  • priority – Message priority. Only works if x-max-priority is defined as a queue argument.

  • message_properties – Message properties. Default: {"delivery_mode": 2}

  • attempt – Number of attempts made.

  • retry_count – Number of retries the Publisher attempted before sending an error message.
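
Because priority only takes effect when the queue was declared with x-max-priority, a priority publish involves two pieces: the queue argument and the publish call. The sketch below shows both under stated assumptions: the maximum of 10 is arbitrary, publish_urgent is a hypothetical helper, and clamping the value is a choice made by this example rather than documented PyRMQ behavior.

```python
# Queue arguments to pass as queue_args when creating the Publisher (10 is arbitrary).
PRIORITY_QUEUE_ARGS = {"x-max-priority": 10}


def publish_urgent(publisher, payload: dict, priority: int = 5) -> int:
    """Publish payload with a priority clamped to the queue's maximum."""
    bounded = max(0, min(priority, PRIORITY_QUEUE_ARGS["x-max-priority"]))
    # Matches the documented signature: publish(data, priority=...).
    publisher.publish(payload, priority=bounded)
    return bounded
```

Any object with a compatible publish method can be passed in, which makes the helper easy to exercise without a broker.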

Consumer Class

class pyrmq.Consumer(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)

This class wraps pika’s BlockingConnection and automatically handles queue declaration and binding, with built-in retry logic for connecting and consuming. It starts its own thread upon initialization and runs pika’s start_consuming().

__create_connection() → pika.adapters.blocking_connection.BlockingConnection

Creates a pika BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)
Parameters
  • exchange_name – Your exchange name.

  • queue_name – Your queue name.

  • routing_key – Your routing key.

  • callback – Your callback that should handle a consumed message.

  • host – Your RabbitMQ host. Default: "localhost"

  • port – Your RabbitMQ port. Default: 5672

  • username – Your RabbitMQ username. Default: "guest"

  • password – Your RabbitMQ password. Default: "guest"

  • connection_attempts – How many times should PyRMQ try? Default: 3

  • is_dlk_retry_enabled – Flag to enable DLK-based retry logic of consumed messages. Default: False

  • retry_delay – Seconds between retries. Default: 5

  • retry_backoff_base – Exponential backoff base in seconds. Default: 2

  • retry_queue_suffix – The suffix that will be appended to the queue_name to act as the name of the retry_queue. Default: "retry"

  • max_retries – Number of maximum retries for DLK retry logic. Default: 20

  • exchange_args – Your exchange arguments. Default: None

  • queue_args – Your queue arguments. Default: None

  • bound_exchange – The exchange this consumer needs to bind to. This is an object that has two keys, name and type. Default: None

  • auto_ack – Flag whether to ack or nack the consumed message regardless of its outcome. Default: True

  • prefetch_count – How many messages should the consumer retrieve at a time for consumption. Default: 1
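
A sketch of a Consumer setup using the parameters above might look like the following. All names are hypothetical, and the callback is assumed to receive the consumed payload as its first argument; the exact arguments PyRMQ passes to the callback are not spelled out in this reference, so the example tolerates extra keyword arguments.

```python
received = []


def handle_message(data, **_extra):
    """Hypothetical callback: record the consumed payload."""
    # **_extra tolerates additional keyword arguments, in case any are passed (assumption).
    received.append(data)


def consumer_kwargs() -> dict:
    """Collect keyword arguments documented for Consumer.__init__."""
    return {
        "exchange_name": "orders",        # hypothetical name
        "queue_name": "orders.created",   # hypothetical name
        "routing_key": "orders.created",  # hypothetical name
        "callback": handle_message,
        "prefetch_count": 10,
        # Two keys, name and type, as described for bound_exchange.
        "bound_exchange": {"name": "orders.events", "type": "fanout"},
    }


def build_consumer():
    """Create a Consumer; needs pyrmq installed and a reachable broker."""
    # Imported lazily so the helpers above can be loaded without pyrmq.
    from pyrmq import Consumer

    return Consumer(**consumer_kwargs())
```

Since the callback is a plain function, it can be unit-tested by calling it directly with a sample payload.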

__run_error_callback(message: str, error: Exception, error_type: str) → None

Logs the error message.
Parameters
  • message – Message to be logged in error_callback.

  • error – Error encountered in consuming the message.

  • error_type – Type of error (CONNECT_ERROR or CONSUME_ERROR).

__send_consume_error_message(error: Exception, retry_count: int = 1) → None

Sends an error message to your preferred location.
Parameters
  • error – Error that prevented the Consumer from processing the message.

  • retry_count – Number of retries the Consumer attempted before sending an error message.

__send_reconnection_error_message(error: Union[pika.exceptions.AMQPConnectionError, ConnectionResetError, pika.exceptions.ChannelClosedByBroker], retry_count: int) → None

Sends an error message to your preferred location.
Parameters
  • error – Error that prevented the Consumer from processing the message.

  • retry_count – Number of retries the Consumer attempted before sending an error message.

__weakref__

list of weak references to the object (if defined)

_compute_expiration(retry_count: int) → int

Computes message expiration time from the retry queue in seconds.
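
To make the idea of an exponentially growing expiration concrete, here is one common backoff shape built from the documented defaults (retry_delay of 5, retry_backoff_base of 2). The formula is an assumption chosen for illustration; this reference does not state the exact expression PyRMQ uses, and compute_expiration is a hypothetical stand-alone function.

```python
def compute_expiration(retry_count: int, retry_delay: int = 5, retry_backoff_base: int = 2) -> int:
    """Return a delay in seconds that grows geometrically with retry_count.

    Assumed formula: retry_delay * retry_backoff_base ** (retry_count - 1).
    """
    if retry_count < 1:
        raise ValueError("retry_count starts at 1")
    return retry_delay * retry_backoff_base ** (retry_count - 1)


# Delays for the first four retries under the assumed formula.
schedule = [compute_expiration(n) for n in range(1, 5)]
```

Under this assumed formula the first retry waits retry_delay seconds and each later retry multiplies the wait by retry_backoff_base.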

_consume_message(channel, method, properties, data: dict) → None

Wraps the user-provided callback, gracefully handles its errors, and calls pika’s basic_ack on success.
Parameters
  • channel – pika’s Channel on which this message was received.

  • method – pika’s basic Return

  • properties – pika’s BasicProperties

  • data – Data received in bytes.
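
The wrap-then-acknowledge flow described above can be sketched without pika. In this illustration consume_message, ack, and on_error are hypothetical stand-ins (ack plays the role of pika’s basic_ack), and JSON decoding of the body is an assumption about the payload format.

```python
import json


def consume_message(callback, body: bytes, ack, on_error) -> None:
    """Decode a message, run the callback, then ack or report the failure."""
    data = json.loads(body)  # assumption: payloads are JSON-encoded
    try:
        callback(data)
    except Exception as error:  # broad on purpose: any callback failure is reported
        on_error(error)
    else:
        ack()  # reached only when the callback finished without raising
```

Acknowledging in the else branch ensures a message is acked only after the callback completes, so a failing callback never produces an ack in this sketch.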

_publish_to_retry_queue(data: dict, properties, retry_reason: Exception) → None

Publishes message to retry queue with the appropriate metadata in the headers.

close() → None

Manually closes a connection to RabbitMQ. Useful for debugging and tests.

connect(retry_count=1) → None

Creates a BlockingConnection from pika and initializes queue bindings.
Parameters
  • retry_count – Number of retries the Consumer attempted before sending an error message.

consume(retry_count=1) → None

Wraps pika’s basic_consume() and start_consuming() with retry logic.

declare_queue() → None

Declares the queue and binds it to the exchange.