API Documentation¶
Publisher Class¶
- class pyrmq.Publisher(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)¶
This class uses a
BlockingConnection
from pika that automatically handles queue declarations and bindings plus retry logic built for its connection and publishing.- __create_connection() BlockingConnection ¶
Create pika’s
BlockingConnection
from the given connection parameters.
- __init__(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)¶
- Parameters
exchange_name – Your exchange name.
queue_name – Your queue name.
routing_key – Your queue name.
exchange_type – Exchange type to declare. Default:
"direct"
host – Your RabbitMQ host. Checks env var
RABBITMQ_HOST
. Default:"localhost"
port – Your RabbitMQ port. Checks env var
RABBITMQ_PORT
. Default:5672
username – Your RabbitMQ username. Default:
"guest"
password – Your RabbitMQ password. Default:
"guest"
connection_attempts – How many times should PyRMQ try?. Default:
3
retry_delay – Seconds between connection retries. Default:
5
error_callback – Callback function to be called when connection_attempts is reached.
infinite_retry – Tells PyRMQ to keep on retrying to publish while firing error_callback, if any. Default:
False
exchange_args – Your exchange arguments. Default:
None
queue_args – Your queue arguments. Default:
None
- __send_reconnection_error_message(error, retry_count) None ¶
Send error message to your preferred location. :param error: Error that prevented the Publisher from sending the message. :param retry_count: Amount retries the Publisher tried before sending an error message.
- __weakref__¶
list of weak references to the object (if defined)
- connect(retry_count=1) BlockingChannel ¶
Create pika’s
BlockingConnection
and initialize queue bindings. :param retry_count: Amount retries the Publisher tried before sending an error message.
- declare_queue(channel) None ¶
Declare and bind a channel to a queue. :param channel: pika Channel
- publish(data: dict, priority: Optional[int] = None, message_properties: Optional[dict] = None, attempt: int = 0, retry_count: int = 1) None ¶
Publish data to RabbitMQ. :param data: Data to be published. :param priority: Message priority. Only works if
x-max-priority
is defined as queue argument. :param message_properties: Message properties. Default:{"delivery_mode": 2}
:param attempt: Number of attempts made. :param retry_count: Amount retries the Publisher tried before sending an error message.
Consumer Class¶
- class pyrmq.Consumer(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)¶
This class uses a
BlockingConnection
from pika that automatically handles queue declarations and bindings plus retry logic built for its connection and consumption. It starts its own thread upon initialization and runs pika’sstart_consuming()
.- __create_connection() BlockingConnection ¶
Create pika’s
BlockingConnection
from the given connection parameters.
- __init__(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)¶
- Parameters
exchange_name – Your exchange name.
queue_name – Your queue name.
routing_key – Your queue name.
callback – Your callback that should handle a consumed message
host – Your RabbitMQ host. Default:
"localhost"
port – Your RabbitMQ port. Default:
5672
username – Your RabbitMQ username. Default:
"guest"
password – Your RabbitMQ password. Default:
"guest"
connection_attempts – How many times should PyRMQ try? Default:
3
is_dlk_retry_enabled – Flag to enable DLK-based retry logic of consumed messages. Default:
False
retry_delay – Seconds between connection retries. Default:
5
retry_interval – Seconds between consumption retries. Default:
900
retry_queue_suffix – The suffix that will be appended to the
queue_name
to act as the name of the retry_queue. Default:retry
max_retries – Number of maximum retries for DLK retry logic. Default:
20
exchange_args – Your exchange arguments. Default:
None
queue_args – Your queue arguments. Default:
None
bound_exchange – The exchange this consumer needs to bind to. This is an object that has two keys,
name
andtype
. Default:None
auto_ack – Flag whether to ack or nack the consumed message regardless of its outcome. Default:
True
prefetch_count – How many messages should the consumer retrieve at a time for consumption. Default:
1
- __run_error_callback(message: str, error: Exception, error_type: str) None ¶
Log error message :param message: Message to be logged in error_callback :param error: Error encountered in consuming the message :param error_type: Type of error (CONNECT_ERROR or CONSUME_ERROR)
- __send_consume_error_message(error: Exception, retry_count: int = 1) None ¶
Send error message to your preferred location. :param error: Error that prevented the Consumer from processing the message. :param retry_count: Amount retries the Consumer tried before sending an error message.
- __send_reconnection_error_message(error: Union[AMQPConnectionError, ConnectionResetError, ChannelClosedByBroker], retry_count: int) None ¶
Send error message to your preferred location. :param error: Error that prevented the Consumer from processing the message. :param retry_count: Amount retries the Consumer tried before sending an error message.
- __weakref__¶
list of weak references to the object (if defined)
- _consume_message(channel, method, properties, data: dict) None ¶
Wrap the user-provided callback, gracefully handle its errors, and call pika’s
basic_ack
once successful. :param channel: pika’s Channel this message was received. :param method: pika’s basic Return :param properties: pika’s BasicProperties :param data: Data received in bytes.
- _publish_to_retry_queue(data: dict, properties, retry_reason: Exception) None ¶
Publish message to retry queue with the appropriate metadata in the headers.
- close() None ¶
Manually close a connection to RabbitMQ. This is useful for debugging and tests.
- connect(retry_count=1) None ¶
Create pika’s
BlockingConnection
and initialize queue bindings. :param retry_count: Amount retries the Consumer tried before sending an error message.
- consume(retry_count=1) None ¶
Wrap pika’s
basic_consume()
andstart_consuming()
with retry logic.
- declare_queue() None ¶
Declare and bind a channel to a queue.