Skip to main content

Async coordination system using Broker-Emitter-Consumer with testing, we simply call it: "Event Architecture based on Promises"

Project description

Pooter — Async Event Coordination System

Test Status License Python PyPI - Version

Pooter is an asynchronous coordination system built on a simple but powerful pattern:

Emitter → Broker → Consumer

We call this "Event Architecture based on Promises" — where multiple Emitters notify a Broker, which then synchronizes and dispatches events to Consumers once all conditions are fulfilled.

This pattern works on letting an Emitter emits the Broker then the Broker puts all of the other Emitters into the same Promises, which they promise to give give back the same signal (which is emit).

If all resolve, the promising section is finished, otherwise, it's cancelled by the Broker.


✨ Features

  • Broker-based event coordination
  • Awaitable emit + resolution logic
  • Modular EventBus pub/sub pattern
  • Fully tested with pytest-asyncio

📦 Installation

pip install pooter

Or for development:

git clone https://github.com/ticuong78/pooter
cd pooter
pip install -e .[dev]

🚀 Usage Example

# pyright: reportArgumentType=false

import asyncio
import logging

from src.archi.broker import Broker
from src.archi.emitter import Emitter
from src.archi.consumer import Consumer

logging.basicConfig(level=logging.INFO)

async def main():
    async def callback_consumer():
        await asyncio.sleep(0.5)
        print("Dũng xấu trai")
    
    def callback_consumer_2():
        print("Quyên đẹp trai")
    
    broker = Broker(timeout=1.0)

    emitter1 = Emitter("ONE")
    emitter2 = Emitter("TWO")
    consumer1 = Consumer(callback=callback_consumer)
    consumer2 = Consumer(callback=callback_consumer_2)

    broker.register_emitter(emitter1)
    broker.register_emitter(emitter2)
    broker.register_consumer(consumer1)
    broker.register_consumer(consumer2)

    # Start emitter1 (this starts the coordination session)
    emitter1_task = emitter1.emit()  # async def emit()

    # Emit emitter2 before timeout
    async def emit_emitter2():
        await asyncio.sleep(0.2)
        await emitter2.emit()

    # Register emitter3 after session starts, then emit
    async def register_and_emit_emitter3():
        await asyncio.sleep(0.3)  # after session starts
        emitter3 = Emitter("THREE")
        emitter3.resolve_callback = lambda uuid=emitter3.uuid: print(f"[Emitter {uuid}] internal resolved.")
        broker.register_emitter(emitter3)

        await asyncio.sleep(0.4) # this should be enough to resolve emitter3
        # await asyncio.sleep(0.4) # otherwise, this will be ignored
        await emitter3.emit()

    await asyncio.gather(emitter1_task, emit_emitter2(), register_and_emit_emitter3())

if __name__ == "__main__":
    asyncio.run(main())

🧪 Running Tests

pytest tests

Includes:

  • tests/test_broker.py
  • tests/test_emitter.py
  • tests/test_consumer.py
  • tests/test_events.py

📁 Project Structure

src/
├── archi/
│   ├── broker.py       # Orchestrates emitter/consumer coordination
│   ├── emitter.py      # Emits to broker
│   ├── consumer.py     # Handles post-resolution logic
│   └── events.py       # Internal event bus
tests/
    └── test_*.py       # Full pytest-asyncio test suite
main.py                 # Example use case
pyproject.toml          # Build + metadata

📜 License

MIT License.
Created by Lê Cườngcuongdayne17@gmail.com


🛠️ Contributing

Contributions are welcome and encouraged.

If you'd like to contribute to pooter, please follow these steps:

  1. Fork the repository
  2. Create a new branch:
    git checkout -b feature/your-feature-name
    
  3. Write clear, tested code
  4. Follow existing code style (black, ruff, mypy)
  5. Submit a Pull Request with a clear description

Please make sure all tests pass before submitting:

pytest

To run style checks:

black . && ruff . && mypy src/

For ideas, bugs, or PRs, visit: github.com/ticuong78/pooter

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pooter-0.4.4.tar.gz (11.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pooter-0.4.4-py3-none-any.whl (8.1 kB view details)

Uploaded Python 3

File details

Details for the file pooter-0.4.4.tar.gz.

File metadata

  • Download URL: pooter-0.4.4.tar.gz
  • Upload date:
  • Size: 11.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for pooter-0.4.4.tar.gz
Algorithm Hash digest
SHA256 09467560ed66824e673b88e1d3991c39db4ddd8b610dffe503b144a43e6f2e7e
MD5 59ac8c30935ad9e2dbe6cca7667c6049
BLAKE2b-256 c0f397635453f21a6db3d4478291724caba3766e8ebdf728feb37e81f3d3d262

See more details on using hashes here.

File details

Details for the file pooter-0.4.4-py3-none-any.whl.

File metadata

  • Download URL: pooter-0.4.4-py3-none-any.whl
  • Upload date:
  • Size: 8.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for pooter-0.4.4-py3-none-any.whl
Algorithm Hash digest
SHA256 1e2d4f8583d8a96a4a115e2c5f610139af7f20d94cf0268a19b226bbe897fde7
MD5 ae575b2ec92ba5dcb17d056ab5dbd94c
BLAKE2b-256 b2a44a58218c3fbe271d191f1b0b6c5626aaf240f22df0668ed8996bb8bac569

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page