
An async coordination system built on a Broker-Emitter-Consumer pattern, with a full test suite. We call it "Event Architecture based on Promises".


Pooter — Async Event Coordination System


Pooter is an asynchronous coordination system built on a simple but powerful pattern:

Emitter → Broker → Consumer

We call this "Event Architecture based on Promises" — where multiple Emitters notify a Broker, which then synchronizes and dispatches events to Consumers once all conditions are fulfilled.

The pattern works as follows: when one Emitter emits to the Broker, the Broker opens a session and places every other registered Emitter under the same promise. Each of them promises to send back the same signal (an emit).

If all Emitters resolve, the promise session completes; otherwise, the Broker cancels it.
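The session logic described above can be sketched in plain asyncio. This is a minimal illustration, not Pooter's actual implementation: `SketchBroker`, its methods, and the `demo` helper are hypothetical names, and the real `Broker` may differ in every detail. The first emit opens a session, the session waits for every registered emitter, and a timeout cancels it.

```python
import asyncio


class SketchBroker:
    """Hypothetical stand-in for illustration; not Pooter's actual Broker."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self.signals = {}      # emitter name -> asyncio.Event
        self.consumers = []    # plain callables run after the session resolves
        self.session = None    # the running session task, if any

    def register_emitter(self, name: str):
        self.signals[name] = asyncio.Event()

    def register_consumer(self, callback):
        self.consumers.append(callback)

    async def emit(self, name: str) -> bool:
        """Record one emitter's signal; the first emit opens the session."""
        self.signals[name].set()
        if self.session is None:
            self.session = asyncio.create_task(self._run_session())
        return await self.session

    async def _run_session(self) -> bool:
        # Every registered emitter "promises" to emit before the timeout.
        waiters = [event.wait() for event in self.signals.values()]
        try:
            await asyncio.wait_for(asyncio.gather(*waiters), self.timeout)
        except asyncio.TimeoutError:
            return False  # a promise was not kept, so the session is cancelled
        for callback in self.consumers:
            callback()
        return True


async def demo(second_emitter_delay: float) -> bool:
    broker = SketchBroker(timeout=0.2)
    broker.register_emitter("ONE")
    broker.register_emitter("TWO")
    delivered = []
    broker.register_consumer(lambda: delivered.append("consumed"))

    async def late_emit():
        await asyncio.sleep(second_emitter_delay)
        return await broker.emit("TWO")

    results = await asyncio.gather(broker.emit("ONE"), late_emit())
    return all(results) and delivered == ["consumed"]


if __name__ == "__main__":
    print("emitted in time:", asyncio.run(demo(0.05)))
    print("emitted too late:", asyncio.run(demo(0.4)))
```

In this sketch the consumers run only when both emitters signal inside the timeout window; a late emit leaves the session cancelled and the consumers untouched.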


✨ Features

  • Broker-based event coordination
  • Awaitable emit + resolution logic
  • Modular EventBus pub/sub pattern
  • Fully tested with pytest-asyncio
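To make the pub/sub feature concrete, here is a minimal sketch of an awaitable event bus. It is an assumption about the general shape of such a bus, not the API of Pooter's `events.py`: `SketchEventBus`, `subscribe`, `publish`, and the topic names are all hypothetical.

```python
import asyncio


class SketchEventBus:
    """Hypothetical pub/sub bus for illustration; not Pooter's events module."""

    def __init__(self):
        self._subscribers = {}  # topic -> list of async handlers

    def subscribe(self, topic, handler):
        self._subscribers.setdefault(topic, []).append(handler)

    async def publish(self, topic, payload):
        """Await every handler subscribed to the topic; return how many ran."""
        handlers = list(self._subscribers.get(topic, []))
        await asyncio.gather(*(handler(payload) for handler in handlers))
        return len(handlers)


async def demo():
    bus = SketchEventBus()
    received = []

    async def on_emit(payload):
        received.append(payload)

    bus.subscribe("emit", on_emit)
    count = await bus.publish("emit", {"emitter": "ONE"})
    ignored = await bus.publish("unknown", {})  # no subscribers for this topic
    return count, ignored, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the bus separate from the broker is one way to get the modularity the feature list mentions: the broker only needs to publish and subscribe, not track who is listening.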

📦 Installation

pip install pooter

Or for development:

git clone https://github.com/ticuong78/pooter
cd pooter
pip install -e ".[dev]"

🚀 Usage Example

# pyright: reportArgumentType=false

import asyncio
import logging

from src.archi.broker import Broker
from src.archi.emitter import Emitter
from src.archi.consumer import Consumer

logging.basicConfig(level=logging.INFO)

async def main():
    async def callback_consumer():
        await asyncio.sleep(0.5)
        print("Dũng is ugly")

    def callback_consumer_2():
        print("Quyên is handsome")
    
    broker = Broker(timeout=1.0)

    emitter1 = Emitter("ONE")
    emitter2 = Emitter("TWO")
    consumer1 = Consumer(callback=callback_consumer)
    consumer2 = Consumer(callback=callback_consumer_2)

    broker.register_emitter(emitter1)
    broker.register_emitter(emitter2)
    broker.register_consumer(consumer1)
    broker.register_consumer(consumer2)

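    # Create emitter1's emit() coroutine. It starts running when gathered
    # below, and its emit starts the coordination session.
    emitter1_task = emitter1.emit()  # emit() is an async def, so this is a coroutine, not yet a Task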

    # Emit emitter2 before timeout
    async def emit_emitter2():
        await asyncio.sleep(0.2)
        await emitter2.emit()

    # Register emitter3 after session starts, then emit
    async def register_and_emit_emitter3():
        await asyncio.sleep(0.3)  # after session starts
        emitter3 = Emitter("THREE")
        emitter3.resolve_callback = lambda uuid=emitter3.uuid: print(f"[Emitter {uuid}] internal resolved.")
        broker.register_emitter(emitter3)

        await asyncio.sleep(0.4)  # 0.3 + 0.4 = 0.7 s, inside the 1.0 s timeout, so emitter3 should still resolve
        # With a sleep long enough to pass the timeout, this emit would be ignored instead
        await emitter3.emit()

    await asyncio.gather(emitter1_task, emit_emitter2(), register_and_emit_emitter3())

if __name__ == "__main__":
    asyncio.run(main())
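The example above registers one `async def` callback and one plain function as consumers. One common way to support both is to call the callback and await the result only when it is awaitable. The helper below is a sketch of that idea; `run_callback` is a hypothetical name, and Pooter's `Consumer` may handle this differently.

```python
import asyncio
import inspect


async def run_callback(callback):
    """Call a consumer callback and await the result if it is awaitable."""
    result = callback()
    if inspect.isawaitable(result):
        result = await result
    return result


async def demo():
    async def slow_consumer():
        await asyncio.sleep(0.01)
        return "async done"

    def fast_consumer():
        return "sync done"

    return [await run_callback(slow_consumer), await run_callback(fast_consumer)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Checking the returned value rather than the function itself also covers callables that return a coroutine without being declared `async def`.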

🧪 Running Tests

pytest tests

Includes:

  • tests/test_broker.py
  • tests/test_emitter.py
  • tests/test_consumer.py
  • tests/test_events.py

📁 Project Structure

src/
├── archi/
│   ├── broker.py       # Orchestrates emitter/consumer coordination
│   ├── emitter.py      # Emits to broker
│   ├── consumer.py     # Handles post-resolution logic
│   └── events.py       # Internal event bus
tests/
    └── test_*.py       # Full pytest-asyncio test suite
main.py                 # Example use case
pyproject.toml          # Build + metadata

📜 License

MIT License.
Created by Lê Cường (cuongdayne17@gmail.com)


🛠️ Contributing

Contributions are welcome and encouraged.

If you'd like to contribute to pooter, please follow these steps:

  1. Fork the repository
  2. Create a new branch:
    git checkout -b feature/your-feature-name
    
  3. Write clear, tested code
  4. Follow existing code style (black, ruff, mypy)
  5. Submit a Pull Request with a clear description

Please make sure all tests pass before submitting:

pytest

To run style checks:

black . && ruff check . && mypy src/

For ideas, bugs, or PRs, visit: github.com/ticuong78/pooter
