Skip to main content

An Event Bus framework for event driven systems.

Project description

Inbound

PyPI - Python Version

Inbound is an asyncio framework for building event-driven systems in Python. It provides a seamless interface for working with different Message Brokers, including In-memory, RabbitMQ, and Kafka.

Features

  • Asynchronous Operations: Build scalable and responsive systems with asyncio.
  • Multiple Broker Support: Easily switch between In-memory, RabbitMQ, and soon to be more.
  • Elegant API: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.
  • Serialization Flexibility: Choose between serializers like msgpack and json based on your requirements or bring your own.
  • Streamlined Callbacks API: Implement event-based systems quickly and effectively.

Quick Start

Installation

pip install python-inbound

Basic Usage

import asyncio
from inbound import EventBus

async def consume(bus: EventBus):
    async with bus.subscribe("test-channel") as stream:
        async for event in stream:
            print(event)

async def produce(bus: EventBus):
    coros = [
        bus.publish(
            channel="test-channel",
            type="test-event",
            data={"message": f"Hello World {i}"},
        )
        for i in range(20)
    ]
    await asyncio.gather(*coros)

async def main():
    event_bus = EventBus()

    tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]
    async with event_bus:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

if __name__ == "__main__":
    asyncio.run(main())

Using Callbacks API

import asyncio
from inbound import CallbackGroup, Event, broker_from_url

group = CallbackGroup()

@group.callback("test-channel", "test-event")
def callback(event: Event):
    print(event)

async def main():
    event_bus = EventBus(broker=broker_from_url("memory://?serializer=msgpack"))
    event_bus.add_group(group)

    async with event_bus:
        for i in range(3):
            await event_bus.publish(
                channel="test-channel",
                type="test-event",
                data={"message": f"Hello World {i}"},
            )
            await asyncio.sleep(0.01)
        await event_bus.wait_until_finished()

if __name__ == "__main__":
    asyncio.run(main())

You can find more examples in the examples directory.

Documentation

For comprehensive documentation, including advanced features and configurations, refer to Link to Documentation.

License

This project is licensed under MIT License.

Support & Feedback

If you encounter any issues or have feedback, please open an issue. We'd love to hear from you!

Made with ❤️ by Emergent Methods

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_inbound-0.4.0-py3-none-any.whl (16.6 kB view details)

Uploaded Python 3

File details

Details for the file python_inbound-0.4.0-py3-none-any.whl.

File metadata

File hashes

Hashes for python_inbound-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2eb311aafa535f5121174743d50a05efc86528625a35f80352b8a39d8af90d54
MD5 b80e47db92c7399c4028c2ecbd9b1282
BLAKE2b-256 b8fc1f8309a4416723ff043e92247340fffc39b7132346058457645e06bb4720

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page