Universal queue abstraction library supporting RabbitMQ, AWS SQS, Azure Service Bus, and GCP Pub/Sub

Queue-Agnostic Python Library

A flexible, queue-agnostic Python library for processing documents (PDFs, images) that works with multiple queue providers: RabbitMQ, AWS SQS, Azure Service Bus, and Google Cloud Pub/Sub.

🚀 Features

  • Provider Agnostic: Single interface for multiple queue providers
  • Environment-Based Configuration: Select the provider per deployment through environment variables, with no code changes
  • Support for Multiple Providers:
    • RabbitMQ
    • AWS SQS
    • Azure Service Bus
    • Google Cloud Pub/Sub
  • Async/Await: Built on asyncio for high performance
  • Automatic Connection Management: Built-in connection handling
  • Error Handling: Automatic message retry/requeue on failures
  • Easy to Extend: Add new queue providers by implementing the QueueInterface

📦 Installation

Install from a local checkout in editable mode:

pip install -e .

Or install only the dependencies listed in requirements.txt:

pip install -r requirements.txt

🔧 Configuration

Environment Variables

Create a .env file:

# Set your queue provider
QUEUE_PROVIDER=rabbitmq  # or aws-sqs, azure-servicebus, gcp-pubsub

# Set your queue/topic name
QUEUE_NAME=document-processing-queue

# For GCP: Topic name for publishing
TOPIC_NAME=document-processing-topic

# Provider-specific configuration (see .env.example for details)
RABBITMQ_URL=amqp://localhost:5672
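
To make the role of these variables concrete, here is a minimal sketch of how they could be gathered into a single settings dictionary. The helper name `read_queue_settings` and the returned keys are assumptions for illustration; this is not the code behind `QueueFactory.create_from_env()`.

```python
import os
from typing import Mapping, Optional


def read_queue_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect the queue-related variables into one dict.

    Hypothetical helper for illustration; not part of queue_agnostic.
    """
    env = os.environ if env is None else env
    provider = env.get("QUEUE_PROVIDER", "").strip().lower()
    if not provider:
        raise ValueError("QUEUE_PROVIDER is not set")
    return {
        "provider": provider,
        "queue_name": env.get("QUEUE_NAME"),
        # TOPIC_NAME is only described for GCP, so it may be unset elsewhere.
        "topic_name": env.get("TOPIC_NAME"),
    }


settings = read_queue_settings({
    "QUEUE_PROVIDER": "rabbitmq",
    "QUEUE_NAME": "document-processing-queue",
})
print(settings)
```

Passing a plain mapping instead of reading `os.environ` directly keeps the sketch easy to test without touching the real environment.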

📖 Usage

Quick Start - Subscriber

import asyncio
from dotenv import load_dotenv
from queue_agnostic import QueueFactory

load_dotenv()

async def process_message(message):
    print('Processing:', message)
    # Your document processing logic here

async def main():
    # Create queue from environment variables
    result = QueueFactory.create_from_env()
    queue = result['adapter']
    queue_name = result['queue_name']
    
    await queue.connect()
    
    # Subscribe and process messages
    await queue.subscribe(queue_name, process_message)

asyncio.run(main())

Quick Start - Publisher

import asyncio
from dotenv import load_dotenv
from queue_agnostic import QueueFactory

load_dotenv()

async def main():
    result = QueueFactory.create_from_env()
    queue = result['adapter']
    topic_name = result['topic_name']
    
    await queue.connect()
    
    # Publish a message
    await queue.publish(topic_name, {
        'document_id': 'doc-123',
        'document_url': 'https://example.com/doc.pdf',
        'document_type': 'pdf'
    })
    
    await queue.disconnect()

asyncio.run(main())

Direct Usage (Without Environment Variables)

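import asyncio
from queue_agnostic import QueueFactory

async def main():
    # Create queue with explicit configuration
    queue = QueueFactory.create({
        'provider': 'rabbitmq',
        'options': {
            'url': 'amqp://localhost:5672'
        }
    })

    await queue.connect()
    # ... use the queue
    await queue.disconnect()

# await is only valid inside a coroutine, so run everything through asyncio.run()
asyncio.run(main())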

🏗️ Architecture

Project Structure

queue-agnostic-python/
├── queue_agnostic/
│   ├── __init__.py
│   ├── queue_interface.py          # Abstract interface
│   ├── queue_factory.py            # Factory for creating adapters
│   └── adapters/
│       ├── __init__.py
│       ├── rabbitmq_adapter.py     # RabbitMQ implementation
│       ├── aws_sqs_adapter.py      # AWS SQS implementation
│       ├── azure_servicebus_adapter.py  # Azure implementation
│       └── gcp_pubsub_adapter.py   # GCP implementation
├── examples/
│   ├── publisher.py                # Example publisher
│   └── subscriber.py               # Example subscriber
├── setup.py
├── requirements.txt
└── README.md

🔌 Queue Interface

All adapters implement:

Methods

  • connect() - Connect to the queue service
  • disconnect() - Disconnect from the queue service
  • publish(queue_name, message, options) - Publish a message
  • subscribe(queue_name, handler, options) - Subscribe and process messages
  • is_connected() - Check connection status
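
As a rough illustration of what implementing these five methods might look like, the sketch below defines a stand-in abstract class and a toy in-memory adapter. `QueueInterfaceSketch` and `InMemoryAdapter` are hypothetical names, and the exact signatures (optional `options`, a synchronous `is_connected()`) are assumptions; check `queue_interface.py` for the real base class before writing an adapter.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[[Any], Awaitable[None]]


class QueueInterfaceSketch(ABC):
    """Stand-in mirroring the five listed methods (not the library's class)."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...

    @abstractmethod
    async def publish(self, queue_name: str, message: Any,
                      options: Optional[dict] = None) -> None: ...

    @abstractmethod
    async def subscribe(self, queue_name: str, handler: Handler,
                        options: Optional[dict] = None) -> None: ...

    @abstractmethod
    def is_connected(self) -> bool: ...


class InMemoryAdapter(QueueInterfaceSketch):
    """Toy adapter that delivers messages to handlers in the same process."""

    def __init__(self) -> None:
        self._connected = False
        self._handlers = defaultdict(list)

    async def connect(self) -> None:
        self._connected = True

    async def disconnect(self) -> None:
        self._connected = False

    async def publish(self, queue_name, message, options=None) -> None:
        if not self._connected:
            raise RuntimeError("connect() must be called before publish()")
        for handler in self._handlers[queue_name]:
            await handler(message)

    async def subscribe(self, queue_name, handler, options=None) -> None:
        # Unlike a real broker subscription, this returns immediately.
        self._handlers[queue_name].append(handler)

    def is_connected(self) -> bool:
        return self._connected


async def demo() -> list:
    received = []

    async def handler(message):
        received.append(message)

    queue = InMemoryAdapter()
    await queue.connect()
    await queue.subscribe("my-queue", handler)
    await queue.publish("my-queue", {"document_id": "doc-123"})
    await queue.disconnect()
    return received


received = asyncio.run(demo())
print(received)
```

An in-memory adapter like this can also be handy in unit tests, since handlers can be exercised without a running broker.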

⚙️ Provider-Specific Configuration

RabbitMQ

QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://localhost:5672
QUEUE_NAME=my-queue

AWS SQS

QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_key
AWS_SECRET_ACCESS_KEY=your_secret
QUEUE_NAME=my-queue

Azure Service Bus

QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
QUEUE_NAME=my-queue

Google Cloud Pub/Sub

QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=your-project-id
GCP_KEY_FILENAME=./service-account-key.json
TOPIC_NAME=my-topic              # For publishing
QUEUE_NAME=my-subscription       # For subscribing
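
Because each provider expects a different set of variables, a small pre-flight check can catch an incomplete .env file before connecting. The sketch below is a hypothetical helper, not part of the library, and it assumes every variable listed above is mandatory; in practice some may be optional (for example, AWS credentials can come from other sources).

```python
from typing import List, Mapping

# Variables listed per provider, besides QUEUE_PROVIDER and QUEUE_NAME.
# Treating all of them as mandatory is an assumption made for this sketch.
PROVIDER_VARIABLES = {
    "rabbitmq": ["RABBITMQ_URL"],
    "aws-sqs": ["AWS_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "azure-servicebus": ["AZURE_SERVICEBUS_CONNECTION_STRING"],
    "gcp-pubsub": ["GCP_PROJECT_ID", "GCP_KEY_FILENAME"],
}


def missing_variables(env: Mapping[str, str]) -> List[str]:
    """Return the names of expected variables that are unset or empty."""
    provider = env.get("QUEUE_PROVIDER", "")
    if provider not in PROVIDER_VARIABLES:
        raise ValueError(f"Unknown QUEUE_PROVIDER: {provider!r}")
    required = ["QUEUE_NAME", *PROVIDER_VARIABLES[provider]]
    return [name for name in required if not env.get(name)]


example_env = {
    "QUEUE_PROVIDER": "aws-sqs",
    "AWS_REGION": "us-east-1",
    "QUEUE_NAME": "my-queue",
}
print(missing_variables(example_env))
# → ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']
```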

🚀 Running Examples

Run Subscriber

python examples/subscriber.py

Run Publisher

python examples/publisher.py

🔄 Deployment Scenarios

Same code, different configurations:

Client A (RabbitMQ):

QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://client-a-server:5672

Client B (AWS):

QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-west-2

Client C (Azure):

QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...

🐛 Error Handling

All adapters include built-in error handling:

  • Message Processing Errors: Messages are automatically requeued/nacked
  • Connection Errors: Logged and can be handled with reconnection logic
  • Graceful Shutdown: Proper cleanup on SIGINT/SIGTERM
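
The requeue-on-failure idea can be sketched independently of any broker. The example below uses an `asyncio.Queue` as a stand-in for a real queue and retries a failing handler up to a limit; `drain_with_requeue` and `max_attempts` are hypothetical names, and the adapters' actual retry policy may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


async def drain_with_requeue(pending: asyncio.Queue, handler: Handler,
                             max_attempts: int = 3) -> list:
    """Process (message, attempt) pairs; requeue failures up to max_attempts."""
    dead = []
    while not pending.empty():
        message, attempt = pending.get_nowait()
        try:
            await handler(message)
        except Exception:
            if attempt < max_attempts:
                # Put the message back for another try.
                pending.put_nowait((message, attempt + 1))
            else:
                # Give up once the attempt limit is reached.
                dead.append(message)
    return dead


async def demo():
    calls = {"count": 0}

    async def flaky_handler(message):
        # Fails on the first two calls, succeeds on the third.
        calls["count"] += 1
        if calls["count"] < 3:
            raise RuntimeError("temporary failure")

    pending = asyncio.Queue()
    pending.put_nowait(("doc-123", 1))
    dead = await drain_with_requeue(pending, flaky_handler)
    return calls["count"], dead


attempts, dead_letters = asyncio.run(demo())
print(attempts, dead_letters)
```

Capping the number of attempts avoids a message that always fails being redelivered forever; what happens to such messages (dropping, dead-lettering) depends on the provider and is not specified here.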

📝 License

MIT

🤝 Contributing

Contributions are welcome! Feel free to submit issues or pull requests.

📞 Support

For issues or questions, please open an issue on the repository.
