Skip to main content

AetherMagic - Multi-protocol communications library with channel support (MQTT, Redis, WebSocket, ZeroMQ, RabbitMQ)

Project description

AetherMagic - Multi-Protocol Microservices Communication

Python 3.8+ License: MIT

AetherMagic is a powerful multi-protocol communication library for microservices, providing a unified API for different transport mechanisms including MQTT, Redis, HTTP/WebSocket, and ZeroMQ.

Supported Protocols

1. MQTT (original) - Best for Task Distribution

  • Lightweight protocol for IoT and microservices
  • Shared subscriptions for automatic load balancing
  • Built-in task distribution among multiple workers
  • SSL/TLS encryption

2. Redis Pub/Sub + Streams

  • Pub/Sub: Fast in-memory message delivery
  • Streams: Reliable delivery with load-balanced consumer groups
  • High performance, atomic operations

3. WebSocket

  • Real-time bidirectional communication
  • Built-in load balancing with random client selection
  • Web interface compatibility

4. ZeroMQ

  • High-performance messaging with natural load balancing
  • PUSH/PULL pattern for automatic task distribution
  • Brokerless architecture

Installation

Basic Installation

pip install aethermagic

Protocol-Specific Installation

Install only the protocols you need:

# For MQTT support
pip install aethermagic[mqtt]

# For Redis support  
pip install aethermagic[redis]

# For WebSocket support
pip install aethermagic[websocket]

# For ZeroMQ support
pip install aethermagic[zeromq]

# For RabbitMQ support
pip install aethermagic[rabbitmq]

# Install all protocols
pip install aethermagic[all]

Usage

Quick Start with Different Protocols

import asyncio
from aethermagic import AetherMagic, ProtocolType, AetherTask

# MQTT (original)
aether_mqtt = AetherMagic(
    protocol_type=ProtocolType.MQTT,
    host='localhost',
    port=1883
)

# Redis Pub/Sub
aether_redis = AetherMagic(
    protocol_type=ProtocolType.REDIS,
    host='localhost',
    port=6379
)

# Redis Streams (with reliable delivery)
aether_streams = AetherMagic(
    protocol_type=ProtocolType.REDIS,
    host='localhost',
    port=6379,
    use_streams=True,
    consumer_group='workers'
)

# HTTP/WebSocket
aether_http = AetherMagic(
    protocol_type=ProtocolType.HTTP,
    host='localhost',
    port=8080,
    mode='client'  # or 'server'
)

# ZeroMQ
aether_zmq = AetherMagic(
    protocol_type=ProtocolType.ZEROMQ,
    host='localhost',
    port=5555,
    pattern='pubsub'  # or 'pushpull', 'reqrep'
)

Creating a Worker

async def handle_task(ae_task, data):
    print(f"Processing: {data}")
    
    # Send intermediate status
    await ae_task.status(50)
    
    # Simulate work
    await asyncio.sleep(2)
    
    # Complete task
    await ae_task.complete(True, {"result": "success"})

# Create task
task = AetherTask(
    job='my_service',
    task='process_data', 
    context='production',
    on_perform=handle_task
)

# Register worker
await task.idle()

# Start main loop
await aether.main()

Sending Tasks from Client

async def on_status(ae_task, complete, succeed, progress, data):
    if complete:
        print(f"Task finished: {succeed}")
    else:
        print(f"Progress: {progress}%")

async def on_complete(ae_task, succeed, data):
    print(f"Result: {data}")

# Create client task
client_task = AetherTask(
    job='my_service',
    task='process_data',
    context='production',
    on_status=on_status,
    on_complete=on_complete
)

# Send task
await client_task.perform({
    "input_file": "/path/to/data.csv",
    "options": {"format": "json"}
})

Task Distribution & Load Balancing

AetherMagic automatically distributes tasks among multiple workers for all protocols:

MQTT with Shared Subscriptions

# Multiple workers subscribe to shared topic
# $share/union_job_workgroup/union/job/task/context/+/perform
await aether.listen('image-process', 'resize', 'batch1', 'perform', 'workers', handle_task)

Redis with Consumer Groups

# Atomic LPUSH/BRPOP operations ensure single delivery
await aether.listen('image-process', 'resize', 'batch1', 'perform', 'workers', handle_task)

WebSocket with Random Selection

# Tasks distributed randomly among connected clients
await aether.listen('image-process', 'resize', 'batch1', 'perform', 'workers', handle_task)  

ZeroMQ PUSH/PULL Pattern

# Natural load balancing with PUSH/PULL sockets
await aether.listen('image-process', 'resize', 'batch1', 'perform', 'workers', handle_task)

Key Benefits:

  • 🔄 Automatic load balancing - tasks distributed among available workers
  • 🚫 No duplicate processing - each task handled by exactly one worker
  • 📈 Horizontal scaling - add more workers to increase capacity
  • 🛡️ Fault tolerance - failed workers don't block other workers

See TASK_DISTRIBUTION.md for detailed examples.

Protocol Selection Guide

MQTT

Use when:

  • Need IoT device compatibility
  • Require lightweight protocol
  • Have bandwidth constraints

Redis Pub/Sub

Use when:

  • Need maximum speed
  • Already have Redis in infrastructure
  • Message loss on failures is acceptable

Redis Streams

Use when:

  • Need reliable message delivery
  • Require load balancing between workers
  • Message persistence is important

HTTP/WebSocket

Use when:

  • Need web application integration
  • Require HTTP proxy/load balancer compatibility
  • Need REST API for external systems

ZeroMQ

Use when:

  • Maximum performance is critical
  • Don't want external broker dependencies
  • Need specific communication patterns

Examples

See examples.py for complete usage examples of each protocol:

# Run Redis example
python examples.py redis

# Run HTTP/WebSocket example  
python examples.py http

# Run ZeroMQ example
python examples.py zeromq

# Multi-protocol example
python examples.py multi

Configuration

Common Parameters

config = ConnectionConfig(
    protocol_type=ProtocolType.REDIS,
    host='localhost',
    port=6379,
    ssl=False,
    username='user',
    password='pass',
    union='my_app',  # namespace for application
    timeout=30,
    keepalive=60
)

Protocol-Specific Parameters

Redis Streams

aether = AetherMagic(
    protocol_type=ProtocolType.REDIS,
    use_streams=True,
    consumer_group='workers',
    consumer_name='worker_1'
)

HTTP/WebSocket

aether = AetherMagic(
    protocol_type=ProtocolType.HTTP,
    mode='server',  # or 'client'
    ssl=True
)

ZeroMQ

aether = AetherMagic(
    protocol_type=ProtocolType.ZEROMQ,
    pattern='pushpull',  # 'pubsub', 'reqrep', 'all'
    server_mode=True
)

Performance Comparison

Protocol Throughput Latency Reliability Complexity
MQTT Medium Low High Low
Redis Pub/Sub High Very Low Medium Low
Redis Streams High Low High Medium
HTTP/WebSocket Medium Medium High Medium
ZeroMQ Very High Very Low Medium High

Backward Compatibility

Existing MQTT code continues to work without changes:

# Old code
from aethermagic import AetherMagic

aether = AetherMagic(server="localhost", port=1883)

The new API is fully compatible and adds additional capabilities.

Load Balancing Support

AetherMagic now supports load-balanced task distribution to ensure each task is processed by only one worker:

Protocol-Specific Load Balancing

Redis Protocol

  • RedisLoadBalancedProtocol: Uses Redis lists (LPUSH/BRPOP) for atomic task distribution
  • Each task goes to exactly one available worker
  • FIFO processing with blocking pop operations
from aethermagic.protocols.redis_protocol import RedisLoadBalancedProtocol

worker = RedisLoadBalancedProtocol(config, consumer_id="worker_1")
await worker.subscribe_to_tasks("job", "task", "context", callback)

MQTT Protocol

  • Uses shared subscriptions with $share prefix
  • Broker distributes messages among group subscribers
  • Built-in load balancing at protocol level

ZeroMQ Protocol

  • Uses PUSH/PULL socket pattern for task distribution
  • Round-robin delivery to connected workers
  • No message duplication - perfect for task distribution

WebSocket Protocol

  • Implements random selection among subscribed clients
  • Tasks with 'shared:' or 'tasks:' prefixes are load balanced
  • Single delivery guaranteed per task

Multi-Protocol Load Balancing API

Use the new convenience methods for load-balanced task processing:

from aethermagic import AetherMagic, ProtocolType

# Create workers  
worker = AetherMagic(protocol_type=ProtocolType.REDIS, host="localhost", port=6379)
await worker.connect()

# Add load-balanced task handler
await worker.add_task(
    job="processing",
    task="compute", 
    context="demo",
    callback=task_handler,
    shared=True  # Enable load balancing
)

# Publish load-balanced tasks
await worker.perform_task(
    job="processing",
    task="compute",
    context="demo", 
    data={"work": "data"},
    shared=True  # Only one worker will receive this
)

Single Delivery Guarantees

  • Redis: Atomic LPUSH/BRPOP operations ensure single delivery
  • MQTT: Broker's shared subscription handles distribution

Multi-Protocol Setup (Channel Support)

AetherMagic supports running multiple protocols simultaneously using channel isolation:

import asyncio
from aethermagic import AetherMagic, AetherTask

async def multi_protocol_service():
    # MQTT for general messaging
    mqtt_service = AetherMagic(
        protocol_type='mqtt',
        host='mqtt.example.com',
        port=8883,
        ssl=True,
        union='production',
        channel='messaging'  # 🔥 Channel identifier
    )
    
    # Redis for caching and AI tasks
    redis_service = AetherMagic(
        protocol_type='redis', 
        host='redis.example.com',
        port=6379,
        ssl=True,
        union='production',
        channel='caching'  # 🔥 Different channel
    )
    
    # Start both services
    mqtt_task = asyncio.create_task(mqtt_service.main())
    redis_task = asyncio.create_task(redis_service.main())
    
    # Create tasks for different channels
    mqtt_task = AetherTask(
        job='notifications',
        task='send_email',
        channel='messaging'  # Uses MQTT
    )
    
    redis_task = AetherTask(
        job='ai',
        task='generate_image', 
        channel='caching'  # Uses Redis
    )
    
    # Execute tasks
    await mqtt_task.perform({'email': 'user@example.com'})
    await redis_task.perform({'prompt': 'sunset landscape'})

# Run the service
asyncio.run(multi_protocol_service())

Dependencies

AetherMagic automatically installs protocol-specific dependencies when you install the optional packages:

  • MQTT: asyncio-mqtt, paho-mqtt, aiomqtt
  • Redis: redis[hiredis] (includes high-performance hiredis parser)
  • WebSocket: aiohttp, websockets
  • ZeroMQ: pyzmq
  • RabbitMQ: aio-pika
  • ZeroMQ: PUSH/PULL pattern is inherently load-balanced
  • WebSocket: Random selection with connection tracking

All protocols now support the shared=True parameter for load-balanced task distribution.

See load_balanced_demo.py for complete working examples.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aethermagic-0.2.7.tar.gz (57.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aethermagic-0.2.7-py3-none-any.whl (33.4 kB view details)

Uploaded Python 3

File details

Details for the file aethermagic-0.2.7.tar.gz.

File metadata

  • Download URL: aethermagic-0.2.7.tar.gz
  • Upload date:
  • Size: 57.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.1

File hashes

Hashes for aethermagic-0.2.7.tar.gz
Algorithm Hash digest
SHA256 a8b2c55e86e2ae805f4b444d4cf57cce69d4a1bac7553c0de5d4193821712771
MD5 c2c6ee7a502a39ea2e93b1239fca8c4e
BLAKE2b-256 4bbdc3fc3959ee26fdb2283d7a4707535b9c340e7cfd9b91f0a7e2831b9788bd

See more details on using hashes here.

File details

Details for the file aethermagic-0.2.7-py3-none-any.whl.

File metadata

  • Download URL: aethermagic-0.2.7-py3-none-any.whl
  • Upload date:
  • Size: 33.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.1

File hashes

Hashes for aethermagic-0.2.7-py3-none-any.whl
Algorithm Hash digest
SHA256 f6afce75daac1f8120e2deedc6703cc511c2b8eb9e9273627f5c1402cdafd03b
MD5 28a130ebfdcaf678d019b160d7f9f041
BLAKE2b-256 cfe2d5187eeffcd203afc1045374e0134d27d3dfb8c5f83e8d6208cc4cc93dc7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page