Skip to main content

Async coordination system using Broker-Emitter-Consumer with testing, we simply call it: "Event Architecture based on Promises"

Project description

Pooter — Async Event Coordination System

Test Status License Python PyPI - Version

Pooter is an asynchronous coordination system built on a simple but powerful pattern:

Emitter → Broker → Consumer

We call this "Event Architecture based on Promises" — where multiple Emitters notify a Broker, which then synchronizes and dispatches events to Consumers once all conditions are fulfilled.

This pattern works on letting an Emitter emits the Broker then the Broker puts all of the other Emitters into the same Promises, which they promise to give give back the same signal (which is emit).

If all resolve, the promising section is finished, otherwise, it's cancelled by the Broker.


✨ Features

  • Broker-based event coordination
  • Awaitable emit + resolution logic
  • Modular EventBus pub/sub pattern
  • Fully tested with pytest-asyncio

📦 Installation

pip install pooter

Or for development:

git clone https://github.com/ticuong78/pooter
cd pooter
pip install -e .[dev]

🚀 Usage Example

# pyright: reportArgumentType=false

import asyncio
import logging

from src.archi.broker import Broker
from src.archi.emitter import Emitter
from src.archi.consumer import Consumer

logging.basicConfig(level=logging.INFO)

async def main():
    async def callback_consumer():
        await asyncio.sleep(0.5)
        print("Dũng xấu trai")
    
    def callback_consumer_2():
        print("Quyên đẹp trai")
    
    broker = Broker(timeout=1.0)

    emitter1 = Emitter("ONE")
    emitter2 = Emitter("TWO")
    consumer1 = Consumer(callback=callback_consumer)
    consumer2 = Consumer(callback=callback_consumer_2)

    broker.register_emitter(emitter1)
    broker.register_emitter(emitter2)
    broker.register_consumer(consumer1)
    broker.register_consumer(consumer2)

    # Start emitter1 (this starts the coordination session)
    emitter1_task = emitter1.emit()  # async def emit()

    # Emit emitter2 before timeout
    async def emit_emitter2():
        await asyncio.sleep(0.2)
        await emitter2.emit()

    # Register emitter3 after session starts, then emit
    async def register_and_emit_emitter3():
        await asyncio.sleep(0.3)  # after session starts
        emitter3 = Emitter("THREE")
        emitter3.resolve_callback = lambda uuid=emitter3.uuid: print(f"[Emitter {uuid}] internal resolved.")
        broker.register_emitter(emitter3)

        await asyncio.sleep(0.4) # this should be enough to resolve emitter3
        # await asyncio.sleep(0.4) # otherwise, this will be ignored
        await emitter3.emit()

    await asyncio.gather(emitter1_task, emit_emitter2(), register_and_emit_emitter3())

if __name__ == "__main__":
    asyncio.run(main())

🧪 Running Tests

pytest tests

Includes:

  • tests/test_broker.py
  • tests/test_emitter.py
  • tests/test_consumer.py
  • tests/test_events.py

📁 Project Structure

src/
├── archi/
│   ├── broker.py       # Orchestrates emitter/consumer coordination
│   ├── emitter.py      # Emits to broker
│   ├── consumer.py     # Handles post-resolution logic
│   └── events.py       # Internal event bus
tests/
    └── test_*.py       # Full pytest-asyncio test suite
main.py                 # Example use case
pyproject.toml          # Build + metadata

📜 License

MIT License.
Created by Lê Cườngcuongdayne17@gmail.com


🛠️ Contributing

Contributions are welcome and encouraged.

If you'd like to contribute to pooter, please follow these steps:

  1. Fork the repository
  2. Create a new branch:
    git checkout -b feature/your-feature-name
    
  3. Write clear, tested code
  4. Follow existing code style (black, ruff, mypy)
  5. Submit a Pull Request with a clear description

Please make sure all tests pass before submitting:

pytest

To run style checks:

black . && ruff . && mypy src/

For ideas, bugs, or PRs, visit: github.com/ticuong78/pooter

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pooter-0.4.3.tar.gz (11.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pooter-0.4.3-py3-none-any.whl (8.0 kB view details)

Uploaded Python 3

File details

Details for the file pooter-0.4.3.tar.gz.

File metadata

  • Download URL: pooter-0.4.3.tar.gz
  • Upload date:
  • Size: 11.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for pooter-0.4.3.tar.gz
Algorithm Hash digest
SHA256 dd941694ffec8018734416b78db80d8402a4ec20386967ac2504b759a9f53657
MD5 88e8742b40a0acff2cdc7a387b83ac2d
BLAKE2b-256 2cbc6a5a38e688c3660ff388b23c81b2ed8598f8feb0361ab747558ed7a46fee

See more details on using hashes here.

File details

Details for the file pooter-0.4.3-py3-none-any.whl.

File metadata

  • Download URL: pooter-0.4.3-py3-none-any.whl
  • Upload date:
  • Size: 8.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for pooter-0.4.3-py3-none-any.whl
Algorithm Hash digest
SHA256 7c589f153270548678da4fd7fc38c1ee4a257ee637b66e470883fc690a125dff
MD5 cda519f16a3a2f193d02a497856d8833
BLAKE2b-256 c2d641f6ba7dd5a839bd1e8377542bddd483a5e28bd7378172c3384477dc0f59

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page