An event bus framework for event-driven systems.
Inbound
Inbound is an asyncio framework for building event-driven systems in Python. It provides a uniform interface for working with different message brokers, including in-memory, RabbitMQ, and Kafka.
Features
- Asynchronous Operations: Build scalable and responsive systems with asyncio.
- Multiple Broker Support: Switch between the in-memory and RabbitMQ brokers, with more planned.
- Elegant API: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.
- Serialization Flexibility: Choose between serializers such as msgpack and json, or bring your own.
- Streamlined Callbacks API: Implement event-based systems quickly and effectively.
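To make the "bring your own" serializer idea from the list above more concrete, here is a minimal sketch that uses only the standard library. The `Serializer` protocol, `JSONSerializer`, and `roundtrip` names are hypothetical and chosen for illustration; they are not taken from Inbound's API, which may define its serializer interface differently.

```python
import json
from typing import Any, Protocol


class Serializer(Protocol):
    """Hypothetical interface: turn event payloads into bytes and back."""

    def dumps(self, data: Any) -> bytes: ...

    def loads(self, raw: bytes) -> Any: ...


class JSONSerializer:
    """Text-based encoding built on the standard-library json module."""

    def dumps(self, data: Any) -> bytes:
        # Compact separators keep the encoded payload small.
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    def loads(self, raw: bytes) -> Any:
        return json.loads(raw.decode("utf-8"))


def roundtrip(serializer: Serializer, data: Any) -> Any:
    """Encode and then decode a payload, as a trip through a broker would."""
    return serializer.loads(serializer.dumps(data))
```

Any object with matching `dumps` and `loads` methods satisfies the protocol, so a msgpack-based class could be swapped in without changing the calling code.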
Quick Start
Installation
pip install python-inbound
Basic Usage
import asyncio

from inbound import EventBus, create_event_bus


async def consume(bus: EventBus):
    # Print every event that arrives on the channel.
    async with bus.subscribe("test-channel") as stream:
        async for event in stream:
            print(event)


async def produce(bus: EventBus):
    # Publish 20 events concurrently.
    coros = [
        bus.publish(
            channel="test-channel",
            type="test-event",
            data={"message": f"Hello World {i}"},
        )
        for i in range(20)
    ]
    await asyncio.gather(*coros)


async def main():
    event_bus = await create_event_bus()
    tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]

    async with event_bus:
        # Return as soon as either task finishes.
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)


if __name__ == "__main__":
    asyncio.run(main())
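The example above waits with `return_when=asyncio.FIRST_COMPLETED`. Assuming the consumer's stream does not end on its own, that lets the program continue once the producer is done. The sketch below reproduces the same pattern with only the standard library: an `asyncio.Queue` stands in for the broker, and the `run` helper, the `seen` list, and the explicit drain-and-cancel cleanup are illustrative additions, not part of Inbound.

```python
import asyncio


async def consume(queue: asyncio.Queue, seen: list) -> None:
    # Stand-in for iterating an event stream: loops until cancelled.
    while True:
        event = await queue.get()
        seen.append(event)
        queue.task_done()


async def produce(queue: asyncio.Queue, count: int) -> None:
    for i in range(count):
        await queue.put({"message": f"Hello World {i}"})


async def run(count: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    consumer = asyncio.create_task(consume(queue, seen))
    producer = asyncio.create_task(produce(queue, count))

    # Returns when the producer finishes; the consumer is still pending.
    done, pending = await asyncio.wait(
        {consumer, producer}, return_when=asyncio.FIRST_COMPLETED
    )

    # Let the consumer drain the queue, then stop it explicitly.
    await queue.join()
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return seen
```

Calling `asyncio.run(run(3))` returns the three published payloads in order. Cancelling the pending task by hand is a design choice in this sketch; `asyncio.run` also cancels any tasks that are still running when the main coroutine exits.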
Using Callbacks API
import asyncio

from inbound import CallbackGroup, Event, create_event_bus

group = CallbackGroup()


# Run this function for every "test-event" published on "test-channel".
@group.callback("test-channel", "test-event")
def callback(event: Event):
    print(event)


async def main():
    event_bus = await create_event_bus()
    event_bus.add_group(group)

    async with event_bus:
        for i in range(3):
            await event_bus.publish(
                channel="test-channel",
                type="test-event",
                data={"message": f"Hello World {i}"},
            )
            await asyncio.sleep(0.01)

        await event_bus.wait_until_finished()


if __name__ == "__main__":
    asyncio.run(main())
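For readers curious about how a decorator-based callback registry of this kind can work, here is a minimal standard-library sketch. `MiniCallbackGroup`, its `dispatch` method, and the `received` list are hypothetical names for illustration only; Inbound's `CallbackGroup` may be implemented quite differently, and this sketch is synchronous and passes plain dicts rather than `Event` objects.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[dict], Any]


class MiniCallbackGroup:
    """Hypothetical stand-in: maps (channel, event type) to handlers."""

    def __init__(self) -> None:
        self._handlers: dict[tuple[str, str], list[Handler]] = defaultdict(list)

    def callback(self, channel: str, type: str) -> Callable[[Handler], Handler]:
        def register(func: Handler) -> Handler:
            self._handlers[(channel, type)].append(func)
            return func  # leave the decorated function usable as-is

        return register

    def dispatch(self, channel: str, type: str, data: dict) -> int:
        """Call every matching handler and return how many ran."""
        handlers = self._handlers.get((channel, type), [])
        for handler in handlers:
            handler(data)
        return len(handlers)


group = MiniCallbackGroup()
received: list[dict] = []


@group.callback("test-channel", "test-event")
def on_test_event(data: dict) -> None:
    received.append(data)
```

Calling `group.dispatch("test-channel", "test-event", {...})` runs `on_test_event`, while an unregistered channel or type runs nothing. Keying the registry on the `(channel, type)` pair keeps lookup to a single dictionary access.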
You can find more examples in the examples directory.
Documentation
For comprehensive documentation, including advanced features and configurations, refer to the project documentation.
License
This project is licensed under the MIT License.
Support & Feedback
If you encounter any issues or have feedback, please open an issue. We'd love to hear from you!
Made with ❤️ by Emergent Methods
File details
Details for the file python_inbound-0.1.0-py3-none-any.whl.
File metadata
- Download URL: python_inbound-0.1.0-py3-none-any.whl
- Upload date:
- Size: 14.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.5.1 CPython/3.11.4 Linux/5.15.109+
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0865217021d2f8d1760e4c254dcd65078fa65705db55ab21b850f4d01deab484 |
| MD5 | ca472a4be2e01bd1f2552f0b9dfcbe2e |
| BLAKE2b-256 | b0ee23e2ee78d9778ef7a0382f1e48c960b61af2873df4c708820d38bd396510 |