Skip to main content

A Python implementation for subscribing and publishing messages via RabbitMQ.

Project description

Twingly::PYAMQP

GitHub Build Status

A Python implementation of the twingly-amqp gem for subscribing and publishing messages via RabbitMQ.

Usage

Environment variables:

  • RABBITMQ_N_HOST - Defaults to localhost
  • AMQP_USERNAME - Defaults to guest
  • AMQP_PASSWORD - Defaults to guest

Docs

AMQPconfig

Used to configure RabbitMQ host, port, user, password, and ssl. Arguments take precedence over environment variables and should only be used to override environment or default values, since env variables and default values are used if no AMQPconfig is provided.

Arguments

  • rabbitmq_host
  • rabbitmq_port
  • amqp_user
  • amqp_password
  • ssl

Publisher

Arguments

Constructor
Argument Type Default Description
exchange_name str | None None The name of the exchange to route the messages to. Leave empty to publish to default exchange.
routing_key str | None None The routing key used for directing the message.
config AMQPconfig | None None Optional override configuration for AMQP connection settings.
publish_args dict | None None Additional arguments that match the publish method arguments of Kombu's Producer.
exchange_opts dic | None None Exchange options.
max_retries int | None None Number of attempts to reconnect and publish.
Methods
publish
Argument Type Default Description
payload object No The message to publish to the exchange.
routing_key str | None None Optionally override the default routing key.
publish_args dict | None None Additional publishing arguments.

Example Usage

# Create an instance of Publisher with default values
publisher = Publisher()

# Create an instance of Publisher with a specific routing key
publisher = Publisher(exchange_name="custom_exchange", routing_key="custom_routing_key")

# Publish messages
publisher.publish({"message": "hello, RabbitMQ"})  # Uses the routing key specified at instantiation
publisher.publish({"message": "hello, RabbitMQ"}, routing_key="override_routing_key") # Overrides routing key

# Publish message with additional arguments
publisher.publish({"message": "hello, RabbitMQ"}, publish_args={"priority": 7})

Subscription

Arguments

Constructor
Argument Type Default Description
queue_names str | list[str] No The name of the queue(s) to subscribe to. Accepts a single name or a list.
config AMQPconfig | None None None
exchange_name str | None None Name of Exchange
bindings dict[str, list[str]] | None None Bind Exchange to Queue, dict queue name keys and a list of routing keys.
queue_opts dict | None None Optional queue options such as Durable, etc.
exchange_opts dict | None None Optional exchange options
heartbeat int 30 Hearbeat to check the connection.
Methods
subscribe
Argument Type Default Description
callbacks Callable[[str, object], None] | list[Callable[[str, object], None]] No The function(s) to process incoming messages.
blocking bool True If True, blocks the main thread while consuming messages.
consumer_args dict | None None Additional arguments that match Kombu's Consumer arguments.

Raises RuntimeError if a subscription is already active.

cancel
Argument Type Default Description
None - - Cancels the active subscription and stops consuming messages.
purge_queue
Argument Type Default Description
queue_name str - Name of queue to purge

Example Usage

# Create an instance of Subscription for a single queue
subscription = Subscription(queue_names="task_queue")

# Create an instance of Subscription for multiple queues
subscription = Subscription(queue_names=["queue1", "queue2"])

# Subscribe to messages in blocking mode
subscription.subscribe(callback=Callable[[str, object], None])

# Subscribe to messages in non-blocking mode with a timeout
subscription.subscribe(callback=Callable[[str, object], None], blocking=False, timeout=5,consumer_args={"no_ack": True, "prefetch_count": 5})

# Cancel the subscription
subscription.cancel()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

twingly_pyamqp-0.2.1.tar.gz (5.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

twingly_pyamqp-0.2.1-py3-none-any.whl (6.1 kB view details)

Uploaded Python 3

File details

Details for the file twingly_pyamqp-0.2.1.tar.gz.

File metadata

  • Download URL: twingly_pyamqp-0.2.1.tar.gz
  • Upload date:
  • Size: 5.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for twingly_pyamqp-0.2.1.tar.gz
Algorithm Hash digest
SHA256 066e1c3e975ac81ad831a1ae50ae0b7d303ab41e801e5844cc5b58686c8f030e
MD5 1191c2b949427d46fb2390e2891f74ed
BLAKE2b-256 8d040101bb1812612fb0d2a0a3b2212c6cb2ea00fc06a88506f3dfd1328f5b4d

See more details on using hashes here.

Provenance

The following attestation bundles were made for twingly_pyamqp-0.2.1.tar.gz:

Publisher: cd.yml on twingly/twingly-pyamqp

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file twingly_pyamqp-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: twingly_pyamqp-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 6.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for twingly_pyamqp-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 7506c641f08c76140a9afdc859476d7d28dfb47faecc3b5f067aa8678ef04941
MD5 e575544e518881614f67a4b664910403
BLAKE2b-256 36780412e1fa8e355299c594c6a379bcb660f40ece7bce62ef6964f1afda6626

See more details on using hashes here.

Provenance

The following attestation bundles were made for twingly_pyamqp-0.2.1-py3-none-any.whl:

Publisher: cd.yml on twingly/twingly-pyamqp

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page