Skip to main content

A messaging library supporting multiple backends

Project description

ZephCast

ZephCast is a powerful and flexible messaging library that provides a unified interface for working with multiple message brokers. It currently supports Kafka, RabbitMQ, and Redis, offering both synchronous and asynchronous clients.

Features

  • Multiple Broker Support:
    • Apache Kafka
    • RabbitMQ
    • Redis Streams
  • Unified Interface: Consistent API across all message brokers
  • Async Support: Native async/await support for all clients
  • Type Safety: Full type hints support
  • Error Handling: Robust error handling and recovery mechanisms
  • Consumer Groups: Support for consumer groups in Kafka and RabbitMQ
  • Exchange Bindings: Advanced RabbitMQ exchange and queue bindings
  • Stream Processing: Redis Streams support for stream processing

Requirements

  • Python 3.8+
  • Redis 5.0+ (for Redis Streams support)
  • Kafka 2.0+
  • RabbitMQ 3.8+

Installation

# Install with poetry (recommended)
poetry add zephcast

# Install with pip
pip install zephcast

Quick Start

Async Iterator Pattern

All async clients in ZephCast:w implement the async iterator pattern, allowing you to use them in async for loops:

async with client:  # Automatically connects and closes
    async for message in client:  # Uses receive() under the hood
        print(f"Received: {message}")

Kafka Example

from zephcast.kafka.async_client import AsyncKafkaClient

async def kafka_example():
    # Create a client
    client = AsyncKafkaClient(
        stream_name="my-topic",
        bootstrap_servers="localhost:9092"
    )
    
    # Using async context manager
    async with client:
        # Send messages
        await client.send("Hello Kafka!")
        
        # Receive messages
        async for message in client:
            print(f"Received: {message}")

RabbitMQ Example

from zephcast.rabbit.async_client import AsyncRabbitClient

async def rabbitmq_example():
    # Create a client
    client = AsyncRabbitClient(
        stream_name="my-routing-key",
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello RabbitMQ!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()

Redis Example

from zephcast.redis.async_client import AsyncRedisClient

async def redis_example():
    # Create a client
    client = AsyncRedisClient(
        stream_name="my-stream",
        redis_url="redis://localhost:6379"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello Redis!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()

Configuration

Environment Variables

  • KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (default: "localhost:9092")
  • RABBITMQ_URL: RabbitMQ connection URL (default: "amqp://guest:guest@localhost:5672/")
  • REDIS_URL: Redis connection URL (default: "redis://localhost:6379")

Client Configuration

Each client accepts additional configuration parameters:

Kafka Client

  • group_id: Consumer group ID
  • auto_offset_reset: Offset reset strategy
  • security_protocol: Security protocol
  • sasl_mechanism: SASL mechanism
  • sasl_plain_username: SASL username
  • sasl_plain_password: SASL password

RabbitMQ Client

  • exchange_name: Exchange name
  • exchange_type: Exchange type
  • routing_key: Routing key
  • queue_name: Queue name
  • durable: Queue durability
  • auto_delete: Auto-delete queue

Redis Client

  • stream_max_len: Maximum stream length
  • consumer_group: Consumer group name
  • consumer_name: Consumer name
  • block_ms: Blocking time in milliseconds

Advanced Usage

Consumer Groups

# Kafka Consumer Group
client = AsyncKafkaClient(
    stream_name="my-topic",
    group_id="my-group",
    bootstrap_servers="localhost:9092"
)

# RabbitMQ Consumer Group
client = AsyncRabbitClient(
    stream_name="my-routing-key",
    queue_name="my-queue",
    consumer_group="my-group",
    rabbitmq_url="amqp://guest:guest@localhost:5672/"
)

# Redis Consumer Group
client = AsyncRedisClient(
    stream_name="my-stream",
    consumer_group="my-group",
    redis_url="redis://localhost:6379"
)

Error Handling

try:
    await client.connect()
    await client.send("message")
except ConnectionError:
    # Handle connection errors
    pass
except TimeoutError:
    # Handle timeout errors
    pass
except Exception as e:
    # Handle other errors
    pass
finally:
    await client.close()

Development

Prerequisites

  • Python 3.10+
  • Poetry
  • Docker (for running integration tests)

Setup

# Clone the repository
git clone https://github.com/zbytealchemy/zephcast.git
cd zephcast

# Install dependencies
poetry install

# Run tests
poetry run pytest

Running Integration Tests

Start the required services:

docker-compose up -d

Run the integration tests:

poetry run pytest tests/integration

Contributing

We use rebase workflow for pull requests and allow no more then 2 commits per PR.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zephcast-0.3.0.tar.gz (17.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zephcast-0.3.0-py3-none-any.whl (24.3 kB view details)

Uploaded Python 3

File details

Details for the file zephcast-0.3.0.tar.gz.

File metadata

  • Download URL: zephcast-0.3.0.tar.gz
  • Upload date:
  • Size: 17.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure

File hashes

Hashes for zephcast-0.3.0.tar.gz
Algorithm Hash digest
SHA256 f1d926963c3e771c0af045880f78375e743a5d4a1f8f07506e27194899c4dc6b
MD5 b1e075f049d4f49fcd087456c66c430a
BLAKE2b-256 b1e2d509eade9e606c7342895b82669b9186f02853bb34f870a99ffad7bc1f45

See more details on using hashes here.

File details

Details for the file zephcast-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: zephcast-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 24.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure

File hashes

Hashes for zephcast-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ede643dcec8142414c005da5a104a414ce97548afcd0a66eed203bc8f8c1ba8f
MD5 edf50f82d799c72efa9fc19dc1708d38
BLAKE2b-256 404b836927ab64e4400f5ee645c38a94c721047ec0c6e19a0aa35481b06c8bf6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page