API Documentation¶
Publisher Class¶
-
class
pyrmq.
Publisher
(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)¶ This class offers a
BlockingConnection
from pika that automatically handles queue declares and bindings plus retry logic built for its connection and publishing.-
__create_connection
() → pika.adapters.blocking_connection.BlockingConnection¶ Creates pika’s
BlockingConnection
from the given connection parameters.
-
__init__
(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)¶ - Parameters
exchange_name – Your exchange name.
queue_name – Your queue name.
routing_key – Your queue name.
exchange_type – Exchange type to declare. Default:
"direct"
host – Your RabbitMQ host. Checks env var
RABBITMQ_HOST
. Default:"localhost"
port – Your RabbitMQ port. Checks env var
RABBITMQ_PORT
. Default:5672
username – Your RabbitMQ username. Default:
"guest"
password – Your RabbitMQ password. Default:
"guest"
connection_attempts – How many times should PyRMQ try?. Default:
3
retry_delay – Seconds between retries. Default:
5
error_callback – Callback function to be called when connection_attempts is reached.
infinite_retry – Tells PyRMQ to keep on retrying to publish while firing error_callback, if any. Default:
False
queue_args – Your queue arguments. Default:
None
-
__send_reconnection_error_message
(retry_count, error) → None¶ Send error message to your preferred location. :param retry_count: Amount retries the Publisher tried before sending an error message. :param error: Error that prevented the Publisher from sending the message.
-
__weakref__
¶ list of weak references to the object (if defined)
-
connect
(retry_count=1) → pika.adapters.blocking_connection.BlockingChannel¶ Creates pika’s
BlockingConnection
and initializes queue bindings. :param retry_count: Amount retries the Publisher tried before sending an error message.
-
declare_queue
(channel) → None¶ Declare and a bind a channel to a queue. :param channel: pika Channel
-
publish
(data: dict, priority: Optional[int] = None, message_properties: Optional[dict] = None, attempt: int = 0, retry_count: int = 1) → None¶ Publishes data to RabbitMQ. :param data: Data to be published. :param priority: Message priority. Only works if
x-max-priority
is defined as queue argument. :param message_properties: Message properties. Default:{"delivery_mode": 2}
:param attempt: Number of attempts made. :param retry_count: Amount retries the Publisher tried before sending an error message.
-
Consumer Class¶
-
class
pyrmq.
Consumer
(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)¶ This class uses a
BlockingConnection
from pika that automatically handles queue declares and bindings plus retry logic built for its connection and consumption. It starts its own thread upon initialization and runs pika’sstart_consuming()
.-
__create_connection
() → pika.adapters.blocking_connection.BlockingConnection¶ Creates a pika BlockingConnection from the given connection parameters.
-
__init__
(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)¶ - Parameters
exchange_name – Your exchange name.
queue_name – Your queue name.
routing_key – Your queue name.
callback – Your callback that should handle a consumed message
host – Your RabbitMQ host. Default:
"localhost"
port – Your RabbitMQ port. Default:
5672
username – Your RabbitMQ username. Default:
"guest"
password – Your RabbitMQ password. Default:
"guest"
connection_attempts – How many times should PyRMQ try? Default:
3
is_dlk_retry_enabled – Flag to enable DLK-based retry logic of consumed messages. Default:
False
retry_delay – Seconds between retries. Default:
5
retry_backoff_base – Exponential backoff base in seconds. Default:
2
retry_queue_suffix – The suffix that will be appended to the
queue_name
to act as the name of the retry_queue. Default:retry
max_retries – Number of maximum retries for DLK retry logic. Default:
20
queue_args – Your queue arguments. Default:
None
-
__send_consume_error_message
(retry_count: int, error: Exception) → None¶ Send error message to your preferred location. :param retry_count: Amount retries the Consumer tried before sending an error message. :param error: Error that prevented the Consumer from processing the callback.
-
__send_reconnection_error_message
(retry_count: int, error: Union[pika.exceptions.AMQPConnectionError, ConnectionResetError, pika.exceptions.ChannelClosedByBroker]) → None¶ Send error message to your preferred location. :param retry_count: Amount retries the Publisher tried before sending an error message. :param error: Error that prevented the Publisher from sending the message.
-
__weakref__
¶ list of weak references to the object (if defined)
-
_compute_expiration
(retry_count: int) → int¶ Computes message expiration time from the retry queue in seconds.
-
_consume_message
(channel, method, properties, data: dict) → None¶ Wraps the user provided callback and gracefully handles its errors and calling pika’s
basic_ack
once successful. :param channel: pika’s Channel this message was received. :param method: pika’s basic Return :param properties: pika’s BasicProperties :param data: Data received in bytes.
-
_publish_to_retry_queue
(data: dict, properties, retry_reason: Exception = None) → None¶ Publishes message to retry queue with the appropriate metadata in the headers.
-
close
() → None¶ Manually closes a connection to RabbitMQ. Useful for debugging and tests.
-
connect
(retry_count=1) → None¶ Creates a BlockingConnection from pika and initializes queue bindings. :param retry_count: Amount retries the Publisher tried before sending an error message.
-
consume
(retry_count=1) → None¶ Wraps pika’s
basic_consume()
andstart_consuming()
with retry logic.
-
declare_queue
() → None¶ Declare and a bind a channel to a queue.
-