Skip to main content

A messaging library supporting multiple backends

Project description

ZephCast

ZephCast is a powerful and flexible messaging library that provides a unified interface for working with multiple message brokers. It currently supports Kafka, RabbitMQ, and Redis, offering both synchronous and asynchronous clients.

Features

  • Multiple Broker Support:
    • Apache Kafka
    • RabbitMQ
    • Redis Streams
  • Unified Interface: Consistent API across all message brokers
  • Async Support: Native async/await support for all clients
  • Type Safety: Full type hints support
  • Error Handling: Robust error handling and recovery mechanisms
  • Consumer Groups: Support for consumer groups in Kafka and RabbitMQ
  • Exchange Bindings: Advanced RabbitMQ exchange and queue bindings
  • Stream Processing: Redis Streams support for stream processing

Requirements

  • Python 3.8+
  • Redis 5.0+ (for Redis Streams support)
  • Kafka 2.0+
  • RabbitMQ 3.8+

Installation

# Install with poetry (recommended)
poetry add zephcast

# Install with pip
pip install zephcast

Quick Start

Async Iterator Pattern

All async clients in ZephCast:w implement the async iterator pattern, allowing you to use them in async for loops:

async with client:  # Automatically connects and closes
    async for message in client:  # Uses receive() under the hood
        print(f"Received: {message}")

Kafka Example

from zephcast.kafka.async_client import AsyncKafkaClient

async def kafka_example():
    # Create a client
    client = AsyncKafkaClient(
        stream_name="my-topic",
        bootstrap_servers="localhost:9092"
    )
    
    # Using async context manager
    async with client:
        # Send messages
        await client.send("Hello Kafka!")
        
        # Receive messages
        async for message in client:
            print(f"Received: {message}")

RabbitMQ Example

from zephcast.rabbit.async_client import AsyncRabbitClient

async def rabbitmq_example():
    # Create a client
    client = AsyncRabbitClient(
        stream_name="my-routing-key",
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello RabbitMQ!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()

Redis Example

from zephcast.redis.async_client import AsyncRedisClient

async def redis_example():
    # Create a client
    client = AsyncRedisClient(
        stream_name="my-stream",
        redis_url="redis://localhost:6379"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello Redis!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()

Configuration

Environment Variables

  • KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (default: "localhost:9092")
  • RABBITMQ_URL: RabbitMQ connection URL (default: "amqp://guest:guest@localhost:5672/")
  • REDIS_URL: Redis connection URL (default: "redis://localhost:6379")

Client Configuration

Each client accepts additional configuration parameters:

Kafka Client

  • group_id: Consumer group ID
  • auto_offset_reset: Offset reset strategy
  • security_protocol: Security protocol
  • sasl_mechanism: SASL mechanism
  • sasl_plain_username: SASL username
  • sasl_plain_password: SASL password

RabbitMQ Client

  • exchange_name: Exchange name
  • exchange_type: Exchange type
  • routing_key: Routing key
  • queue_name: Queue name
  • durable: Queue durability
  • auto_delete: Auto-delete queue

Redis Client

  • stream_max_len: Maximum stream length
  • consumer_group: Consumer group name
  • consumer_name: Consumer name
  • block_ms: Blocking time in milliseconds

Advanced Usage

Consumer Groups

# Kafka Consumer Group
client = AsyncKafkaClient(
    stream_name="my-topic",
    group_id="my-group",
    bootstrap_servers="localhost:9092"
)

# RabbitMQ Consumer Group
client = AsyncRabbitClient(
    stream_name="my-routing-key",
    queue_name="my-queue",
    consumer_group="my-group",
    rabbitmq_url="amqp://guest:guest@localhost:5672/"
)

# Redis Consumer Group
client = AsyncRedisClient(
    stream_name="my-stream",
    consumer_group="my-group",
    redis_url="redis://localhost:6379"
)

Error Handling

try:
    await client.connect()
    await client.send("message")
except ConnectionError:
    # Handle connection errors
    pass
except TimeoutError:
    # Handle timeout errors
    pass
except Exception as e:
    # Handle other errors
    pass
finally:
    await client.close()

Development

Prerequisites

  • Python 3.10+
  • Poetry
  • Docker (for running integration tests)

Setup

# Clone the repository
git clone https://github.com/zbytealchemy/zephcast.git
cd zephcast

# Install dependencies
poetry install

# Run tests
poetry run pytest

Running Integration Tests

Start the required services:

docker-compose up -d

Run the integration tests:

poetry run pytest tests/integration

Contributing

We use rebase workflow for pull requests and allow no more then 2 commits per PR.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zephcast-0.2.0.tar.gz (17.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zephcast-0.2.0-py3-none-any.whl (24.3 kB view details)

Uploaded Python 3

File details

Details for the file zephcast-0.2.0.tar.gz.

File metadata

  • Download URL: zephcast-0.2.0.tar.gz
  • Upload date:
  • Size: 17.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure

File hashes

Hashes for zephcast-0.2.0.tar.gz
Algorithm Hash digest
SHA256 998478f211c83c14e1d19ba8b6aad8f8ae8e9b0e3e22a1c93b28fb6bd78af4d0
MD5 4c8082743d684232a4fd133ec6e7cf92
BLAKE2b-256 aaeb65956cb939e4e4cec806f1b0264721f38a1faf5df585e6e8db815f48c921

See more details on using hashes here.

File details

Details for the file zephcast-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: zephcast-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 24.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure

File hashes

Hashes for zephcast-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 55c1f32abe86830a76cc9d1563e96abbfa1d60a09475edcc88e4be71dde00439
MD5 dbf2e51760038ebc5388a60bd2adca5d
BLAKE2b-256 bbdc0302f309e17959fb939c91f0d222763910bda1d9bdb21778b62c14fda43c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page