Skip to main content

An Event Bus framework for event driven systems.

Project description

Inbound

PyPI - Python Version

Inbound is an asyncio framework for building event-driven systems in Python. It provides a seamless interface for working with different Message Brokers, including In-memory, RabbitMQ, and Kafka.

Features

  • Asynchronous Operations: Build scalable and responsive systems with asyncio.
  • Multiple Broker Support: Easily switch between In-memory, RabbitMQ, and soon to be more.
  • Elegant API: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.
  • Serialization Flexibility: Choose between serializers like msgpack and json based on your requirements or bring your own.
  • Streamlined Callbacks API: Implement event-based systems quickly and effectively.

Quick Start

Installation

pip install python-inbound

Basic Usage

import asyncio
from inbound import EventBus, create_event_bus

async def consume(bus: EventBus):
    async with bus.subscribe("test-channel") as stream:
        async for event in stream:
            print(event)

async def produce(bus: EventBus):
    coros = [
        bus.publish(
            channel="test-channel",
            type="test-event",
            data={"message": f"Hello World {i}"},
        )
        for i in range(20)
    ]
    await asyncio.gather(*coros)

async def main():
    event_bus = await create_event_bus()

    tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]
    async with event_bus:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

if __name__ == "__main__":
    asyncio.run(main())

Using Callbacks API

import asyncio
from inbound import CallbackGroup, Event, create_event_bus

group = CallbackGroup()

@group.callback("test-channel", "test-event")
def callback(event: Event):
    print(event)

async def main():
    event_bus = await create_event_bus()
    event_bus.add_group(group)

    async with event_bus:
        for i in range(3):
            await event_bus.publish(
                channel="test-channel",
                type="test-event",
                data={"message": f"Hello World {i}"},
            )
            await asyncio.sleep(0.01)
        await event_bus.wait_until_finished()

if __name__ == "__main__":
    asyncio.run(main())

You can find more examples in the examples directory.

Documentation

For comprehensive documentation, including advanced features and configurations, refer to Link to Documentation.

License

This project is licensed under MIT License.

Support & Feedback

If you encounter any issues or have feedback, please open an issue. We'd love to hear from you!

Made with ❤️ by Emergent Methods

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_inbound-0.1.0-py3-none-any.whl (14.6 kB view details)

Uploaded Python 3

File details

Details for the file python_inbound-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: python_inbound-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 14.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.11.4 Linux/5.15.109+

File hashes

Hashes for python_inbound-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0865217021d2f8d1760e4c254dcd65078fa65705db55ab21b850f4d01deab484
MD5 ca472a4be2e01bd1f2552f0b9dfcbe2e
BLAKE2b-256 b0ee23e2ee78d9778ef7a0382f1e48c960b61af2873df4c708820d38bd396510

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page