Skip to main content

A Python implementation for subscribing and publishing messages via RabbitMQ.

Project description

Twingly::PYAMQP

GitHub Build Status

A Python implementation of the twingly-amqp gem for subscribing and publishing messages via RabbitMQ.

Installation

Using SSH

Poetry

poetry add git+ssh://git@github.com/twingly/twingly-pyamqp.git

PIP

pip install git+ssh://git@github.com/twingly/twingly-pyamqp.git

Using Private Access Token

Generate private access token via github

poetry add git+https://{ACCESS_TOKEN}@github.com/twingly/twingly-pyamqp.git

Usage

Environment variables:

  • RABBITMQ_N_HOST - Defaults to localhost
  • AMQP_USERNAME - Defaults to guest
  • AMQP_PASSWORD - Defaults to guest

Docs

AMQPconfig

Used to configure RabbitMQ host, port, user, and password. Arguments take precedence over environment variables and should only be used to override environment or default values, since env variables and default values are used if no AMQPconfig is provided.

Arguments

  • rabbitmq_host
  • rabbitmq_port
  • amqp_user
  • amqp_password

AMQP Connection

Exchange options match Kombu Exchange Arguments, similarly, queue options match those defined in Kombu Queue Arguments.

Arguments

Constructor
Argument Type Default Description
config AMQPconfig | None None Optional AMQPconfig to override RabbitMQ connection. Defaults to using environment variables.

Methods

declare_queue
Argument Type Default Description
queue_name str No The name of the queue to declare.
exchange_name str | None None The name of the exchange the queue is bound to. If None, uses the default exchange.
routing_key str | None None The routing key for the queue. If None, defaults to queue_name.
exchange_opts dict | None None Optional dictionary of options to configure the exchange.
queue_opts dict | None None Optional dictionary of options to configure the queue.

Raises ValueError if a routing key is set without specifying an exchange.

declare_exchange
Argument Type Default Description
exchange_name str No The name of the exchange to declare.
exchange_opts dict | None None Optional dictionary of options for the exchange.

Example Usage

# Establish an AMQP connection
connection = AmqpConnection()

# Declare an exchange
connection.declare_exchange("logs")

# Declare an exchange with optional options
connection.declare_exchange("logs", exchange_opts={"type": "topic", "durable": False})

# Declare a queue on default exchange
connection.declare_queue(queue_name="task_queue")

# Declare a queue and bind to exchange and routing key
connection.declare_queue(
    queue_name="task_queue",
    exchange_name="logs",
    routing_key="task_key",
    queue_opts={"max_length": 1000}
)

Publisher

Arguments

Constructor
Argument Type Default Description
exchange_name str | None None The name of the exchange to route the messages to. Leave empty to publish to default exchange.
routing_key str | None None The routing key used for directing the message.
config AMQPconfig | None None Optional override configuration for AMQP connection settings.
publish_args dict | None None Additional arguments that match the publish method arguments of Kombu's Producer.
Methods
publish
Argument Type Default Description
payload object No The message to publish to the exchange.
routing_key str | None None Optionally override the default routing key.

Raises ValueError if no routing key is provided at instantiation or publication time.

Example Usage

# Create an instance of Publisher with default values
publisher = Publisher()

# Create an instance of Publisher with a specific routing key
publisher = Publisher(exchange_name="custom_exchange", routing_key="custom_routing_key")

# Publish messages
publisher.publish({"message": "hello, RabbitMQ"})  # Uses the routing key specified at instantiation
publisher.publish({"message": "hello, RabbitMQ"}, routing_key="override_routing_key") # Overrides routing key

# Publish message with additional arguments
publisher.publish({"message": "hello, RabbitMQ"}, publish_args={"priority": 7})

Subscription

Arguments

Constructor
Argument Type Default Description
queue_names str | list[str] No The name of the queue(s) to subscribe to. Accepts a single name or a list.
config AMQPconfig | None None Optional override configuration for AMQP connection settings.
Methods
subscribe
Argument Type Default Description
callbacks Callable[[str, object], None] | list[Callable[[str, object], None]] No The function(s) to process incoming messages.
blocking bool True If True, blocks the main thread while consuming messages.
timeout int | None None Maximum time (in seconds) to wait for messages. Required if blocking=False.
max_connect_attempt int 3 Maximum number of retries for establishing a connection.
consumer_args dict | None None Additional arguments that match Kombu's Consumer arguments.

Raises ValueError if blocking=False and no timeout is provided. Raises ValueError if a subscription is already active.

cancel
Argument Type Default Description
None - - Cancels the active subscription and stops consuming messages.

Example Usage

# Create an instance of Subscription for a single queue
subscription = Subscription(queue_names="task_queue")

# Create an instance of Subscription for multiple queues
subscription = Subscription(queue_names=["queue1", "queue2"])

# Subscribe to messages in blocking mode
subscription.subscribe(callback=Callable[[str, object], None])

# Subscribe to messages in non-blocking mode with a timeout
subscription.subscribe(callback=Callable[[str, object], None], blocking=False, timeout=5,consumer_args={"no_ack": True, "prefetch_count": 5})

# Cancel the subscription
subscription.cancel()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

twingly_pyamqp-0.1.0.tar.gz (5.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

twingly_pyamqp-0.1.0-py3-none-any.whl (6.1 kB view details)

Uploaded Python 3

File details

Details for the file twingly_pyamqp-0.1.0.tar.gz.

File metadata

  • Download URL: twingly_pyamqp-0.1.0.tar.gz
  • Upload date:
  • Size: 5.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for twingly_pyamqp-0.1.0.tar.gz
Algorithm Hash digest
SHA256 45414e2f71d6871e88d39b31d2228b5f9d2c7b206fe5f6098b7dbaa70b8eb787
MD5 a4526fe20c29d3a72cd6e1c83e3d52bf
BLAKE2b-256 07b3da1d2ce8817fcb3215c644da10d51c14db4ab2e166a4709465756d433791

See more details on using hashes here.

Provenance

The following attestation bundles were made for twingly_pyamqp-0.1.0.tar.gz:

Publisher: cd.yml on twingly/twingly-pyamqp

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file twingly_pyamqp-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: twingly_pyamqp-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 6.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for twingly_pyamqp-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 403ffb1947ad2238ea1deb4155991842edd743c1a46c7f59550be9439b01e74e
MD5 1bd6b35d22e8fa556d8148edc6b8a76a
BLAKE2b-256 189491a3accbf657ee26d32a62b17bb70a190725d59f91a50d87d2c3b071411c

See more details on using hashes here.

Provenance

The following attestation bundles were made for twingly_pyamqp-0.1.0-py3-none-any.whl:

Publisher: cd.yml on twingly/twingly-pyamqp

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page