
An Event Bus framework for event-driven systems.


Inbound


Inbound is an asyncio framework for building event-driven systems in Python. It provides a seamless interface for working with different Message Brokers, including In-memory, RabbitMQ, and Kafka.

Features

  • Asynchronous Operations: Build scalable and responsive systems with asyncio.
  • Multiple Broker Support: Easily switch between brokers such as In-memory and RabbitMQ, with more on the way.
  • Elegant API: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.
  • Serialization Flexibility: Choose between serializers like msgpack and json based on your requirements, or bring your own.
  • Streamlined Callbacks API: Implement event-based systems quickly and effectively.
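
To make the serialization bullet concrete, here is a minimal sketch of what a bring-your-own serializer might look like. The `JSONSerializer` class and its `serialize`/`deserialize` method names are hypothetical, chosen for illustration only; they are not taken from Inbound's API, so check the project documentation for the interface it actually expects.

```python
import json
from typing import Any


class JSONSerializer:
    """Hypothetical serializer: turns event payloads into bytes and back."""

    def serialize(self, payload: dict[str, Any]) -> bytes:
        # Compact separators keep the wire format small;
        # sort_keys makes the output deterministic.
        text = json.dumps(payload, separators=(",", ":"), sort_keys=True)
        return text.encode("utf-8")

    def deserialize(self, raw: bytes) -> dict[str, Any]:
        return json.loads(raw.decode("utf-8"))


serializer = JSONSerializer()
wire = serializer.serialize(
    {"type": "test-event", "data": {"message": "Hello World 0"}}
)
restored = serializer.deserialize(wire)
```

A round trip through `serialize` and `deserialize` should return the original payload, which is the property any replacement serializer would need to preserve.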

Quick Start

Installation

pip install python-inbound

Basic Usage

import asyncio
from inbound import EventBus

async def consume(bus: EventBus):
    async with bus.subscribe("test-channel") as stream:
        async for envelope in stream:
            # Get the event from the envelope
            event = envelope.event
            print(event)
            # After processing the event, acknowledge it
            await envelope.ack()
            # Unless you want to reject the event
            # await envelope.nack()


async def produce(bus: EventBus):
    coros = [
        bus.publish(
            channel="test-channel",
            type="test-event",
            data={"message": f"Hello World {i}"},
        )
        for i in range(20)
    ]
    await asyncio.gather(*coros)

async def main():
    event_bus = EventBus()

    tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]
    async with event_bus:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

if __name__ == "__main__":
    asyncio.run(main())
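
In the example above, `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` returns as soon as either task finishes, presumably the producer, since the consumer keeps iterating over its stream. The sketch below uses only the standard library (no Inbound calls; `worker` and `forever` are hypothetical stand-ins for the producer and consumer) to show one way to cancel the still-pending task explicitly rather than leaving it for `asyncio.run` to clean up.

```python
import asyncio


async def worker() -> str:
    # Stand-in for the producer: finishes quickly.
    await asyncio.sleep(0.01)
    return "done"


async def forever() -> None:
    # Stand-in for the consumer: runs until cancelled.
    while True:
        await asyncio.sleep(0.01)


async def run() -> tuple[str, bool]:
    tasks = [asyncio.create_task(worker()), asyncio.create_task(forever())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel whatever is still running and wait for the cancellation to land.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    result = next(iter(done)).result()
    return result, all(task.cancelled() for task in pending)


result, all_cancelled = asyncio.run(run())
```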

Using Callbacks API

import asyncio
from inbound import CallbackGroup, Event, EventBus, broker_from_url

group = CallbackGroup()

@group.callback("test-channel", "test-event")
def callback(event: Event):
    # Note: Envelopes are automatically acknowledged
    # for callbacks
    print(event)

async def main():
    event_bus = EventBus(broker=broker_from_url("memory://?serializer=msgpack"))
    event_bus.add_group(group)

    async with event_bus:
        for i in range(3):
            await event_bus.publish(
                channel="test-channel",
                type="test-event",
                data={"message": f"Hello World {i}"},
            )
            await asyncio.sleep(0.01)
        await event_bus.wait_until_finished()

if __name__ == "__main__":
    asyncio.run(main())
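
Conceptually, the decorator registers a function under a (channel, event type) pair, and matching events are later dispatched to it. The sketch below mimics that idea in plain Python. `MiniCallbackGroup` and its `dispatch` method are hypothetical and say nothing about how Inbound implements `CallbackGroup` internally.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class MiniCallbackGroup:
    """Hypothetical registry mapping (channel, event type) to handlers."""

    def __init__(self) -> None:
        self._handlers: dict[tuple[str, str], list[Handler]] = defaultdict(list)

    def callback(self, channel: str, type: str) -> Callable[[Handler], Handler]:
        def register(func: Handler) -> Handler:
            self._handlers[(channel, type)].append(func)
            return func  # returned unchanged so it stays callable directly

        return register

    def dispatch(self, channel: str, type: str, data: dict[str, Any]) -> int:
        # Call every handler registered for this pair; report how many ran.
        handlers = self._handlers.get((channel, type), [])
        for handler in handlers:
            handler(data)
        return len(handlers)


group = MiniCallbackGroup()
received: list[dict[str, Any]] = []


@group.callback("test-channel", "test-event")
def on_test_event(data: dict[str, Any]) -> None:
    received.append(data)


handled = group.dispatch("test-channel", "test-event", {"message": "Hello World 0"})
ignored = group.dispatch("test-channel", "other-event", {"message": "skipped"})
```

Keying on both channel and type means an event with a different type on the same channel does not reach the handler.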

You can find more examples in the examples directory.
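
The broker URL in the callbacks example, `memory://?serializer=msgpack`, packs the broker kind into the scheme and the serializer choice into the query string. The snippet below uses only the standard library to show how such a URL decomposes. It is an illustration of the URL shape, not a description of how `broker_from_url` parses it.

```python
from urllib.parse import parse_qs, urlsplit

url = "memory://?serializer=msgpack"
parts = urlsplit(url)

# The scheme names the broker backend.
scheme = parts.scheme
# Query parameters carry options; keep the last value if a key repeats.
options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
```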

Documentation

For comprehensive documentation, including advanced features and configurations, refer to the project documentation.

License

This project is licensed under the MIT License.

Support & Feedback

If you encounter any issues or have feedback, please open an issue. We'd love to hear from you!

Made with ❤️ by Emergent Methods


