A messaging library supporting multiple backends
Project description
ZephCast
ZephCast is a powerful and flexible messaging library that provides a unified interface for working with multiple message brokers. It currently supports Kafka, RabbitMQ, and Redis, offering both synchronous and asynchronous clients.
Features
- Multiple Broker Support:
- Apache Kafka
- RabbitMQ
- Redis Streams
- Unified Interface: Consistent API across all message brokers
- Async Support: Native async/await support for all clients
- Type Safety: Full type hints support
- Error Handling: Robust error handling and recovery mechanisms
- Consumer Groups: Support for consumer groups in Kafka and RabbitMQ
- Exchange Bindings: Advanced RabbitMQ exchange and queue bindings
- Stream Processing: Redis Streams support for stream processing
Requirements
- Python 3.8+
- Redis 5.0+ (for Redis Streams support)
- Kafka 2.0+
- RabbitMQ 3.8+
Installation
# Install with poetry (recommended)
poetry add zephcast
# Install with pip
pip install zephcast
Quick Start
Async Iterator Pattern
All async clients in ZephCast:w implement the async iterator pattern, allowing you to use them in async for loops:
async with client: # Automatically connects and closes
async for message in client: # Uses receive() under the hood
print(f"Received: {message}")
Kafka Example
from zephcast.kafka.async_client import AsyncKafkaClient
async def kafka_example():
# Create a client
client = AsyncKafkaClient(
stream_name="my-topic",
bootstrap_servers="localhost:9092"
)
# Using async context manager
async with client:
# Send messages
await client.send("Hello Kafka!")
# Receive messages
async for message in client:
print(f"Received: {message}")
RabbitMQ Example
from zephcast.rabbit.async_client import AsyncRabbitClient
async def rabbitmq_example():
# Create a client
client = AsyncRabbitClient(
stream_name="my-routing-key",
queue_name="my-queue",
rabbitmq_url="amqp://guest:guest@localhost:5672/"
)
# Connect
await client.connect()
# Send messages
await client.send("Hello RabbitMQ!")
# Receive messages
async for message in client.receive():
print(f"Received: {message}")
break
# Close connection
await client.close()
Redis Example
from zephcast.redis.async_client import AsyncRedisClient
async def redis_example():
# Create a client
client = AsyncRedisClient(
stream_name="my-stream",
redis_url="redis://localhost:6379"
)
# Connect
await client.connect()
# Send messages
await client.send("Hello Redis!")
# Receive messages
async for message in client.receive():
print(f"Received: {message}")
break
# Close connection
await client.close()
Configuration
Environment Variables
KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (default: "localhost:9092")RABBITMQ_URL: RabbitMQ connection URL (default: "amqp://guest:guest@localhost:5672/")REDIS_URL: Redis connection URL (default: "redis://localhost:6379")
Client Configuration
Each client accepts additional configuration parameters:
Kafka Client
group_id: Consumer group IDauto_offset_reset: Offset reset strategysecurity_protocol: Security protocolsasl_mechanism: SASL mechanismsasl_plain_username: SASL usernamesasl_plain_password: SASL password
RabbitMQ Client
exchange_name: Exchange nameexchange_type: Exchange typerouting_key: Routing keyqueue_name: Queue namedurable: Queue durabilityauto_delete: Auto-delete queue
Redis Client
stream_max_len: Maximum stream lengthconsumer_group: Consumer group nameconsumer_name: Consumer nameblock_ms: Blocking time in milliseconds
Advanced Usage
Consumer Groups
# Kafka Consumer Group
client = AsyncKafkaClient(
stream_name="my-topic",
group_id="my-group",
bootstrap_servers="localhost:9092"
)
# RabbitMQ Consumer Group
client = AsyncRabbitClient(
stream_name="my-routing-key",
queue_name="my-queue",
consumer_group="my-group",
rabbitmq_url="amqp://guest:guest@localhost:5672/"
)
# Redis Consumer Group
client = AsyncRedisClient(
stream_name="my-stream",
consumer_group="my-group",
redis_url="redis://localhost:6379"
)
Error Handling
try:
await client.connect()
await client.send("message")
except ConnectionError:
# Handle connection errors
pass
except TimeoutError:
# Handle timeout errors
pass
except Exception as e:
# Handle other errors
pass
finally:
await client.close()
Development
Prerequisites
- Python 3.10+
- Poetry
- Docker (for running integration tests)
Setup
# Clone the repository
git clone https://github.com/zbytealchemy/zephcast.git
cd zephcast
# Install dependencies
poetry install
# Run tests
poetry run pytest
Running Integration Tests
Start the required services:
docker-compose up -d
Run the integration tests:
poetry run pytest tests/integration
Contributing
We use rebase workflow for pull requests and allow no more then 2 commits per PR.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file zephcast-0.4.0.tar.gz.
File metadata
- Download URL: zephcast-0.4.0.tar.gz
- Upload date:
- Size: 17.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2ee259a407fbd312c0a3ca154e1891e07581f9325d79680a18385164ea66216b
|
|
| MD5 |
f99ae53644dc5503b0cf21ccc4f1a493
|
|
| BLAKE2b-256 |
9b2314525fa12a8a36a3ef17781d747cac76cfe866f9e1b424e5620c5784338f
|
File details
Details for the file zephcast-0.4.0-py3-none-any.whl.
File metadata
- Download URL: zephcast-0.4.0-py3-none-any.whl
- Upload date:
- Size: 24.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.10.16 Linux/6.8.0-1017-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b60201e0ae00771b4a22ee1171143d887a9483fce1961afe7d3504e2d052eaec
|
|
| MD5 |
09a7e61b2ea1d7b51cb39ce03b2e91c9
|
|
| BLAKE2b-256 |
e4450ad5263bc81ecdfca5c1cff10f3019f89296797a31ead3af390cccd21591
|