An Event Bus framework for event-driven systems.

Inbound

Inbound is an asyncio framework for building event-driven systems in Python. It provides a single interface for working with different message brokers, including In-memory, RabbitMQ, and Kafka.

Features

  • Asynchronous Operations: Build scalable and responsive systems with asyncio.
  • Multiple Broker Support: Switch between brokers such as In-memory and RabbitMQ, with more planned.
  • Elegant API: Subscribe to channels, publish events, and listen to event streams with a simple and intuitive interface.
  • Serialization Flexibility: Choose a serializer such as msgpack or json to suit your requirements, or bring your own.
  • Streamlined Callbacks API: Implement event-based systems quickly and effectively.
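
To make the "bring your own" serializer idea concrete, here is a minimal sketch of what a pluggable serializer could look like. The `Serializer` protocol, the `JSONSerializer` class, and the `round_trip` helper are hypothetical names used for illustration only; they are not Inbound's actual interface, so check the project documentation for the real one.

```python
import json
from typing import Any, Protocol


class Serializer(Protocol):
    """Hypothetical interface: turn event data into bytes and back."""

    def serialize(self, data: Any) -> bytes: ...

    def deserialize(self, payload: bytes) -> Any: ...


class JSONSerializer:
    """Stdlib-only implementation of the hypothetical interface above."""

    def serialize(self, data: Any) -> bytes:
        # Compact separators keep the encoded payload small.
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    def deserialize(self, payload: bytes) -> Any:
        return json.loads(payload.decode("utf-8"))


def round_trip(serializer: Serializer, data: Any) -> Any:
    """Encode and decode once, as data would be between publish and consume."""
    return serializer.deserialize(serializer.serialize(data))
```

Any object with the same two methods would satisfy the protocol, which is the usual way to keep the wire format swappable without touching publishing or consuming code.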

Quick Start

Installation

pip install python-inbound

Basic Usage

import asyncio
from inbound import EventBus

async def consume(bus: EventBus):
    async with bus.subscribe("test-channel") as stream:
        async for envelope in stream:
            # Get the event from the envelope
            event = envelope.event
            print(event)
            # After processing the event, acknowledge it
            await envelope.ack()
            # Unless you want to reject the event
            # await envelope.nack()


async def produce(bus: EventBus):
    coros = [
        bus.publish(
            channel="test-channel",
            type="test-event",
            data={"message": f"Hello World {i}"},
        )
        for i in range(20)
    ]
    await asyncio.gather(*coros)

async def main():
    event_bus = EventBus()

    tasks = [asyncio.create_task(consume(event_bus)), asyncio.create_task(produce(event_bus))]
    async with event_bus:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

if __name__ == "__main__":
    asyncio.run(main())
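
A note on the `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` call above: it returns as soon as one task finishes, which here is the producer, while the consumer loop is still pending. The sketch below uses only the standard library, with an `asyncio.Queue` standing in for the bus, to show one way to let the consumer drain and then cancel it explicitly. Whether Inbound cleans up pending subscribers on exit is not stated here, so treat this as a general asyncio pattern rather than a description of the library.

```python
import asyncio


async def consume(queue: asyncio.Queue, seen: list) -> None:
    # Stand-in for a subscriber loop that never returns on its own.
    while True:
        seen.append(await queue.get())
        queue.task_done()


async def produce(queue: asyncio.Queue) -> None:
    for i in range(3):
        await queue.put(f"Hello World {i}")


async def run() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    tasks = [
        asyncio.create_task(consume(queue, seen)),
        asyncio.create_task(produce(queue)),
    ]
    # Returns once the producer finishes; the consumer is still pending.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Let the consumer process everything that was published.
    await queue.join()
    # Cancel what is left and wait for the cancellation to take effect.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return seen


if __name__ == "__main__":
    print(asyncio.run(run()))
```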

Using the Callbacks API

import asyncio
from inbound import CallbackGroup, Event, EventBus, broker_from_url

group = CallbackGroup()

@group.callback("test-channel", "test-event")
def callback(event: Event):
    # Note: Envelopes are automatically acknowledged
    # for callbacks
    print(event)

async def main():
    event_bus = EventBus(broker=broker_from_url("memory://?serializer=msgpack"))
    event_bus.add_group(group)

    async with event_bus:
        for i in range(3):
            await event_bus.publish(
                channel="test-channel",
                type="test-event",
                data={"message": f"Hello World {i}"},
            )
            await asyncio.sleep(0.01)
        await event_bus.wait_until_finished()

if __name__ == "__main__":
    asyncio.run(main())

You can find more examples in the examples directory.
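
For readers curious how a decorator-based callback registry can route events by channel and type, here is a small self-contained sketch. `SketchGroup` and its `dispatch` method are hypothetical and written only for illustration; this is not Inbound's `CallbackGroup` implementation, and it leaves out acknowledgement and async handling.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Tuple

Handler = Callable[[Any], None]


class SketchGroup:
    """Hypothetical registry keyed by (channel, event type)."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[Tuple[str, str], List[Handler]] = defaultdict(list)

    def callback(self, channel: str, event_type: str) -> Callable[[Handler], Handler]:
        def register(func: Handler) -> Handler:
            self._handlers[(channel, event_type)].append(func)
            return func  # leave the function usable on its own

        return register

    def dispatch(self, channel: str, event_type: str, event: Any) -> int:
        """Call every handler registered for the pair; return how many ran."""
        handlers = self._handlers.get((channel, event_type), [])
        for handler in handlers:
            handler(event)
        return len(handlers)


group = SketchGroup()
received: List[Any] = []


@group.callback("test-channel", "test-event")
def on_test_event(event: Any) -> None:
    received.append(event)
```

Calling `group.dispatch("test-channel", "test-event", {"message": "hi"})` runs `on_test_event` once, while a pair with no registered handlers runs nothing and returns 0.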

Documentation

For comprehensive documentation, including advanced features and configurations, refer to the project documentation.

License

This project is licensed under the MIT License.

Support & Feedback

If you encounter any issues or have feedback, please open an issue. We'd love to hear from you!

Made with ❤️ by Emergent Methods
