A Python implementation for subscribing and publishing messages via RabbitMQ.
Project description
Twingly::PYAMQP
A Python implementation of the twingly-amqp gem for subscribing and publishing messages via RabbitMQ.
Installation
Using SSH
Poetry
poetry add git+ssh://git@github.com/twingly/twingly-pyamqp.git
PIP
pip install git+ssh://git@github.com/twingly/twingly-pyamqp.git
Using Private Access Token
Generate private access token via github
poetry add git+https://{ACCESS_TOKEN}@github.com/twingly/twingly-pyamqp.git
Usage
Environment variables:
RABBITMQ_N_HOST- Defaults tolocalhostAMQP_USERNAME- Defaults toguestAMQP_PASSWORD- Defaults toguest
Docs
AMQPconfig
Used to configure RabbitMQ host, port, user, and password. Arguments take precedence over environment variables and should only be used to override environment or default values, since env variables and default values are used if no AMQPconfig is provided.
Arguments
- rabbitmq_host
- rabbitmq_port
- amqp_user
- amqp_password
AMQP Connection
Exchange options match Kombu Exchange Arguments, similarly, queue options match those defined in Kombu Queue Arguments.
Arguments
Constructor
| Argument | Type | Default | Description |
|---|---|---|---|
config |
AMQPconfig | None |
None |
Optional AMQPconfig to override RabbitMQ connection. Defaults to using environment variables. |
Methods
declare_queue
| Argument | Type | Default | Description |
|---|---|---|---|
queue_name |
str |
No | The name of the queue to declare. |
exchange_name |
str | None |
None |
The name of the exchange the queue is bound to. If None, uses the default exchange. |
routing_key |
str | None |
None |
The routing key for the queue. If None, defaults to queue_name. |
exchange_opts |
dict | None |
None |
Optional dictionary of options to configure the exchange. |
queue_opts |
dict | None |
None |
Optional dictionary of options to configure the queue. |
Raises ValueError if a routing key is set without specifying an exchange.
declare_exchange
| Argument | Type | Default | Description |
|---|---|---|---|
exchange_name |
str |
No | The name of the exchange to declare. |
exchange_opts |
dict | None |
None |
Optional dictionary of options for the exchange. |
Example Usage
# Establish an AMQP connection
connection = AmqpConnection()
# Declare an exchange
connection.declare_exchange("logs")
# Declare an exchange with optional options
connection.declare_exchange("logs", exchange_opts={"type": "topic", "durable": False})
# Declare a queue on default exchange
connection.declare_queue(queue_name="task_queue")
# Declare a queue and bind to exchange and routing key
connection.declare_queue(
queue_name="task_queue",
exchange_name="logs",
routing_key="task_key",
queue_opts={"max_length": 1000}
)
Publisher
Arguments
Constructor
| Argument | Type | Default | Description |
|---|---|---|---|
exchange_name |
str | None |
None |
The name of the exchange to route the messages to. Leave empty to publish to default exchange. |
routing_key |
str | None |
None |
The routing key used for directing the message. |
config |
AMQPconfig | None |
None |
Optional override configuration for AMQP connection settings. |
publish_args |
dict | None |
None |
Additional arguments that match the publish method arguments of Kombu's Producer. |
Methods
publish
| Argument | Type | Default | Description |
|---|---|---|---|
payload |
object |
No | The message to publish to the exchange. |
routing_key |
str | None |
None |
Optionally override the default routing key. |
Raises ValueError if no routing key is provided at instantiation or publication time.
Example Usage
# Create an instance of Publisher with default values
publisher = Publisher()
# Create an instance of Publisher with a specific routing key
publisher = Publisher(exchange_name="custom_exchange", routing_key="custom_routing_key")
# Publish messages
publisher.publish({"message": "hello, RabbitMQ"}) # Uses the routing key specified at instantiation
publisher.publish({"message": "hello, RabbitMQ"}, routing_key="override_routing_key") # Overrides routing key
# Publish message with additional arguments
publisher.publish({"message": "hello, RabbitMQ"}, publish_args={"priority": 7})
Subscription
Arguments
Constructor
| Argument | Type | Default | Description |
|---|---|---|---|
queue_names |
str | list[str] |
No | The name of the queue(s) to subscribe to. Accepts a single name or a list. |
config |
AMQPconfig | None |
None |
Optional override configuration for AMQP connection settings. |
Methods
subscribe
| Argument | Type | Default | Description |
|---|---|---|---|
callbacks |
Callable[[str, object], None] | list[Callable[[str, object], None]] |
No | The function(s) to process incoming messages. |
blocking |
bool |
True |
If True, blocks the main thread while consuming messages. |
timeout |
int | None |
None |
Maximum time (in seconds) to wait for messages. Required if blocking=False. |
max_connect_attempt |
int |
3 |
Maximum number of retries for establishing a connection. |
consumer_args |
dict | None |
None |
Additional arguments that match Kombu's Consumer arguments. |
Raises ValueError if blocking=False and no timeout is provided.
Raises ValueError if a subscription is already active.
cancel
| Argument | Type | Default | Description |
|---|---|---|---|
| None | - | - | Cancels the active subscription and stops consuming messages. |
Example Usage
# Create an instance of Subscription for a single queue
subscription = Subscription(queue_names="task_queue")
# Create an instance of Subscription for multiple queues
subscription = Subscription(queue_names=["queue1", "queue2"])
# Subscribe to messages in blocking mode
subscription.subscribe(callback=Callable[[str, object], None])
# Subscribe to messages in non-blocking mode with a timeout
subscription.subscribe(callback=Callable[[str, object], None], blocking=False, timeout=5,consumer_args={"no_ack": True, "prefetch_count": 5})
# Cancel the subscription
subscription.cancel()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file twingly_pyamqp-0.1.0.tar.gz.
File metadata
- Download URL: twingly_pyamqp-0.1.0.tar.gz
- Upload date:
- Size: 5.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
45414e2f71d6871e88d39b31d2228b5f9d2c7b206fe5f6098b7dbaa70b8eb787
|
|
| MD5 |
a4526fe20c29d3a72cd6e1c83e3d52bf
|
|
| BLAKE2b-256 |
07b3da1d2ce8817fcb3215c644da10d51c14db4ab2e166a4709465756d433791
|
Provenance
The following attestation bundles were made for twingly_pyamqp-0.1.0.tar.gz:
Publisher:
cd.yml on twingly/twingly-pyamqp
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
twingly_pyamqp-0.1.0.tar.gz -
Subject digest:
45414e2f71d6871e88d39b31d2228b5f9d2c7b206fe5f6098b7dbaa70b8eb787 - Sigstore transparency entry: 177344129
- Sigstore integration time:
-
Permalink:
twingly/twingly-pyamqp@ee4c229a6dee81d46a34450fe0dd11b2a0816158 -
Branch / Tag:
refs/tags/Release0.1.0 - Owner: https://github.com/twingly
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
cd.yml@ee4c229a6dee81d46a34450fe0dd11b2a0816158 -
Trigger Event:
push
-
Statement type:
File details
Details for the file twingly_pyamqp-0.1.0-py3-none-any.whl.
File metadata
- Download URL: twingly_pyamqp-0.1.0-py3-none-any.whl
- Upload date:
- Size: 6.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
403ffb1947ad2238ea1deb4155991842edd743c1a46c7f59550be9439b01e74e
|
|
| MD5 |
1bd6b35d22e8fa556d8148edc6b8a76a
|
|
| BLAKE2b-256 |
189491a3accbf657ee26d32a62b17bb70a190725d59f91a50d87d2c3b071411c
|
Provenance
The following attestation bundles were made for twingly_pyamqp-0.1.0-py3-none-any.whl:
Publisher:
cd.yml on twingly/twingly-pyamqp
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
twingly_pyamqp-0.1.0-py3-none-any.whl -
Subject digest:
403ffb1947ad2238ea1deb4155991842edd743c1a46c7f59550be9439b01e74e - Sigstore transparency entry: 177344133
- Sigstore integration time:
-
Permalink:
twingly/twingly-pyamqp@ee4c229a6dee81d46a34450fe0dd11b2a0816158 -
Branch / Tag:
refs/tags/Release0.1.0 - Owner: https://github.com/twingly
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
cd.yml@ee4c229a6dee81d46a34450fe0dd11b2a0816158 -
Trigger Event:
push
-
Statement type: