API Documentation

Publisher Class

class pyrmq.Publisher(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)

This class wraps pika’s BlockingConnection. It automatically handles queue declarations and bindings, and adds retry logic for connecting and publishing.

__create_connection() → BlockingConnection

Create pika’s BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)
Parameters
  • exchange_name – Your exchange name.

  • queue_name – Your queue name.

  • routing_key – Your routing key.

  • exchange_type – Exchange type to declare. Default: "direct"

  • host – Your RabbitMQ host. Checks env var RABBITMQ_HOST. Default: "localhost"

  • port – Your RabbitMQ port. Checks env var RABBITMQ_PORT. Default: 5672

  • username – Your RabbitMQ username. Default: "guest"

  • password – Your RabbitMQ password. Default: "guest"

  • connection_attempts – How many times PyRMQ should try to connect. Default: 3

  • retry_delay – Seconds between connection retries. Default: 5

  • error_callback – Callback function to be called when connection_attempts is reached.

  • infinite_retry – Tells PyRMQ to keep on retrying to publish while firing error_callback, if any. Default: False

  • exchange_args – Your exchange arguments. Default: None

  • queue_args – Your queue arguments. Default: None
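
The parameters above are passed as keyword arguments. The following is a minimal sketch of collecting them in one place. The exchange, queue, and routing key names, and the helpers `build_publisher_kwargs` and `make_publisher`, are hypothetical and not part of PyRMQ. The import is deferred so the settings can be inspected without PyRMQ installed.

```python
from typing import Any, Dict, Optional


def build_publisher_kwargs(
    host: Optional[str] = None, port: Optional[int] = None
) -> Dict[str, Any]:
    """Collect keyword arguments for pyrmq.Publisher (names are illustrative)."""
    kwargs: Dict[str, Any] = {
        "exchange_name": "orders_exchange",  # hypothetical exchange
        "queue_name": "orders_queue",        # hypothetical queue
        "routing_key": "orders",             # hypothetical routing key
        "exchange_type": "direct",           # documented default
        "connection_attempts": 3,            # documented default
        "retry_delay": 5,                    # documented default, in seconds
        "infinite_retry": False,             # documented default
    }
    # Leave host and port out unless set explicitly, so the documented
    # RABBITMQ_HOST / RABBITMQ_PORT lookup and defaults still apply.
    if host is not None:
        kwargs["host"] = host
    if port is not None:
        kwargs["port"] = port
    return kwargs


def make_publisher(**overrides: Any):
    """Create the Publisher; requires PyRMQ to be installed."""
    from pyrmq import Publisher  # deferred import

    return Publisher(**{**build_publisher_kwargs(), **overrides})
```

Calling `make_publisher(host="rabbit.internal")` would override only the host and keep the remaining settings.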

__send_reconnection_error_message(error, retry_count) → None

Send error message to your preferred location.
Parameters
  • error – Error that prevented the Publisher from sending the message.

  • retry_count – Number of retries the Publisher attempted before sending an error message.

connect(retry_count=1) → BlockingChannel

Create pika’s BlockingConnection and initialize queue bindings.
Parameters
  • retry_count – Number of retries the Publisher attempted before sending an error message.
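
How connection_attempts, retry_delay, error_callback, and infinite_retry can interact is easier to see in code. The sketch below is a generic retry loop and is not PyRMQ’s implementation. The function `connect_with_retry`, its `open_connection` and `sleep` arguments, and the choice to fire the callback once every connection_attempts failures are all assumptions made for illustration.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_retry(
    open_connection: Callable[[], T],
    connection_attempts: int = 3,
    retry_delay: float = 5,
    error_callback: Optional[Callable[[str], None]] = None,
    infinite_retry: bool = False,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call open_connection until it succeeds or the attempts run out."""
    retry_count = 1
    while True:
        try:
            return open_connection()
        except ConnectionError as error:
            # Assumption: report once per batch of connection_attempts failures.
            if retry_count % connection_attempts == 0:
                if error_callback is not None:
                    error_callback(
                        f"Could not connect after {retry_count} tries: {error}"
                    )
                if not infinite_retry:
                    raise
            sleep(retry_delay)
            retry_count += 1
```

Injecting `sleep` keeps the loop testable without waiting for the real delay.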

declare_queue(channel) → None

Declare the queue on the given channel and bind it to the exchange.
Parameters
  • channel – pika Channel

publish(data: dict, priority: Optional[int] = None, message_properties: Optional[dict] = None, attempt: int = 0, retry_count: int = 1) → None

Publish data to RabbitMQ.
Parameters
  • data – Data to be published.

  • priority – Message priority. Only works if x-max-priority is defined as a queue argument.

  • message_properties – Message properties. Default: {"delivery_mode": 2}

  • attempt – Number of attempts made.

  • retry_count – Number of retries the Publisher attempted before sending an error message.
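
Because priority only takes effect when x-max-priority is set on the queue, it can help to keep the two together. The following is a minimal sketch under that assumption. `MAX_PRIORITY`, `QUEUE_ARGS`, and `publish_order` are hypothetical names, and the `publisher` argument is expected to be a pyrmq.Publisher created with `queue_args=QUEUE_ARGS`.

```python
from typing import Any, Dict, Optional

MAX_PRIORITY = 10  # illustrative upper bound
QUEUE_ARGS = {"x-max-priority": MAX_PRIORITY}  # pass as queue_args= to Publisher


def publish_order(
    publisher: Any, data: Dict[str, Any], priority: Optional[int] = None
) -> None:
    """Publish data, rejecting priorities the queue was not declared for."""
    if priority is not None and not 0 <= priority <= MAX_PRIORITY:
        raise ValueError(f"priority must be between 0 and {MAX_PRIORITY}")
    # publish() is the documented method; message_properties is left at its default.
    publisher.publish(data, priority=priority)
```

Validating before the call keeps an out-of-range priority from reaching the broker.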

Consumer Class

class pyrmq.Consumer(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)

This class wraps pika’s BlockingConnection. It automatically handles queue declarations and bindings, and adds retry logic for connecting and consuming. It starts its own thread upon initialization and runs pika’s start_consuming().

__create_connection() → BlockingConnection

Create pika’s BlockingConnection from the given connection parameters.

__init__(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)
Parameters
  • exchange_name – Your exchange name.

  • queue_name – Your queue name.

  • routing_key – Your routing key.

  • callback – Your callback that should handle a consumed message.

  • host – Your RabbitMQ host. Default: "localhost"

  • port – Your RabbitMQ port. Default: 5672

  • username – Your RabbitMQ username. Default: "guest"

  • password – Your RabbitMQ password. Default: "guest"

  • connection_attempts – How many times PyRMQ should try to connect. Default: 3

  • is_dlk_retry_enabled – Flag to enable DLK-based retry logic of consumed messages. Default: False

  • retry_delay – Seconds between connection retries. Default: 5

  • retry_interval – Seconds between consumption retries. Default: 900

  • retry_queue_suffix – The suffix that will be appended to the queue_name to act as the name of the retry_queue. Default: "retry"

  • max_retries – Number of maximum retries for DLK retry logic. Default: 20

  • exchange_args – Your exchange arguments. Default: None

  • queue_args – Your queue arguments. Default: None

  • bound_exchange – The exchange this consumer needs to bind to. This is an object that has two keys, name and type. Default: None

  • auto_ack – Flag whether to ack or nack the consumed message regardless of its outcome. Default: True

  • prefetch_count – How many messages should the consumer retrieve at a time for consumption. Default: 1
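
As a rough illustration of how these parameters fit together, the sketch below prepares a callback and a settings dict for a Consumer with DLK retry enabled. All exchange and queue names, `handle_message`, `CONSUMER_KWARGS`, and `make_consumer` are hypothetical. The callback is assumed to receive the consumed message as its first argument, and it accepts extra keyword arguments in case more are passed.

```python
from typing import Any, Dict, List

received: List[Dict[str, Any]] = []


def handle_message(data: Dict[str, Any], **kwargs: Any) -> None:
    """Hypothetical callback; extra keyword arguments are accepted and ignored."""
    received.append(data)


CONSUMER_KWARGS: Dict[str, Any] = {
    "exchange_name": "orders_exchange",  # hypothetical exchange
    "queue_name": "orders_queue",        # hypothetical queue
    "routing_key": "orders",             # hypothetical routing key
    "callback": handle_message,
    "is_dlk_retry_enabled": True,        # turn on DLK-based retries
    "retry_interval": 900,               # documented default, in seconds
    "max_retries": 20,                   # documented default
    "prefetch_count": 1,                 # documented default
    # Two keys, name and type, as described for bound_exchange.
    "bound_exchange": {"name": "upstream_exchange", "type": "fanout"},
}


def make_consumer():
    """Create the Consumer; requires PyRMQ and a reachable RabbitMQ broker."""
    from pyrmq import Consumer  # deferred import

    return Consumer(**CONSUMER_KWARGS)
```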

__run_error_callback(message: str, error: Exception, error_type: str) → None

Log error message.
Parameters
  • message – Message to be logged in error_callback.

  • error – Error encountered in consuming the message.

  • error_type – Type of error (CONNECT_ERROR or CONSUME_ERROR).

__send_consume_error_message(error: Exception, retry_count: int = 1) → None

Send error message to your preferred location.
Parameters
  • error – Error that prevented the Consumer from processing the message.

  • retry_count – Number of retries the Consumer attempted before sending an error message.

__send_reconnection_error_message(error: Union[AMQPConnectionError, ConnectionResetError, ChannelClosedByBroker], retry_count: int) → None

Send error message to your preferred location.
Parameters
  • error – Error that prevented the Consumer from processing the message.

  • retry_count – Number of retries the Consumer attempted before sending an error message.

_consume_message(channel, method, properties, data: dict) → None

Wrap the user-provided callback, gracefully handle its errors, and call pika’s basic_ack once successful.
Parameters
  • channel – pika’s Channel on which this message was received.

  • method – pika’s basic Return

  • properties – pika’s BasicProperties

  • data – Data received in bytes.
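
The wrapping described here can be pictured as follows. This is a simplified sketch and not PyRMQ’s actual code. The function name `consume_message`, the JSON decoding of the body, and returning the error to the caller are assumptions. Only `basic_ack(delivery_tag=...)` is a real pika channel method.

```python
import json
from typing import Any, Callable, Optional


def consume_message(
    callback: Callable[[Any], None],
    channel: Any,
    method: Any,
    properties: Any,
    data: Any,
) -> Optional[Exception]:
    """Run callback on the decoded body and ack only when it succeeds."""
    if isinstance(data, bytes):
        # Assumption: message bodies are JSON documents.
        data = json.loads(data.decode("utf-8"))
    try:
        callback(data)
    except Exception as error:
        # Hand the error back so a retry or error path can deal with it.
        return error
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return None
```

Returning the exception instead of raising keeps a failing callback from stopping the consuming loop in this sketch.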

_publish_to_retry_queue(data: dict, properties, retry_reason: Exception) → None

Publish message to retry queue with the appropriate metadata in the headers.

close() → None

Manually close a connection to RabbitMQ. This is useful for debugging and tests.

connect(retry_count=1) → None

Create pika’s BlockingConnection and initialize queue bindings.
Parameters
  • retry_count – Number of retries the Consumer attempted before sending an error message.

consume(retry_count=1) → None

Wrap pika’s basic_consume() and start_consuming() with retry logic.

declare_queue() → None

Declare the queue and bind it to the exchange.