A Python implementation for subscribing and publishing messages via RabbitMQ.
Project description
Twingly::PYAMQP
A Python implementation of the twingly-amqp gem for subscribing and publishing messages via RabbitMQ.
Usage
Environment variables:
RABBITMQ_N_HOST- Defaults tolocalhostAMQP_USERNAME- Defaults toguestAMQP_PASSWORD- Defaults toguest
Docs
AMQPconfig
Used to configure RabbitMQ host, port, user, password, and ssl. Arguments take precedence over environment variables and should only be used to override environment or default values, since env variables and default values are used if no AMQPconfig is provided.
Arguments
- rabbitmq_host
- rabbitmq_port
- amqp_user
- amqp_password
- ssl
Publisher
Arguments
Constructor
| Argument | Type | Default | Description |
|---|---|---|---|
exchange_name |
str | None |
None |
The name of the exchange to route the messages to. Leave empty to publish to default exchange. |
routing_key |
str | None |
None |
The routing key used for directing the message. |
config |
AMQPconfig | None |
None |
Optional override configuration for AMQP connection settings. |
publish_args |
dict | None |
None |
Additional arguments that match the publish method arguments of Kombu's Producer. |
exchange_opts |
dict | None |
None |
Exchange options. |
max_retries |
int | None |
None |
Number of attempts to reconnect and publish. |
compress_fields |
list[str] | None |
None |
List of fields to compress before publishing. |
compression_level |
int | None |
None |
level is the compression level – an integer from 0 to 9 or -1. A value of 1 (Z_BEST_SPEED) is fastest and produces the least compression, while a value of 9 (Z_BEST_COMPRESSION) is slowest and produces the most. |
Methods
publish
| Argument | Type | Default | Description |
|---|---|---|---|
payload |
object |
No | The message to publish to the exchange. |
routing_key |
str | None |
None |
Optionally override the default routing key. |
publish_args |
dict | None |
None |
Additional publishing arguments. |
Example Usage
# Create an instance of Publisher with default values
publisher = Publisher(compress_fields=["payload"])
# Create an instance of Publisher with a specific routing key
publisher = Publisher(exchange_name="custom_exchange", routing_key="custom_routing_key")
# Publish messages
publisher.publish({"message": "hello, RabbitMQ"}) # Uses the routing key specified at instantiation
publisher.publish({"message": "hello, RabbitMQ"}, routing_key="override_routing_key") # Overrides routing key
# Publish message with additional arguments
publisher.publish({"message": "hello, RabbitMQ"}, publish_args={"priority": 7})
Compression
The Publisher class supports compressing specified fields in the message payload using zlib compression. This is particularly useful for reducing the size of large text fields before sending them over the network.
Limitations
- Only fields of type
strare supported for compression. Attempting to compress fields of other types will raise aTypeError. - The payload must be an indexable object (like a dictionary) for compression to work, as fields are accessed via
payload[field].
Subscription
Arguments
Constructor
| Argument | Type | Default | Description |
|---|---|---|---|
queue_names |
str | list[str] |
No | The name of the queue(s) to subscribe to. Accepts a single name or a list. |
config |
AMQPconfig | None |
None |
Optional override configuration for AMQP connection settings. |
exchange_name |
str | None |
None |
Name of Exchange |
bindings |
dict[str, list[str]] | None |
None |
Bind Exchange to Queue, dict queue name keys and a list of routing keys. |
queue_opts |
dict | None |
None |
Optional queue options such as Durable, etc. |
exchange_opts |
dict | None |
None |
Optional exchange options |
heartbeat |
int |
30 |
Hearbeat to check the connection. |
compressed_fields |
list[str] | None |
None |
Optional list of fields to decompress when receiving messages. |
Methods
subscribe
| Argument | Type | Default | Description |
|---|---|---|---|
callbacks |
Callable[[str, object], None] | list[Callable[[str, object], None]] |
No | The function(s) to process incoming messages. |
blocking |
bool |
True |
If True, blocks the main thread while consuming messages. |
consumer_args |
dict | None |
None |
Additional arguments that match Kombu's Consumer arguments. |
Raises RuntimeError if a subscription is already active.
cancel
| Argument | Type | Default | Description |
|---|---|---|---|
| None | - | - | Cancels the active subscription and stops consuming messages. |
purge_queue
| Argument | Type | Default | Description |
|---|---|---|---|
| queue_name | str | - | Name of queue to purge |
Example Usage
# Create an instance of Subscription for a single queue
subscription = Subscription(queue_names="task_queue")
# Create an instance of Subscription for multiple queues
subscription = Subscription(queue_names=["queue1", "queue2"])
# Subscribe to messages in blocking mode
subscription.subscribe(callback=Callable[[str, object], None])
# Subscribe to messages in non-blocking mode with a timeout
subscription.subscribe(callback=Callable[[str, object], None], blocking=False, timeout=5,consumer_args={"no_ack": True, "prefetch_count": 5})
# Cancel the subscription
subscription.cancel()
Decompression
The Subscription class supports decompressing specified fields in incoming message payloads that were previously compressed using zlib compression. If a message contains compressed_fields in its headers, those fields will be decompressed upon receipt. The user can also specify a default list of fields to decompress when initializing the Subscription instance.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file twingly_pyamqp-0.2.3.tar.gz.
File metadata
- Download URL: twingly_pyamqp-0.2.3.tar.gz
- Upload date:
- Size: 6.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8e5a85c45b37782b43a146fe8733dbc84aadc38aa866cef2570f0e3e5586bdb3
|
|
| MD5 |
69d6972de262dc20ba5eddc7b814cc9f
|
|
| BLAKE2b-256 |
f9a9ebc0f0e8ade7e805f9951e7c348e9043804cb09072fb41dfadd005b02d95
|
Provenance
The following attestation bundles were made for twingly_pyamqp-0.2.3.tar.gz:
Publisher:
cd.yml on twingly/twingly-pyamqp
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
twingly_pyamqp-0.2.3.tar.gz -
Subject digest:
8e5a85c45b37782b43a146fe8733dbc84aadc38aa866cef2570f0e3e5586bdb3 - Sigstore transparency entry: 604899154
- Sigstore integration time:
-
Permalink:
twingly/twingly-pyamqp@e16ec8eb578ea235875246e02dcd33ccf2201047 -
Branch / Tag:
refs/tags/v0.2.3 - Owner: https://github.com/twingly
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
cd.yml@e16ec8eb578ea235875246e02dcd33ccf2201047 -
Trigger Event:
push
-
Statement type:
File details
Details for the file twingly_pyamqp-0.2.3-py3-none-any.whl.
File metadata
- Download URL: twingly_pyamqp-0.2.3-py3-none-any.whl
- Upload date:
- Size: 7.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
74ee6d715ddfedc1bb9880ace0ff8d69b5c9ff13a34b79c920adde1e520679d1
|
|
| MD5 |
4ed2366dbca393fcb333c6ea043a73b7
|
|
| BLAKE2b-256 |
7d5f2c309fead97e5c1766042b48a0accbb34ee06e8d6ba7b812d009694c5cde
|
Provenance
The following attestation bundles were made for twingly_pyamqp-0.2.3-py3-none-any.whl:
Publisher:
cd.yml on twingly/twingly-pyamqp
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
twingly_pyamqp-0.2.3-py3-none-any.whl -
Subject digest:
74ee6d715ddfedc1bb9880ace0ff8d69b5c9ff13a34b79c920adde1e520679d1 - Sigstore transparency entry: 604899158
- Sigstore integration time:
-
Permalink:
twingly/twingly-pyamqp@e16ec8eb578ea235875246e02dcd33ccf2201047 -
Branch / Tag:
refs/tags/v0.2.3 - Owner: https://github.com/twingly
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
cd.yml@e16ec8eb578ea235875246e02dcd33ccf2201047 -
Trigger Event:
push
-
Statement type: