Skip to main content

A messaging library supporting multiple backends

Project description

🌪️ ZephCast

A powerful and flexible messaging library for Python

Unified interface for Kafka, RabbitMQ, and Redis with both sync and async support

PyPI version Documentation Status Python versions License Tests Total Downloads Monthly Downloads Ruff Type Checking: mypy

🔄 One API, Multiple Brokers, Infinite Possibilities 🔄

ZephCast provides a clean, consistent API for working with multiple message brokers, making it easy to switch between them or use them together in your applications. Whether you need the robust features of RabbitMQ, the scalability of Kafka, or the simplicity of Redis, ZephCast has you covered with both synchronous and asynchronous interfaces.

📚 Documentation

Full documentation is available at zephcast.readthedocs.io.

✨ Features

🔄 Unified Interface

Consistent API across all message brokers

⚡ Async Support

Native async/await support for all clients

🧩 Modular Design

Install only the dependencies you need

🛡️ Type Safety

Full type hints support with mypy validation

🔄 Consumer Groups

Support for consumer groups in all brokers

🛠️ Error Handling

Robust error handling and recovery mechanisms

Supported Brokers

  • Apache Kafka: Industry-standard distributed streaming platform
  • RabbitMQ: Feature-rich message broker supporting multiple messaging patterns
  • Redis Streams: Lightweight, in-memory data structure store

📋 Requirements

  • Python 3.10+
  • Redis 5.0+ (for Redis Streams support)
  • Kafka 2.0+
  • RabbitMQ 3.8+

🔧 Installation

Basic Installation
# Install with poetry (recommended)
poetry add zephcast

# Install with pip
pip install zephcast
Optional Dependencies

ZephCast uses a modular dependency system. You can install only what you need:

# Install everything
pip install zephcast[all]
Broker-specific installations
# Install with specific broker support
pip install zephcast[kafka]    # Kafka support (sync and async)
pip install zephcast[rabbit]   # RabbitMQ support (sync and async)
pip install zephcast[redis]    # Redis support (sync and async)
Async-only installations
# Install only async support
pip install zephcast[async]    # All async clients
pip install zephcast[aio]      # Alias for async, all async clients
pip install zephcast[async-kafka]   # Only async Kafka
pip install zephcast[async-rabbit]  # Only async RabbitMQ
pip install zephcast[async-redis]   # Only async Redis
Sync-only installations
# Install only sync support
pip install zephcast[sync]     # All sync clients
pip install zephcast[sync-kafka]    # Only sync Kafka
pip install zephcast[sync-rabbit]   # Only sync RabbitMQ
pip install zephcast[sync-redis]    # Only sync Redis

🚀 Quick Start

Async Iterator Pattern

All async clients in ZephCast implement the async iterator pattern, allowing you to use them in async for loops:

async with client:  # Automatically connects and closes
    async for message in client:  # Uses receive() under the hood
        print(f"Received: {message}")

Kafka Example

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

async def kafka_example():
    # Create a client
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092"
        )
    )
    
    # Using async context manager
    async with client:
        await client.send("Hello Kafka!")
        
        # Receive messages
        async for message in client:
            print(f"Received: {message}")
            break

RabbitMQ Example

from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig

async def rabbitmq_example():
    client = RabbitClient(
        stream_name="my-routing-key",
        config=RabbitConfig(
            queue_name="my-queue",
            rabbitmq_url="amqp://guest:guest@localhost:5672/"
        )
    )
    
    # Using async context manager
    async with client:
        # Send messages
        await client.send("Hello RabbitMQ!")
        
        # Receive messages
        async for message in client:
            print(f"Received: {message}")
            break

Redis Example

from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

async def redis_example():
    client = RedisClient(
        stream_name="my-stream",
        config=RedisConfig(
            redis_url="redis://localhost:6379"
        )
    )
    
    # Using async context manager
    async with client:
        # Send messages
        await client.send("Hello Redis!")
        
        # Receive messages
        async for message in client:
            print(f"Received: {message}")
            break

⚙️ Configuration

Environment Variables

ZephCast automatically reads configuration from environment variables:

  • KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (default: "localhost:9092")
  • RABBITMQ_URL: RabbitMQ connection URL (default: "amqp://guest:guest@localhost:5672/")
  • REDIS_URL: Redis connection URL (default: "redis://localhost:6379")

Client Configuration

Each client uses a dedicated config class for type-safe configuration:

Kafka Config

from zephcast.aio.kafka.config import KafkaConfig

config = KafkaConfig(
    bootstrap_servers="localhost:9092",
    group_id="my-group",
    auto_offset_reset="earliest",
    security_protocol="PLAINTEXT",
    # SASL authentication
    sasl_mechanism="PLAIN",
    sasl_plain_username="user",
    sasl_plain_password="password"
)

RabbitMQ Config

from zephcast.aio.rabbit.config import RabbitConfig

config = RabbitConfig(
    rabbitmq_url="amqp://guest:guest@localhost:5672/",
    exchange_name="my-exchange",
    exchange_type="direct",  # direct, fanout, topic, headers
    queue_name="my-queue",
    durable=True,
    auto_delete=False
)

Redis Config

from zephcast.aio.redis.config import RedisConfig

config = RedisConfig(
    redis_url="redis://localhost:6379",
    stream_max_len=1000,  # Maximum stream length
    consumer_group="my-group",
    consumer_name="consumer-1",
    block_ms=5000  # Blocking time in milliseconds
)

🔍 Advanced Usage

Consumer Groups

All clients support consumer groups for distributed message processing:

# Kafka Consumer Group
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        group_id="my-group"
    )
)

# RabbitMQ Consumer Group
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig

client = RabbitClient(
    stream_name="my-routing-key",
    config=RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
        consumer_group="my-group"
    )
)

# Redis Consumer Group
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

client = RedisClient(
    stream_name="my-stream",
    config=RedisConfig(
        redis_url="redis://localhost:6379",
        consumer_group="my-group"
    )
)

Error Handling

ZephCast provides robust error handling mechanisms:

from zephcast.core.exceptions import ZephCastError, ConnectionError

try:
    async with client:
        await client.send("message")
        async for message in client:
            process_message(message)
except ConnectionError:
    # Handle connection errors
    logger.error("Connection failed")
except TimeoutError:
    # Handle timeout errors
    logger.error("Operation timed out")
except ZephCastError as e:
    # Handle ZephCast-specific errors
    logger.error(f"ZephCast error: {e}")
except Exception as e:
    # Handle other errors
    logger.error(f"Unexpected error: {e}")

Retry Mechanisms

ZephCast includes built-in retry mechanisms for handling transient failures:

from zephcast.aio.retry import RetryConfig

# Configure retry behavior
retry_config = RetryConfig(
    max_retries=3,
    retry_delay=1.0,  # seconds
    backoff_factor=2.0,
    exceptions=(ConnectionError, TimeoutError)
)

# Apply retry to client operations
from zephcast.aio.retry import with_retry

@with_retry(retry_config)
async def send_with_retry(client, message):
    await client.send(message)

## 👥 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct and the process for submitting pull requests.

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## 🙏 Acknowledgments

- Thanks to all contributors who have helped shape ZephCast
- Inspired by the need for a unified messaging interface across different brokers

## Development

### Prerequisites

- Python 3.10+
- Poetry
- Docker (for running integration tests)

### Setup

```bash
# Clone the repository
git clone https://github.com/zbytealchemy/zephcast.git
cd zephcast

# Install dependencies
make install

# Run unit tests
make unit-test

Running Integration Tests

Start the required services:

docker-compose up -d

Run the integration tests:

make integration-test

Contributing

We use rebase workflow for pull requests and allow no more then 2 commits per PR.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zephcast-0.6.0.tar.gz (25.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zephcast-0.6.0-py3-none-any.whl (36.3 kB view details)

Uploaded Python 3

File details

Details for the file zephcast-0.6.0.tar.gz.

File metadata

  • Download URL: zephcast-0.6.0.tar.gz
  • Upload date:
  • Size: 25.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1021-azure

File hashes

Hashes for zephcast-0.6.0.tar.gz
Algorithm Hash digest
SHA256 31437773a605558ed1247898a15546cf16b4a5a893c7204654f78e9ce9e924e3
MD5 5d188e639f39e7441b44555bd83e0449
BLAKE2b-256 0dde865d8f3b16a4f14cd06630dcd3adc42550ff326ad375e41f1c674cbd0441

See more details on using hashes here.

File details

Details for the file zephcast-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: zephcast-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 36.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1021-azure

File hashes

Hashes for zephcast-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cf17c93955da54fdc5b0d8189ec8d4a791dc01dfb168b5a16d6f3ccab72b88a8
MD5 a066c359711fa84a66ca6ed570a6233b
BLAKE2b-256 b400530ac1eddefb67264f7ccd1d8f40f8489f775fbf5fb4af1dc4e515c4310f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page