API Documentation

Publisher Class

class pyrmq.Publisher(exchange_name: str, queue_name: str, routing_key: str, **kwargs)

This class wraps pika’s BlockingConnection and automatically handles queue declaration and binding, with built-in retry logic for connecting and publishing.

__create_connection() → pika.adapters.blocking_connection.BlockingConnection

Creates pika’s BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: str, routing_key: str, **kwargs)
Parameters
  • exchange_name – Your exchange name.

  • queue_name – Your queue name.

  • routing_key – Your routing key.

  • host – Your RabbitMQ host. Checks env var RABBITMQ_HOST. Default: "localhost"

  • port – Your RabbitMQ port. Checks env var RABBITMQ_PORT. Default: 5672

  • username – Your RabbitMQ username. Default: "guest"

  • password – Your RabbitMQ password. Default: "guest"

  • connection_attempts – How many times should PyRMQ try? Default: 3

  • retry_delay – Seconds between retries. Default: 5

  • error_callback – Callback function to be called when connection_attempts is reached.

  • infinite_retry – Tells PyRMQ to keep on retrying to publish while firing error_callback, if any. Default: False

  • queue_args – Your queue arguments. Default: None
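As a rough illustration of the parameters above, the sketch below shows how the host and port defaults might be resolved and how a Publisher could be constructed. The resolution order (explicit keyword argument, then environment variable, then documented default) is an assumption drawn from the parameter descriptions, and resolve_connection_settings and make_publisher are hypothetical helpers, not part of PyRMQ.

```python
import os
from typing import Any, Dict


def resolve_connection_settings(**kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper that mirrors the documented defaults.

    Assumed order: explicit keyword argument, then environment
    variable (host and port only), then the documented default.
    """
    return {
        "host": kwargs.get("host") or os.getenv("RABBITMQ_HOST") or "localhost",
        "port": int(kwargs.get("port") or os.getenv("RABBITMQ_PORT") or 5672),
        "username": kwargs.get("username", "guest"),
        "password": kwargs.get("password", "guest"),
        "connection_attempts": kwargs.get("connection_attempts", 3),
        "retry_delay": kwargs.get("retry_delay", 5),
    }


def make_publisher(**overrides: Any):
    """Build a Publisher with placeholder names; requires pyrmq to be installed."""
    # Imported lazily so the helper above can be used without pyrmq.
    from pyrmq import Publisher

    return Publisher(
        exchange_name="exchange_name",
        queue_name="queue_name",
        routing_key="routing_key",
        **overrides,
    )
```

Calling make_publisher(host="rabbit.internal", connection_attempts=5) would pass those values through as keyword arguments; the exchange, queue, and routing key names here are placeholders.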

__send_reconnection_error_message(retry_count, error) → None

Sends an error message to your preferred location.
Parameters
  • retry_count – Number of retries the Publisher made before sending an error message.

  • error – Error that prevented the Publisher from sending the message.

__weakref__

list of weak references to the object (if defined)

connect(retry_count=1) → (pika.adapters.blocking_connection.BlockingConnection, pika.adapters.blocking_connection.BlockingChannel)

Creates pika’s BlockingConnection and initializes queue bindings.
Parameters
  • retry_count – Number of retries the Publisher made before sending an error message.
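The interplay of connection_attempts, retry_delay, and error_callback can be pictured with a generic retry loop. This is a minimal sketch of the general technique, not PyRMQ's actual implementation: connect_with_retry is a hypothetical function, the single-argument error_callback signature is an assumption, and the injectable sleep parameter exists only to make the loop easy to test.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_retry(
    create_connection: Callable[[], T],
    connection_attempts: int = 3,
    retry_delay: float = 5.0,
    error_callback: Optional[Callable[[Exception], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call create_connection until it succeeds or attempts run out."""
    retry_count = 1
    while True:
        try:
            return create_connection()
        except ConnectionError as error:
            if retry_count >= connection_attempts:
                # Out of attempts: notify the caller, then give up.
                if error_callback is not None:
                    error_callback(error)
                raise
            # Wait a fixed delay before the next attempt.
            sleep(retry_delay)
            retry_count += 1
```

With the defaults, a connection that keeps failing is tried three times with a five-second pause between attempts before the error is reported and re-raised.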

declare_queue(channel) → None

Declares a queue and binds it through the given channel.
Parameters
  • channel – pika Channel
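For readers unfamiliar with what declaring and binding involves at the pika level, the sketch below shows the three channel calls usually needed. It is an illustration under stated assumptions, not PyRMQ's code: declare_and_bind is a hypothetical function, and the durable direct exchange and durable queue are assumed settings that this documentation does not confirm.

```python
from typing import Any, Dict, Optional


def declare_and_bind(
    channel: Any,
    exchange_name: str,
    queue_name: str,
    routing_key: str,
    queue_args: Optional[Dict[str, Any]] = None,
) -> None:
    """Declare an exchange and a queue, then bind them with a routing key.

    channel is expected to be a pika BlockingChannel (or any object
    exposing the same three methods).
    """
    # Assumption: a durable direct exchange.
    channel.exchange_declare(
        exchange=exchange_name, exchange_type="direct", durable=True
    )
    # queue_args are passed through as the queue's arguments,
    # e.g. {"x-max-priority": 5}.
    channel.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
    # Route messages published with routing_key to the queue.
    channel.queue_bind(
        queue=queue_name, exchange=exchange_name, routing_key=routing_key
    )
```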

publish(data: dict, priority: Optional[int] = None, attempt=0, retry_count=1) → None

Publishes data to RabbitMQ.
Parameters
  • data – Data to be published.

  • priority – Message priority. Only works if x-max-priority is defined as a queue argument.

  • attempt – Number of attempts made.

  • retry_count – Number of retries the Publisher made before sending an error message.
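Because priority only takes effect when the queue was declared with x-max-priority, the two settings are easiest to understand together. The following sketch assumes a maximum priority of 5 and placeholder exchange, queue, and routing key names; make_priority_publisher and publish_all are hypothetical helpers written for this example.

```python
from typing import Any, Dict, Iterable, Tuple

# x-max-priority is the RabbitMQ queue argument that enables message
# priorities; the value 5 is an arbitrary choice for this example.
PRIORITY_QUEUE_ARGS: Dict[str, Any] = {"x-max-priority": 5}


def make_priority_publisher():
    """Build a Publisher whose queue accepts priorities (requires pyrmq)."""
    # Imported lazily so publish_all can be exercised without pyrmq.
    from pyrmq import Publisher

    return Publisher(
        exchange_name="exchange_name",
        queue_name="queue_name",
        routing_key="routing_key",
        queue_args=PRIORITY_QUEUE_ARGS,
    )


def publish_all(publisher: Any, messages: Iterable[Tuple[dict, int]]) -> int:
    """Publish (data, priority) pairs and return how many were sent."""
    sent = 0
    for data, priority in messages:
        publisher.publish(data, priority=priority)
        sent += 1
    return sent
```

With a broker running, publish_all(make_priority_publisher(), [({"pyrmq": "urgent"}, 5), ({"pyrmq": "routine"}, 1)]) would send two messages, the first with the higher priority.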

Consumer Class

class pyrmq.Consumer(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, **kwargs)

This class wraps pika’s BlockingConnection and automatically handles queue declaration and binding, with built-in retry logic for connecting and consuming. It starts its own thread upon initialization and runs pika’s start_consuming().

__create_connection() → pika.adapters.blocking_connection.BlockingConnection

Creates a pika BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, **kwargs)
Parameters
  • exchange_name – Your exchange name.

  • queue_name – Your queue name.

  • routing_key – Your routing key.

  • callback – Your callback that should handle a consumed message.

  • host – Your RabbitMQ host. Default: "localhost"

  • port – Your RabbitMQ port. Default: 5672

  • username – Your RabbitMQ username. Default: "guest"

  • password – Your RabbitMQ password. Default: "guest"

  • connection_attempts – How many times should PyRMQ try? Default: 3

  • retry_delay – Seconds between retries. Default: 5

  • retry_backoff_base – Exponential backoff base in seconds. Default: 2

  • retry_backoff_constant_secs – Exponential backoff constant in seconds. Default: 5
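The parameter list does not spell out how retry_backoff_base and retry_backoff_constant_secs combine. One plausible reading, shown here purely as an assumption, is an exponential term plus a constant offset. backoff_seconds is a hypothetical function, and the formula should be checked against the PyRMQ source before being relied on.

```python
def backoff_seconds(
    retry_count: int, base: float = 2, constant_secs: float = 5
) -> float:
    """Assumed formula: base ** retry_count + constant_secs."""
    return base ** retry_count + constant_secs


# Delays for the first three retries under the documented defaults.
delays = [backoff_seconds(n) for n in (1, 2, 3)]  # [7, 9, 13]
```

Under this reading the constant sets a floor on the wait, while the base controls how quickly the wait grows with each retry.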

__send_reconnection_error_message(retry_count, error) → None

Sends an error message to your preferred location.
Parameters
  • retry_count – Number of retries the Consumer made before sending an error message.

  • error – Error that prevented the Consumer from consuming the message.

__weakref__

list of weak references to the object (if defined)

_consume_message(channel, method, properties, data) → None

Wraps the user-provided callback, gracefully handles its errors, and calls pika’s basic_ack once successful.
Parameters
  • channel – pika’s Channel on which this message was received.

  • method – pika’s basic Return

  • properties – pika’s BasicProperties

  • data – Data received in bytes.
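The wrap-then-acknowledge pattern described above can be sketched as follows. This is an illustration only: consume_message is a hypothetical standalone function, the JSON decoding assumes the payload was published from a dict, and leaving a failed message unacknowledged is an assumed choice, since this documentation does not say what PyRMQ does on failure.

```python
import json
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def consume_message(
    callback: Callable[[Any], None],
    channel: Any,
    method: Any,
    properties: Any,
    data: bytes,
) -> None:
    """Run the user callback and acknowledge the message on success."""
    try:
        # Assumption: the body is JSON, matching publish(data: dict).
        callback(json.loads(data))
    except Exception:
        # Assumed behavior: log the failure and skip the acknowledgement.
        logger.exception("Callback failed; message left unacknowledged")
        return
    # basic_ack tells RabbitMQ the message was handled successfully.
    channel.basic_ack(delivery_tag=method.delivery_tag)
```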

close() → None

Manually closes a connection to RabbitMQ. Useful for debugging and tests.

connect(retry_count=1) → None

Creates a BlockingConnection from pika and initializes queue bindings.
Parameters
  • retry_count – Number of retries the Consumer made before sending an error message.

consume(retry_count=1) → None

Wraps pika’s basic_consume() and start_consuming() with retry logic.
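Putting the Consumer pieces together, a minimal usage sketch might look like the one below. It follows the class description in assuming that consumption begins when the Consumer is constructed; handle_message and start_consumer are hypothetical names, the callback is assumed to receive the decoded message body, and the exchange, queue, and routing key names are placeholders.

```python
from typing import Any, List

# Messages collected by the callback, kept in memory for illustration.
received: List[Any] = []


def handle_message(data: Any) -> None:
    """User callback; assumed to receive the decoded message body."""
    received.append(data)


def start_consumer():
    """Create a Consumer (requires pyrmq and a reachable RabbitMQ broker)."""
    # Imported lazily so handle_message can be tested without pyrmq.
    from pyrmq import Consumer

    return Consumer(
        exchange_name="exchange_name",
        queue_name="queue_name",
        routing_key="routing_key",
        callback=handle_message,
    )
```

Keeping the returned object around lets the caller invoke close() when finished, which is particularly handy in tests.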