Conversation-oriented Pyrogram extension with per-update listeners and multi-session coordination

pyroflow

pyroflow builds on top of Pyrogram / Kurigram to replace the handler-based model with a conversation-first API — await a specific reply from a specific user instead of wiring up global handlers and managing state machines by hand.

pip install pyroflow

For Redis-backed coordination:

pip install pyroflow[redis]

Why pyroflow?

Pyrogram fires your handler for every incoming update of a given type. The moment you need a back-and-forth conversation you end up writing state machines, storing user_id → step in a dict, and hoping two updates don't race each other.

pyroflow solves all three problems:

Problem | pyroflow solution
Waiting for a specific reply | UpdateListener: await the next update from a user
Duplicate processing across multiple bot sessions | UpdateCoordinated: distributed lock per update
Replaying or inspecting previous handler steps | UpdateHistory: per-update-type handler record

Installation

Minimum requirements: Python 3.9+

pip install pyroflow          # core
pip install pyroflow[redis]   # + Redis coordinator support
pip install pyroflow[dev]     # + development tools (hatch, twine)

Quick start

from pyroflow import Client, MessageListener

client = Client("my_session")
client.register_listener(MessageListener())

@client.on_message()
async def on_start(client, message):
    if message.text != "/start":
        return

    answer = await message.ask(
        chat_id=message.chat.id,
        text="What is your name?",
        listen_user_id=message.from_user.id,
        timeout=60,
    )
    await answer.reply(f"Hello, {answer.text}!")

client.run()

Core concepts

Listeners

An UpdateListener is a typed queue bound to a Pyrogram update type. Any coroutine can await the next matching update from a specific user or chat. An update claimed by a listener never reaches the normal handler pipeline.

from pyroflow import Client, MessageListener
from pyroflow.errors import ListenerTimeout

client = Client("my_session")
client.register_listener(MessageListener())

@client.on_message()
async def on_confirm(client, message):
    if message.text != "/confirm":
        return

    await message.reply("Send your confirmation code:")

    try:
        code_msg = await client.message_listen(
            chat_id=message.chat.id,
            user_id=message.from_user.id,
            timeout=120,
        )
    except ListenerTimeout:
        await message.reply("Timed out. Please try again.")
        return

    await code_msg.reply(f"Code received: {code_msg.text}")

client.run()

Shortcuts for the two most common listener types:

client.message_listen    # UpdateListener[Message]
client.callback_listen   # UpdateListener[CallbackQuery]
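
To make the queue-and-claim idea concrete, here is a minimal sketch in plain asyncio. It is not pyroflow's implementation: MiniListener, listen, and dispatch are hypothetical names invented for illustration, and updates are reduced to plain strings. A waiting coroutine registers a future keyed by (chat_id, user_id); when a matching update arrives, the dispatcher resolves that future and reports the update as claimed, so a caller could skip the regular handlers.

```python
import asyncio
from typing import Dict, Tuple

Key = Tuple[int, int]  # (chat_id, user_id)


class MiniListener:
    """Hypothetical stand-in for a listener; not pyroflow's actual class."""

    def __init__(self) -> None:
        self._waiters: Dict[Key, asyncio.Future] = {}

    async def listen(self, chat_id: int, user_id: int, timeout: float) -> str:
        # Register a future that dispatch() resolves with the next matching update.
        future = asyncio.get_running_loop().create_future()
        self._waiters[(chat_id, user_id)] = future
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # Always unregister, whether an update arrived or the wait timed out.
            self._waiters.pop((chat_id, user_id), None)

    def dispatch(self, chat_id: int, user_id: int, text: str) -> bool:
        """Return True if a waiter claimed the update, False otherwise."""
        future = self._waiters.get((chat_id, user_id))
        if future is None or future.done():
            return False  # nobody is waiting: fall through to normal handlers
        future.set_result(text)
        return True


async def demo() -> Tuple[str, bool, bool]:
    listener = MiniListener()
    waiter = asyncio.create_task(listener.listen(chat_id=1, user_id=42, timeout=1))
    await asyncio.sleep(0)  # let the waiter register itself
    unrelated = listener.dispatch(1, 7, "hi")      # different user: not claimed
    claimed = listener.dispatch(1, 42, "123456")   # matching user: claimed
    return await waiter, unrelated, claimed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The boolean returned by dispatch is what would let a dispatcher decide whether an update still needs to go through the handler pipeline.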

ask()

ask() is the high-level wrapper around listeners. It sends (or edits) a message and then suspends until a matching reply arrives — all in one await.

from pyrogram.types import CallbackQuery  # used as update_type below

# Send a new message, then wait for a reply
answer = await client.ask(
    chat_id=chat_id,
    text="Choose an option:",
    reply_markup=keyboard,
    listen_user_id=user_id,
    timeout=30,
)

# Edit an existing message, then wait for a callback query
callback = await client.ask(
    chat_id=chat_id,
    text="Updated — choose again:",
    message_id=sent_msg.id,
    listen_user_id=user_id,
    timeout=30,
    update_type=CallbackQuery,
)

Parameters:

Parameter | Description
chat_id | Target chat
text | Message text
message_id | If provided, edits that message instead of sending a new one
listen_user_id | Filter the awaited update by user
listen_message_id | Filter the awaited update by message
timeout | Seconds to wait before raising ListenerTimeout
update_type | Update type to wait for; determines the return type (default: Message)
meta | Arbitrary metadata attached to the listener

Raises:

  • ListenerTimeout — no reply arrived within timeout seconds
  • ListenerCancelled — the listener was cancelled while waiting
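
The send-then-wait behaviour and the timeout error can be sketched without Telegram at all. The example below is an illustration under stated assumptions, not pyroflow's code: ask here is a hypothetical helper that takes the sending and waiting steps as callables, and AskTimeout is a made-up exception standing in for ListenerTimeout.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class AskTimeout(Exception):
    """Hypothetical stand-in for pyroflow's ListenerTimeout."""


async def ask(
    send: Callable[[str], Awaitable[None]],
    wait_reply: Callable[[], Awaitable[str]],
    text: str,
    timeout: float,
) -> str:
    # Step 1: deliver the prompt (a real bot would send or edit a message here).
    await send(text)
    # Step 2: suspend until a reply arrives, translating the asyncio timeout
    # into a domain-specific exception.
    try:
        return await asyncio.wait_for(wait_reply(), timeout)
    except asyncio.TimeoutError:
        raise AskTimeout(f"no reply within {timeout} seconds") from None


async def demo() -> Tuple[str, bool, List[str]]:
    sent: List[str] = []

    async def send(text: str) -> None:
        sent.append(text)

    async def quick_reply() -> str:
        return "Alice"

    async def no_reply() -> str:
        await asyncio.sleep(10)  # simulates a user who never answers
        return "too late"

    answer = await ask(send, quick_reply, "What is your name?", timeout=1)
    try:
        await ask(send, no_reply, "Still there?", timeout=0.01)
        timed_out = False
    except AskTimeout:
        timed_out = True
    return answer, timed_out, sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A real implementation would probably register the waiter before sending the prompt so that a very fast reply is not lost; the sketch ignores that detail for brevity.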

Coordinators

An UpdateCoordinated acquires a distributed lock before processing an update. This ensures the same update is handled by exactly one session when the bot runs on multiple servers simultaneously.

from functools import partial
from redis.asyncio import Redis
from pyroflow import Client, MessageCoordinated, RedisUpdateCoordinator


client = Client("my_session")
redis = Redis(db=0)  # db is a numeric database index, not the session name
coordinator_factory = partial(RedisUpdateCoordinator, redis)
coordinated = MessageCoordinated(coordinator_factory)
client.register_coordinated(coordinated)

client.run()

Supported backends:

Backend | Extra
Redis | pip install pyroflow[redis]

Lock states:

  • HANDLED — at least one handler completed without error; lock is released and other sessions skip the update.
  • None — no handler ran successfully; lock is released so another session may retry.
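
The two lock states can be illustrated with a single-process simulation. Everything in this sketch is an assumption made for illustration: InMemoryLockStore and process are hypothetical names, and a plain in-memory set replaces the shared store. A Redis-backed lock would typically rely on an atomic operation such as SET with the NX option instead.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Set

HANDLED = "HANDLED"


class InMemoryLockStore:
    """Hypothetical single-process stand-in for a shared store such as Redis."""

    def __init__(self) -> None:
        self._held: Set[int] = set()
        self._state: Dict[int, Optional[str]] = {}

    def try_acquire(self, update_id: int) -> bool:
        # Refuse if another session holds the lock or the update is already handled.
        if update_id in self._held or self._state.get(update_id) == HANDLED:
            return False
        self._held.add(update_id)
        return True

    def release(self, update_id: int, state: Optional[str]) -> None:
        self._held.discard(update_id)
        self._state[update_id] = state


async def process(
    store: InMemoryLockStore,
    session: str,
    update_id: int,
    handler: Callable[[], Awaitable[None]],
    log: List[str],
) -> None:
    if not store.try_acquire(update_id):
        log.append(f"{session}: skipped update {update_id}")
        return
    state: Optional[str] = None  # stays None unless the handler succeeds
    try:
        await handler()
        state = HANDLED
        log.append(f"{session}: handled update {update_id}")
    except Exception:
        log.append(f"{session}: failed on update {update_id}")
    finally:
        store.release(update_id, state)


async def demo() -> List[str]:
    store = InMemoryLockStore()
    log: List[str] = []

    async def ok() -> None:
        await asyncio.sleep(0.01)

    async def broken() -> None:
        raise RuntimeError("handler error")

    # Two sessions receive update 1 at the same time: only one may handle it.
    await asyncio.gather(
        process(store, "A", 1, ok, log),
        process(store, "B", 1, ok, log),
    )
    # Update 2 fails in session A, so session B is allowed to retry it.
    await process(store, "A", 2, broken, log)
    await process(store, "B", 2, ok, log)
    return log


if __name__ == "__main__":
    print("\n".join(asyncio.run(demo())))
```

Releasing with a state of None after a failure is what leaves the door open for another session to retry, while HANDLED makes every later attempt a no-op.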

Histories

An UpdateHistory records which handlers ran successfully for each update. This enables features like back buttons that replay or inspect previous processing steps.

from pyroflow import Client, MessageHistory

client = Client("my_session")
client.register_history(MessageHistory())

client.run()

Listeners, coordinators, and histories can also be removed at runtime, keyed by update type:

from pyrogram.types import Message

await client.unregister_listener(Message)
await client.unregister_coordinated(Message)
await client.unregister_history(Message)
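
As a rough picture of how a handler record can drive a back button, the sketch below keeps a per-chat stack of handler names. It is a hypothetical illustration only: MiniHistory, record, and back are invented names, and keying the record by chat is an assumption rather than a description of how UpdateHistory stores its data.

```python
from collections import defaultdict
from typing import Dict, List, Optional


class MiniHistory:
    """Hypothetical per-chat record of handlers that completed successfully."""

    def __init__(self) -> None:
        self._steps: Dict[int, List[str]] = defaultdict(list)

    def record(self, chat_id: int, handler_name: str) -> None:
        # Called after a handler finishes without raising.
        self._steps[chat_id].append(handler_name)

    def back(self, chat_id: int) -> Optional[str]:
        """Drop the current step and return the previous one, if any."""
        steps = self._steps[chat_id]
        if steps:
            steps.pop()  # discard the step the user is leaving
        return steps[-1] if steps else None


history = MiniHistory()
for name in ("main_menu", "settings", "language"):
    history.record(chat_id=1, handler_name=name)

# The user presses "back" on the language screen.
previous = history.back(chat_id=1)
print(previous)
```

The name returned by back identifies which earlier step to replay; how that step is re-run depends on the application.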

Error handling

from pyroflow.errors import ListenerTimeout, ListenerCancelled

try:
    reply = await client.ask(chat_id, "Your input?", listen_user_id=uid, timeout=30)
except ListenerTimeout:
    await client.send_message(chat_id, "You took too long. Try again.")
except ListenerCancelled:
    await client.send_message(chat_id, "Session was cancelled.")
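
A common follow-up to catching a timeout is to re-prompt a limited number of times. The helper below is a sketch of that pattern and not part of pyroflow: ask_with_retries is a hypothetical name, AskTimeout stands in for ListenerTimeout, and ask_once represents any coroutine function that performs one ask() call.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple


class AskTimeout(Exception):
    """Hypothetical stand-in for pyroflow.errors.ListenerTimeout."""


async def ask_with_retries(
    ask_once: Callable[[], Awaitable[str]],
    attempts: int,
) -> Optional[str]:
    """Call ask_once up to `attempts` times; return None if every try times out."""
    for _ in range(attempts):
        try:
            return await ask_once()
        except AskTimeout:
            continue  # re-prompt the user on the next iteration
    return None


async def demo() -> Tuple[Optional[str], int, Optional[str]]:
    calls = 0

    async def flaky_ask() -> str:
        # Times out twice, then succeeds on the third call.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise AskTimeout
        return "42"

    async def silent_ask() -> str:
        raise AskTimeout  # the user never answers

    first = await ask_with_retries(flaky_ask, attempts=3)
    second = await ask_with_retries(silent_ask, attempts=2)
    return first, calls, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only the timeout is retried here; a cancellation is deliberately left to propagate, since it usually means the conversation should stop.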

License

MIT
