Skip to main content

A messaging library supporting multiple backends

Project description

ZephyrFlow

ZephyrFlow is a powerful and flexible messaging library that provides a unified interface for working with multiple message brokers. It currently supports Kafka, RabbitMQ, and Redis, offering both synchronous and asynchronous clients.

Features

  • Multiple Broker Support:
    • Apache Kafka
    • RabbitMQ
    • Redis Streams
  • Unified Interface: Consistent API across all message brokers
  • Async Support: Native async/await support for all clients
  • Type Safety: Full type hints support
  • Error Handling: Robust error handling and recovery mechanisms
  • Consumer Groups: Support for consumer groups in Kafka and RabbitMQ
  • Exchange Bindings: Advanced RabbitMQ exchange and queue bindings
  • Stream Processing: Redis Streams support for stream processing

Requirements

  • Python 3.8+
  • Redis 5.0+ (for Redis Streams support)
  • Kafka 2.0+
  • RabbitMQ 3.8+

Installation

# Install with poetry (recommended)
poetry add zephcast

# Install with pip
pip install zephcast

Quick Start

Async Iterator Pattern

All async clients in ZephyrFlow implement the async iterator pattern, allowing you to use them in async for loops:

async with client:  # Automatically connects and closes
    async for message in client:  # Uses receive() under the hood
        print(f"Received: {message}")

Kafka Example

from zephcast.kafka.async_client import AsyncKafkaClient

async def kafka_example():
    # Create a client
    client = AsyncKafkaClient(
        stream_name="my-topic",
        bootstrap_servers="localhost:9092"
    )
    
    # Using async context manager
    async with client:
        # Send messages
        await client.send("Hello Kafka!")
        
        # Receive messages
        async for message in client:
            print(f"Received: {message}")

RabbitMQ Example

from zephcast.rabbit.async_client import AsyncRabbitClient

async def rabbitmq_example():
    # Create a client
    client = AsyncRabbitClient(
        stream_name="my-routing-key",
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello RabbitMQ!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()

Redis Example

from zephcast.redis.async_client import AsyncRedisClient

async def redis_example():
    # Create a client
    client = AsyncRedisClient(
        stream_name="my-stream",
        redis_url="redis://localhost:6379"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello Redis!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()

Configuration

Environment Variables

  • KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (default: "localhost:9092")
  • RABBITMQ_URL: RabbitMQ connection URL (default: "amqp://guest:guest@localhost:5672/")
  • REDIS_URL: Redis connection URL (default: "redis://localhost:6379")

Client Configuration

Each client accepts additional configuration parameters:

Kafka Client

  • group_id: Consumer group ID
  • auto_offset_reset: Offset reset strategy
  • security_protocol: Security protocol
  • sasl_mechanism: SASL mechanism
  • sasl_plain_username: SASL username
  • sasl_plain_password: SASL password

RabbitMQ Client

  • exchange_name: Exchange name
  • exchange_type: Exchange type
  • routing_key: Routing key
  • queue_name: Queue name
  • durable: Queue durability
  • auto_delete: Auto-delete queue

Redis Client

  • stream_max_len: Maximum stream length
  • consumer_group: Consumer group name
  • consumer_name: Consumer name
  • block_ms: Blocking time in milliseconds

Advanced Usage

Consumer Groups

# Kafka Consumer Group
client = AsyncKafkaClient(
    stream_name="my-topic",
    group_id="my-group",
    bootstrap_servers="localhost:9092"
)

# RabbitMQ Consumer Group
client = AsyncRabbitClient(
    stream_name="my-routing-key",
    queue_name="my-queue",
    consumer_group="my-group",
    rabbitmq_url="amqp://guest:guest@localhost:5672/"
)

# Redis Consumer Group
client = AsyncRedisClient(
    stream_name="my-stream",
    consumer_group="my-group",
    redis_url="redis://localhost:6379"
)

Error Handling

try:
    await client.connect()
    await client.send("message")
except ConnectionError:
    # Handle connection errors
    pass
except TimeoutError:
    # Handle timeout errors
    pass
except Exception as e:
    # Handle other errors
    pass
finally:
    await client.close()

Development

Prerequisites

  • Python 3.10+
  • Poetry
  • Docker (for running integration tests)

Setup

# Clone the repository
git clone https://github.com/zbytealchemy/zephcast.git
cd zephcast

# Install dependencies
poetry install

# Run tests
poetry run pytest

Running Integration Tests

Start the required services:

docker-compose up -d

Run the integration tests:

poetry run pytest tests/integration

Contributing

We use rebase workflow for pull requests and allow no more then 2 commits per PR.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zephcast-0.1.0.tar.gz (17.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zephcast-0.1.0-py3-none-any.whl (24.3 kB view details)

Uploaded Python 3

File details

Details for the file zephcast-0.1.0.tar.gz.

File metadata

  • Download URL: zephcast-0.1.0.tar.gz
  • Upload date:
  • Size: 17.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure

File hashes

Hashes for zephcast-0.1.0.tar.gz
Algorithm Hash digest
SHA256 98c8a6ad8d81c0437311d65c9dfa717d8793e133b53cba4dd4fbe602b9ce6de2
MD5 9be8702a0b24a0e471831247a342a472
BLAKE2b-256 a48f99fb4bea88aa1ff14d8520d38c8a17bf6a445dc74c02292bffa65a8f4b40

See more details on using hashes here.

File details

Details for the file zephcast-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: zephcast-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 24.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure

File hashes

Hashes for zephcast-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ea2706752d5f8ae6267d18003eee852e7c203d5e90fae0e386192b81b25edc09
MD5 1b519da92d2f55d8e53a68fdc13b39b0
BLAKE2b-256 6299184e2de08acc4325454f8fe76bd01fdedb78af1d04411042c0037ac761ec

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page