Universal queue abstraction library supporting RabbitMQ, AWS SQS, Azure Service Bus, and GCP Pub/Sub
Queue-Agnostic Python Library
A queue-agnostic Python library for processing documents (PDFs, images) that exposes a single interface over several queue providers: RabbitMQ, AWS SQS, Azure Service Bus, and Google Cloud Pub/Sub.
Features
- Provider Agnostic: Single interface for multiple queue providers
- Environment-Based Configuration: Easy deployment across different clients
- Support for Multiple Providers:
  - RabbitMQ
  - AWS SQS
  - Azure Service Bus
  - Google Cloud Pub/Sub
- Async/Await: Built on asyncio for high performance
- Automatic Connection Management: Built-in connection handling
- Error Handling: Automatic message retry/requeue on failures
- Easy to Extend: Add new queue providers by implementing the QueueInterface
Installation
pip install -e .
Or install from requirements:
pip install -r requirements.txt
Configuration
Environment Variables
Create a .env file:
# Set your queue provider
QUEUE_PROVIDER=rabbitmq # or aws-sqs, azure-servicebus, gcp-pubsub
# Set your queue/topic name
QUEUE_NAME=document-processing-queue
# For GCP: Topic name for publishing
TOPIC_NAME=document-processing-topic
# Provider-specific configuration (see .env.example for details)
RABBITMQ_URL=amqp://localhost:5672
Usage
Quick Start - Subscriber
import asyncio

from dotenv import load_dotenv
from queue_agnostic import QueueFactory

load_dotenv()

async def process_message(message):
    print('Processing:', message)
    # Your document processing logic here

async def main():
    # Create queue from environment variables
    result = QueueFactory.create_from_env()
    queue = result['adapter']
    queue_name = result['queue_name']
    await queue.connect()
    # Subscribe and process messages
    await queue.subscribe(queue_name, process_message)

asyncio.run(main())
Quick Start - Publisher
import asyncio

from dotenv import load_dotenv
from queue_agnostic import QueueFactory

load_dotenv()

async def main():
    result = QueueFactory.create_from_env()
    queue = result['adapter']
    topic_name = result['topic_name']
    await queue.connect()
    # Publish a message
    await queue.publish(topic_name, {
        'document_id': 'doc-123',
        'document_url': 'https://example.com/doc.pdf',
        'document_type': 'pdf'
    })
    await queue.disconnect()

asyncio.run(main())
Direct Usage (Without Environment Variables)
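import asyncio

from queue_agnostic import QueueFactory

async def main():
    # Create queue with explicit configuration
    queue = QueueFactory.create({
        'provider': 'rabbitmq',
        'options': {
            'url': 'amqp://localhost:5672'
        }
    })
    await queue.connect()
    # ... use the queue
    await queue.disconnect()

# await is only valid inside a coroutine, so run everything through asyncio.run
asyncio.run(main())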
Architecture
Project Structure
queue-agnostic-python/
├── queue_agnostic/
│   ├── __init__.py
│   ├── queue_interface.py               # Abstract interface
│   ├── queue_factory.py                 # Factory for creating adapters
│   └── adapters/
│       ├── __init__.py
│       ├── rabbitmq_adapter.py          # RabbitMQ implementation
│       ├── aws_sqs_adapter.py           # AWS SQS implementation
│       ├── azure_servicebus_adapter.py  # Azure implementation
│       └── gcp_pubsub_adapter.py        # GCP implementation
├── examples/
│   ├── publisher.py                     # Example publisher
│   └── subscriber.py                    # Example subscriber
├── setup.py
├── requirements.txt
└── README.md
Queue Interface
All adapters implement:
Methods
- connect() - Connect to the queue service
- disconnect() - Disconnect from the queue service
- publish(queue_name, message, options) - Publish a message
- subscribe(queue_name, handler, options) - Subscribe and process messages
- is_connected() - Check connection status
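As an illustration of adding a provider, here is a minimal sketch of a custom adapter. The abstract base class is a stand-in that mirrors the five methods listed above; the real `QueueInterface` in `queue_agnostic` may differ in signatures, in whether `is_connected()` is synchronous, and in how long `subscribe()` runs. `QueueInterfaceSketch` and `InMemoryAdapter` are hypothetical names used only for this example.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[[Any], Awaitable[None]]


class QueueInterfaceSketch(ABC):
    """Stand-in for the library's QueueInterface (assumed shape, not the real class)."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...

    @abstractmethod
    async def publish(self, queue_name: str, message: Any,
                      options: Optional[dict] = None) -> None: ...

    @abstractmethod
    async def subscribe(self, queue_name: str, handler: Handler,
                        options: Optional[dict] = None) -> None: ...

    @abstractmethod
    def is_connected(self) -> bool: ...


class InMemoryAdapter(QueueInterfaceSketch):
    """Hypothetical adapter that keeps messages in process memory."""

    def __init__(self) -> None:
        self._connected = False
        self._queues = defaultdict(list)

    async def connect(self) -> None:
        self._connected = True

    async def disconnect(self) -> None:
        self._connected = False

    async def publish(self, queue_name, message, options=None) -> None:
        if not self._connected:
            raise RuntimeError("connect() must be called before publish()")
        self._queues[queue_name].append(message)

    async def subscribe(self, queue_name, handler, options=None) -> None:
        # Simplification: deliver what is currently queued, then return.
        # A real adapter would usually keep consuming until disconnected.
        if not self._connected:
            raise RuntimeError("connect() must be called before subscribe()")
        pending = self._queues[queue_name]
        while pending:
            await handler(pending.pop(0))

    def is_connected(self) -> bool:
        return self._connected


async def demo() -> list:
    received = []

    async def handler(message):
        received.append(message)

    adapter = InMemoryAdapter()
    await adapter.connect()
    await adapter.publish("docs", {"document_id": "doc-123"})
    await adapter.subscribe("docs", handler)
    await adapter.disconnect()
    return received
```

An in-memory adapter like this can also serve as a test double, since it exercises the same call sequence (connect, publish, subscribe, disconnect) without a broker.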
Provider-Specific Configuration
RabbitMQ
QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://localhost:5672
QUEUE_NAME=my-queue
AWS SQS
QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_key
AWS_SECRET_ACCESS_KEY=your_secret
QUEUE_NAME=my-queue
Azure Service Bus
QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
QUEUE_NAME=my-queue
Google Cloud Pub/Sub
QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=your-project-id
GCP_KEY_FILENAME=./service-account-key.json
TOPIC_NAME=my-topic # For publishing
QUEUE_NAME=my-subscription # For subscribing
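The sketch below shows one way to check a deployment's environment before connecting. It is not part of the library: `REQUIRED_VARS` and `missing_settings` are hypothetical names, and the sketch assumes every provider-specific variable shown in the examples above is required, which the library itself may not enforce (for instance, AWS credentials can also come from other sources).

```python
import os
from typing import List, Mapping, Optional

# Variable names copied from the provider examples above.
REQUIRED_VARS = {
    "rabbitmq": ["RABBITMQ_URL"],
    "aws-sqs": ["AWS_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "azure-servicebus": ["AZURE_SERVICEBUS_CONNECTION_STRING"],
    "gcp-pubsub": ["GCP_PROJECT_ID", "GCP_KEY_FILENAME"],
}


def missing_settings(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the variables that are unset or empty for the chosen provider."""
    env = os.environ if env is None else env
    provider = env.get("QUEUE_PROVIDER", "")
    if provider not in REQUIRED_VARS:
        raise ValueError(f"Unknown QUEUE_PROVIDER: {provider!r}")
    return [name for name in REQUIRED_VARS[provider] if not env.get(name)]
```

Calling `missing_settings()` with no argument reads `os.environ`, so it can run right after `load_dotenv()`; passing a plain dict makes it easy to test.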
Running Examples
Run Subscriber
python examples/subscriber.py
Run Publisher
python examples/publisher.py
Deployment Scenarios
Same code, different configurations:
Client A (RabbitMQ):
QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://client-a-server:5672
Client B (AWS):
QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-west-2
Client C (Azure):
QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
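One common way for a factory to turn a provider name into an adapter is a name-to-class registry. The sketch below illustrates that pattern only; it does not show how `QueueFactory` is actually implemented. The config shape (`provider` plus `options`) follows the direct-usage example, while `register`, `create`, and the placeholder adapter classes are hypothetical.

```python
from typing import Callable, Dict

# Maps a provider name (e.g. the QUEUE_PROVIDER value) to an adapter class.
_ADAPTERS: Dict[str, Callable[[dict], object]] = {}


def register(provider: str):
    """Class decorator that records an adapter under a provider name."""
    def decorator(adapter_cls):
        _ADAPTERS[provider] = adapter_cls
        return adapter_cls
    return decorator


@register("rabbitmq")
class FakeRabbitMQAdapter:
    """Placeholder adapter: stores its options and performs no I/O."""

    def __init__(self, options: dict) -> None:
        self.options = options


@register("aws-sqs")
class FakeSQSAdapter:
    """Placeholder adapter: stores its options and performs no I/O."""

    def __init__(self, options: dict) -> None:
        self.options = options


def create(config: dict):
    """Build an adapter from {'provider': ..., 'options': {...}}."""
    provider = config["provider"]
    if provider not in _ADAPTERS:
        raise ValueError(f"Unsupported provider: {provider!r}")
    return _ADAPTERS[provider](config.get("options", {}))
```

With this layout, application code calls `create(...)` and never names a concrete adapter, so switching clients only changes the configuration passed in.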
Error Handling
All adapters include built-in error handling:
- Message Processing Errors: Messages are automatically requeued/nacked
- Connection Errors: Logged and can be handled with reconnection logic
- Graceful Shutdown: Proper cleanup on SIGINT/SIGTERM
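The requeue-on-failure behavior can be pictured with a small broker-free sketch. It uses an `asyncio.Queue` in place of a real provider, and the attempt limit is an assumption: the README does not say how many times the adapters retry or what happens to messages that keep failing. `consume_with_requeue` is a hypothetical helper, not a library function.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def consume_with_requeue(
    queue: asyncio.Queue,
    handler: Callable[[Any], Awaitable[None]],
    max_attempts: int = 3,
) -> List[Any]:
    """Drain `queue` of (attempts, message) pairs, requeueing failed messages.

    Returns the messages whose handler still raised on the final attempt.
    """
    dead = []
    while not queue.empty():
        attempts, message = queue.get_nowait()
        try:
            await handler(message)
        except Exception:
            if attempts + 1 < max_attempts:
                # Put the message back so it is tried again later.
                queue.put_nowait((attempts + 1, message))
            else:
                # Give up once max_attempts tries have failed.
                dead.append(message)
    return dead


async def demo():
    queue = asyncio.Queue()
    for doc_id in ("doc-1", "doc-2"):
        queue.put_nowait((0, doc_id))

    calls = []

    async def handler(message):
        calls.append(message)
        if message == "doc-2":
            raise ValueError("simulated processing failure")

    dead = await consume_with_requeue(queue, handler, max_attempts=3)
    return calls, dead
```

In `demo()`, the handler succeeds once for `doc-1` and is called three times for `doc-2` before that message is returned as failed. Real brokers track delivery attempts on their side, so an adapter would normally nack the message instead of carrying a counter with it.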
License
MIT
Contributing
Contributions are welcome! Feel free to submit issues or pull requests.
Support
For issues or questions, please open an issue on the repository.