Skip to main content

A Python implementation for subscribing and publishing messages via RabbitMQ.

Project description

Twingly::PYAMQP

GitHub Build Status

A Python implementation of the twingly-amqp gem for subscribing and publishing messages via RabbitMQ.

Usage

Environment variables:

  • RABBITMQ_N_HOST - Defaults to localhost
  • AMQP_USERNAME - Defaults to guest
  • AMQP_PASSWORD - Defaults to guest

Docs

AMQPconfig

Used to configure RabbitMQ host, port, user, password, and ssl. Arguments take precedence over environment variables and should only be used to override environment or default values, since env variables and default values are used if no AMQPconfig is provided.

Arguments

  • rabbitmq_host
  • rabbitmq_port
  • amqp_user
  • amqp_password
  • ssl

Publisher

Arguments

Constructor
Argument Type Default Description
exchange_name str | None None The name of the exchange to route the messages to. Leave empty to publish to default exchange.
routing_key str | None None The routing key used for directing the message.
config AMQPconfig | None None Optional override configuration for AMQP connection settings.
publish_args dict | None None Additional arguments that match the publish method arguments of Kombu's Producer.
exchange_opts dict | None None Exchange options.
max_retries int | None None Number of attempts to reconnect and publish.
compress_fields list[str] | None None List of fields to compress before publishing.
compression_level int | None None level is the compression level – an integer from 0 to 9 or -1. A value of 1 (Z_BEST_SPEED) is fastest and produces the least compression, while a value of 9 (Z_BEST_COMPRESSION) is slowest and produces the most.
Methods
publish
Argument Type Default Description
payload object No The message to publish to the exchange.
routing_key str | None None Optionally override the default routing key.
publish_args dict | None None Additional publishing arguments.

Example Usage

# Create an instance of Publisher with default values
publisher = Publisher(compress_fields=["payload"])

# Create an instance of Publisher with a specific routing key
publisher = Publisher(exchange_name="custom_exchange", routing_key="custom_routing_key")

# Publish messages
publisher.publish({"message": "hello, RabbitMQ"})  # Uses the routing key specified at instantiation
publisher.publish({"message": "hello, RabbitMQ"}, routing_key="override_routing_key") # Overrides routing key

# Publish message with additional arguments
publisher.publish({"message": "hello, RabbitMQ"}, publish_args={"priority": 7})

Compression

The Publisher class supports compressing specified fields in the message payload using zlib compression. This is particularly useful for reducing the size of large text fields before sending them over the network.

Limitations
  • Only fields of type str are supported for compression. Attempting to compress fields of other types will raise a TypeError.
  • The payload must be an indexable object (like a dictionary) for compression to work, as fields are accessed via payload[field].

Subscription

Arguments

Constructor
Argument Type Default Description
queue_names str | list[str] No The name of the queue(s) to subscribe to. Accepts a single name or a list.
config AMQPconfig | None None Optional override configuration for AMQP connection settings.
exchange_name str | None None Name of Exchange
bindings dict[str, list[str]] | None None Bind Exchange to Queue, dict queue name keys and a list of routing keys.
queue_opts dict | None None Optional queue options such as Durable, etc.
exchange_opts dict | None None Optional exchange options
heartbeat int 30 Hearbeat to check the connection.
compressed_fields list[str] | None None Optional list of fields to decompress when receiving messages.
Methods
subscribe
Argument Type Default Description
callbacks Callable[[str, object], None] | list[Callable[[str, object], None]] No The function(s) to process incoming messages.
blocking bool True If True, blocks the main thread while consuming messages.
consumer_args dict | None None Additional arguments that match Kombu's Consumer arguments.

Raises RuntimeError if a subscription is already active.

cancel
Argument Type Default Description
None - - Cancels the active subscription and stops consuming messages.
purge_queue
Argument Type Default Description
queue_name str - Name of queue to purge

Example Usage

# Create an instance of Subscription for a single queue
subscription = Subscription(queue_names="task_queue")

# Create an instance of Subscription for multiple queues
subscription = Subscription(queue_names=["queue1", "queue2"])

# Subscribe to messages in blocking mode
subscription.subscribe(callback=Callable[[str, object], None])

# Subscribe to messages in non-blocking mode with a timeout
subscription.subscribe(callback=Callable[[str, object], None], blocking=False, timeout=5,consumer_args={"no_ack": True, "prefetch_count": 5})

# Cancel the subscription
subscription.cancel()

Decompression

The Subscription class supports decompressing specified fields in incoming message payloads that were previously compressed using zlib compression. If a message contains compressed_fields in its headers, those fields will be decompressed upon receipt. The user can also specify a default list of fields to decompress when initializing the Subscription instance.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

twingly_pyamqp-0.2.2.tar.gz (6.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

twingly_pyamqp-0.2.2-py3-none-any.whl (7.1 kB view details)

Uploaded Python 3

File details

Details for the file twingly_pyamqp-0.2.2.tar.gz.

File metadata

  • Download URL: twingly_pyamqp-0.2.2.tar.gz
  • Upload date:
  • Size: 6.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for twingly_pyamqp-0.2.2.tar.gz
Algorithm Hash digest
SHA256 72d45a3dbb1f2f21c430e24f7e15ee3e53b98b1f101fa4ad955e66061b0039f9
MD5 d26c50cf4555288dc45be1019d34e6ae
BLAKE2b-256 f9306912b7d2ad97e33cc10f853a31bcf9d3210dd0bf25254f139396f420a246

See more details on using hashes here.

Provenance

The following attestation bundles were made for twingly_pyamqp-0.2.2.tar.gz:

Publisher: cd.yml on twingly/twingly-pyamqp

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file twingly_pyamqp-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: twingly_pyamqp-0.2.2-py3-none-any.whl
  • Upload date:
  • Size: 7.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for twingly_pyamqp-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 4728c703d923ba5194991936131d2d1827bd748a06cd0d6507be887dcff80143
MD5 b19ca2b2bf394f2140dba8708696bece
BLAKE2b-256 2aabfcf8cea3af4df9e03769955bdcfb88655a4ea6f6b211d12430880638d86c

See more details on using hashes here.

Provenance

The following attestation bundles were made for twingly_pyamqp-0.2.2-py3-none-any.whl:

Publisher: cd.yml on twingly/twingly-pyamqp

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page