API Documentation
Publisher Class
- class pyrmq.Publisher(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)
This class uses a BlockingConnection from pika. It automatically handles queue declarations and bindings, and has retry logic built in for its connection and publishing.
- __create_connection() -> BlockingConnection
Create pika’s BlockingConnection from the given connection parameters.
- __init__(exchange_name: str, queue_name: Optional[str] = '', routing_key: Optional[str] = '', exchange_type: Optional[str] = 'direct', **kwargs)
- Parameters
exchange_name – Your exchange name.
queue_name – Your queue name.
routing_key – Your routing key.
exchange_type – Exchange type to declare. Default: "direct"
host – Your RabbitMQ host. Checks env var RABBITMQ_HOST. Default: "localhost"
port – Your RabbitMQ port. Checks env var RABBITMQ_PORT. Default: 5672
username – Your RabbitMQ username. Default: "guest"
password – Your RabbitMQ password. Default: "guest"
connection_attempts – How many times PyRMQ should try to connect. Default: 3
retry_delay – Seconds between connection retries. Default: 5
error_callback – Callback function to be called when connection_attempts is reached.
infinite_retry – Tells PyRMQ to keep retrying to publish while firing error_callback, if any. Default: False
exchange_args – Your exchange arguments. Default: None
queue_args – Your queue arguments. Default: None
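The parameters above map directly onto keyword arguments. The following is a minimal sketch, assuming a broker is reachable with the default guest credentials. The exchange, queue, and routing key names are hypothetical, and the helpers `publisher_settings` and `make_publisher` are illustrative, not part of PyRMQ. The explicit `os.getenv` calls only mirror the documented fallback so that the resolved values are visible.

```python
import os


def publisher_settings() -> dict:
    """Collect keyword arguments for pyrmq.Publisher using the documented defaults."""
    return {
        "exchange_name": "orders_exchange",  # hypothetical name
        "queue_name": "orders_queue",        # hypothetical name
        "routing_key": "orders",             # hypothetical name
        "exchange_type": "direct",
        # Mirror the documented env var lookup, falling back to the defaults.
        "host": os.getenv("RABBITMQ_HOST", "localhost"),
        "port": int(os.getenv("RABBITMQ_PORT", "5672")),
        "username": "guest",
        "password": "guest",
        "connection_attempts": 3,
        "retry_delay": 5,
    }


def make_publisher():
    """Build a Publisher; needs the pyrmq package installed."""
    # Imported here so the settings helper can be used without pyrmq installed.
    from pyrmq import Publisher

    return Publisher(**publisher_settings())
```

Calling `make_publisher().publish({"order_id": 1})` would then send one message, provided RabbitMQ is running at the resolved host and port.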
- __send_reconnection_error_message(error, retry_count) -> None
Send error message to your preferred location.
- Parameters
error – Error that prevented the Publisher from sending the message.
retry_count – Number of retries the Publisher attempted before sending an error message.
- __weakref__
list of weak references to the object (if defined)
- connect(retry_count=1) -> BlockingChannel
Create pika’s BlockingConnection and initialize queue bindings.
- Parameters
retry_count – Number of retries the Publisher attempted before sending an error message.
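The interplay of connection_attempts, retry_delay, and error_callback can be illustrated with a small standalone loop. This is a sketch of the semantics as the parameter descriptions state them, not PyRMQ's actual implementation. The function name `connect_with_retry`, the `open_connection` argument, and the injectable `sleep` are all hypothetical.

```python
import time


def connect_with_retry(open_connection, connection_attempts=3, retry_delay=5,
                       error_callback=None, sleep=time.sleep):
    """Call open_connection until it succeeds or connection_attempts is reached."""
    retry_count = 1
    while True:
        try:
            return open_connection()
        except ConnectionError as error:
            if retry_count >= connection_attempts:
                # Attempts exhausted: notify the callback once, then give up.
                if error_callback is not None:
                    error_callback(
                        f"Tried to connect {retry_count} times but still failed: {error!r}"
                    )
                raise
            # Wait before the next attempt.
            sleep(retry_delay)
            retry_count += 1
```

Passing `sleep` as an argument keeps the sketch easy to exercise without real delays. An infinite_retry variant would keep looping after firing the callback instead of re-raising.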
- declare_queue(channel) -> None
Declare and bind a channel to a queue.
- Parameters
channel – pika Channel
- publish(data: dict, priority: Optional[int] = None, message_properties: Optional[dict] = None, attempt: int = 0, retry_count: int = 1) -> None
Publish data to RabbitMQ.
- Parameters
data – Data to be published.
priority – Message priority. Only works if x-max-priority is defined as a queue argument.
message_properties – Message properties. Default: {"delivery_mode": 2}
attempt – Number of attempts made.
retry_count – Number of retries the Publisher attempted before sending an error message.
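As an illustration of the priority and message_properties arguments, the sketch below wraps publish() in a small helper. The helper name `publish_order` and the priority value are hypothetical. The `publisher` argument is assumed to be a pyrmq.Publisher, or any object that exposes the same publish() signature.

```python
from typing import Optional

URGENT_PRIORITY = 9  # hypothetical value; should not exceed the queue's x-max-priority


def publish_order(publisher, order: dict, urgent: bool = False) -> None:
    """Publish an order, raising its priority when it is urgent."""
    priority: Optional[int] = URGENT_PRIORITY if urgent else None
    publisher.publish(
        order,
        priority=priority,
        # The documented default, stated explicitly for clarity.
        message_properties={"delivery_mode": 2},
    )
```

For the priority to have any effect, the Publisher would need to be created with a queue argument such as `queue_args={"x-max-priority": 10}`, as the priority parameter description notes.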
Consumer Class
- class pyrmq.Consumer(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)
This class uses a BlockingConnection from pika. It automatically handles queue declarations and bindings, and has retry logic built in for its connection and consumption. It starts its own thread upon initialization and runs pika’s start_consuming().
- __create_connection() -> BlockingConnection
Create pika’s BlockingConnection from the given connection parameters.
- __init__(exchange_name: str, queue_name: str, routing_key: str, callback: Callable, exchange_type: Optional[str] = 'direct', **kwargs)
- Parameters
exchange_name – Your exchange name.
queue_name – Your queue name.
routing_key – Your routing key.
callback – Your callback that should handle a consumed message.
host – Your RabbitMQ host. Default: "localhost"
port – Your RabbitMQ port. Default: 5672
username – Your RabbitMQ username. Default: "guest"
password – Your RabbitMQ password. Default: "guest"
connection_attempts – How many times PyRMQ should try to connect. Default: 3
is_dlk_retry_enabled – Flag to enable DLK-based retry logic of consumed messages. Default: False
retry_delay – Seconds between connection retries. Default: 5
retry_interval – Seconds between consumption retries. Default: 900
retry_queue_suffix – The suffix that will be appended to the queue_name to act as the name of the retry_queue. Default: retry
max_retries – Maximum number of retries for DLK retry logic. Default: 20
exchange_args – Your exchange arguments. Default: None
queue_args – Your queue arguments. Default: None
bound_exchange – The exchange this consumer needs to bind to. This is an object that has two keys, name and type. Default: None
auto_ack – Flag whether to ack or nack the consumed message regardless of its outcome. Default: True
prefetch_count – How many messages the consumer should retrieve at a time for consumption. Default: 1
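A Consumer is configured the same way, with a callback added. The sketch below assumes the callback receives the consumed message as its first argument. The `**kwargs` catch-all is a precaution in case the installed version passes extra arguments, not documented behaviour. All exchange, queue, and routing key names, the retry values, and the helper `make_consumer` are hypothetical.

```python
received = []  # collected messages, for illustration only


def handle_message(data, **kwargs):
    """Callback for consumed messages; extra keyword arguments are ignored."""
    received.append(data)


def make_consumer():
    """Build a Consumer with DLK-based retries enabled; needs pyrmq and a broker."""
    # Imported here so the callback can be tested without pyrmq installed.
    from pyrmq import Consumer

    return Consumer(
        exchange_name="orders_exchange",  # hypothetical name
        queue_name="orders_queue",        # hypothetical name
        routing_key="orders",             # hypothetical name
        callback=handle_message,
        is_dlk_retry_enabled=True,
        retry_interval=60,  # retry a failed message after one minute
        max_retries=5,
        prefetch_count=1,
    )
```

Keeping the callback a plain function makes it easy to test on its own, for example by calling `handle_message({"order_id": 1})` directly.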
- __run_error_callback(message: str, error: Exception, error_type: str) -> None
Log error message.
- Parameters
message – Message to be logged in error_callback
error – Error encountered in consuming the message
error_type – Type of error (CONNECT_ERROR or CONSUME_ERROR)
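A function along the following lines could be supplied as error_callback. It is a sketch that assumes PyRMQ passes the message positionally and may pass error and error_type as keyword arguments. That calling convention is inferred from the signature above, not confirmed, so both extras are optional here. The logger name is hypothetical.

```python
import logging

logger = logging.getLogger("pyrmq_errors")  # hypothetical logger name


def on_error(message, error=None, error_type=None):
    """Format and log an error report; returns the formatted line."""
    line = f"[{error_type or 'UNKNOWN'}] {message}"
    if error is not None:
        # Include the exception's repr so the cause is visible in the log.
        line += f" ({error!r})"
    logger.error(line)
    return line
```

Returning the formatted line is only a convenience for testing; a real callback might instead forward the report to a chat channel or an alerting service.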
- __send_consume_error_message(error: Exception, retry_count: int = 1) -> None
Send error message to your preferred location.
- Parameters
error – Error that prevented the Consumer from processing the message.
retry_count – Number of retries the Consumer attempted before sending an error message.
- __send_reconnection_error_message(error: Union[AMQPConnectionError, ConnectionResetError, ChannelClosedByBroker], retry_count: int) -> None
Send error message to your preferred location.
- Parameters
error – Error that prevented the Consumer from processing the message.
retry_count – Number of retries the Consumer attempted before sending an error message.
- __weakref__
list of weak references to the object (if defined)
- _consume_message(channel, method, properties, data: dict) -> None
Wrap the user-provided callback, gracefully handle its errors, and call pika’s basic_ack once successful.
- Parameters
channel – pika’s Channel on which this message was received.
method – pika’s basic Return
properties – pika’s BasicProperties
data – Data received in bytes.
- _publish_to_retry_queue(data: dict, properties, retry_reason: Exception) -> None
Publish message to retry queue with the appropriate metadata in the headers.
- close() -> None
Manually close a connection to RabbitMQ. This is useful for debugging and tests.
- connect(retry_count=1) -> None
Create pika’s BlockingConnection and initialize queue bindings.
- Parameters
retry_count – Number of retries the Consumer attempted before sending an error message.
- consume(retry_count=1) -> None
Wrap pika’s basic_consume() and start_consuming() with retry logic.
- declare_queue() -> None
Declare and bind a channel to a queue.