
An event bus framework for event-driven systems.


Inbound


Inbound is an asyncio framework for building event-driven systems in Python. It provides a single interface for working with different message brokers, including in-memory, RabbitMQ, and Kafka.

Features

  • Asynchronous Operations: Build scalable and responsive systems with asyncio.
  • Multiple Broker Support: Easily switch between brokers such as in-memory and RabbitMQ, with more planned.
  • Elegant API: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.
  • Serialization Flexibility: Choose a serializer such as msgpack or json to suit your requirements, or bring your own.
  • Streamlined Callbacks API: Implement event-based systems quickly and effectively.
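
The "bring your own" serializer idea can be pictured with the sketch below. The `Serializer` protocol, the `JsonSerializer` class, and the `round_trip` helper are hypothetical names invented for illustration; Inbound's actual serializer interface may look different, so treat this as a shape to compare against the documentation rather than the library's API.

```python
import json
from typing import Any, Protocol


class Serializer(Protocol):
    """Hypothetical interface: turn event payloads into bytes and back."""

    def dumps(self, data: Any) -> bytes: ...

    def loads(self, raw: bytes) -> Any: ...


class JsonSerializer:
    """Illustrative JSON serializer built on the standard library only."""

    def dumps(self, data: Any) -> bytes:
        # Compact separators keep the encoded payload small.
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    def loads(self, raw: bytes) -> Any:
        return json.loads(raw.decode("utf-8"))


def round_trip(serializer: Serializer, data: Any) -> Any:
    """Encode then decode a payload, as happens between publish and receive."""
    return serializer.loads(serializer.dumps(data))
```

Any object with matching `dumps` and `loads` methods satisfies the protocol, which is the usual way to keep a message format swappable without touching publisher or consumer code.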

Quick Start

Installation

pip install python-inbound

Basic Usage

import asyncio
from inbound import EventBus

async def consume(bus: EventBus):
    async with bus.subscribe("test-channel") as stream:
        async for event in stream:
            print(event)

async def produce(bus: EventBus):
    coros = [
        bus.publish(
            channel="test-channel",
            type="test-event",
            data={"message": f"Hello World {i}"},
        )
        for i in range(20)
    ]
    await asyncio.gather(*coros)

async def main():
    event_bus = EventBus()

    tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]
    async with event_bus:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

if __name__ == "__main__":
    asyncio.run(main())
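
To make the publish/subscribe flow above more concrete, here is a toy in-memory bus written with nothing but `asyncio.Queue`. It is a conceptual sketch only: `TinyBus` and `demo` are made-up names, the `event_type` parameter stands in for the `type` argument used above, and nothing here reflects how Inbound is implemented internally.

```python
import asyncio
from collections import defaultdict


class TinyBus:
    """Toy in-memory bus: one asyncio.Queue per subscriber, grouped by channel."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        # Each subscriber gets its own queue so events are not shared or stolen.
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    async def publish(self, channel: str, event_type: str, data: dict) -> None:
        # Fan the event out to every queue registered on the channel.
        event = {"channel": channel, "type": event_type, "data": data}
        for queue in self._subscribers[channel]:
            await queue.put(event)


async def demo() -> list[str]:
    bus = TinyBus()
    stream = bus.subscribe("test-channel")
    for i in range(3):
        await bus.publish("test-channel", "test-event", {"message": f"Hello World {i}"})
    # Drain the three queued events in the order they were published.
    return [(await stream.get())["data"]["message"] for _ in range(3)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that in this toy version the subscriber must register before anything is published, otherwise the events have no queue to land in.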

Using Callbacks API

import asyncio
from inbound import CallbackGroup, Event, EventBus, broker_from_url

group = CallbackGroup()

@group.callback("test-channel", "test-event")
def callback(event: Event):
    print(event)

async def main():
    event_bus = EventBus(broker=broker_from_url("memory://?serializer=msgpack"))
    event_bus.add_group(group)

    async with event_bus:
        for i in range(3):
            await event_bus.publish(
                channel="test-channel",
                type="test-event",
                data={"message": f"Hello World {i}"},
            )
            await asyncio.sleep(0.01)
        await event_bus.wait_until_finished()

if __name__ == "__main__":
    asyncio.run(main())
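
The decorator-based registration used above can be understood as a lookup table keyed by channel and event type. The sketch below shows that idea in isolation; `TinyCallbackGroup` and its `dispatch` method are hypothetical and exist only for illustration, so they should not be read as Inbound's `CallbackGroup` implementation.

```python
from collections import defaultdict
from typing import Any, Callable


class TinyCallbackGroup:
    """Toy registry mapping (channel, event type) to handler functions."""

    def __init__(self) -> None:
        self._handlers: dict[tuple[str, str], list[Callable[[Any], Any]]] = defaultdict(list)

    def callback(self, channel: str, event_type: str) -> Callable:
        def register(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
            self._handlers[(channel, event_type)].append(func)
            return func  # Return the function unchanged so it stays directly callable.

        return register

    def dispatch(self, channel: str, event_type: str, event: Any) -> int:
        """Call every matching handler and report how many ran."""
        handlers = self._handlers.get((channel, event_type), [])
        for handler in handlers:
            handler(event)
        return len(handlers)


group = TinyCallbackGroup()
received: list[Any] = []


@group.callback("test-channel", "test-event")
def on_test_event(event: Any) -> None:
    # Record the event so the effect of dispatching is observable.
    received.append(event)
```

Calling `group.dispatch("test-channel", "test-event", {...})` runs `on_test_event`, while an event with a different channel or type finds no handlers and is ignored.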

You can find more examples in the examples directory.

Documentation

For comprehensive documentation, including advanced features and configurations, refer to the project documentation.

License

This project is licensed under the MIT License.

Support & Feedback

If you encounter any issues or have feedback, please open an issue. We'd love to hear from you!

Made with ❤️ by Emergent Methods
