Skip to main content

Conversation-oriented Pyrogram extension with per-update listeners and multi-session coordination

Project description

pyroflow

PyPI version Python versions License: MIT

Conversation-oriented Pyrogram extension with per-update listeners and multi-session coordination.

pyroflow builds on top of Pyrogram / Kurigram to replace the handler-based model with a conversation-first API — await a specific reply from a specific user instead of wiring up global handlers and managing state machines by hand.

pip install pyroflow

For Redis-backed coordination:

pip install pyroflow[redis]

Why pyroflow?

Pyrogram fires your handler for every incoming update of a given type. The moment you need a back-and-forth conversation you end up writing state machines, storing user_id → step in a dict, and hoping two updates don't race each other.

pyroflow solves all three problems:

Problem pyroflow solution
Waiting for a specific reply UpdateListenerawait the next update from a user
Duplicate processing across multiple bot sessions UpdateCoordinated — distributed lock per update
Replaying or inspecting previous handler steps UpdateHistory — per-update-type handler record

Installation

Minimum requirements: Python 3.9+

pip install pyroflow          # core
pip install pyroflow[redis]   # + Redis coordinator support
pip install pyroflow[dev]     # + development tools (hatch, twine)

Quick start

from pyroflow import Client, MessageListener

client = Client("my_session")
client.register_listener(MessageListener())

@client.on_message()
async def on_start(client, message):
    if message.text != "/start":
        return

    answer = await message.ask(
        chat_id=message.chat.id,
        text="What is your name?",
        listen_user_id=message.from_user.id,
        timeout=60,
    )
    await answer.reply(f"Hello, {answer.text}!")

client.run()

Core concepts

Listeners

A UpdateListener is a typed queue bound to a Pyrogram update type. Any coroutine can await the next matching update from a specific user or chat. An update claimed by a listener never reaches the normal handler pipeline.

from pyroflow import Client, MessageListener
from pyroflow.errors import ListenerTimeout

client = Client("my_session")
client.register_listener(MessageListener())

@client.on_message()
async def on_confirm(client, message):
    if message.text != "/confirm":
        return

    await message.reply("Send your confirmation code:")

    try:
        code_msg = await client.message_listen(
            chat_id=message.chat.id,
            user_id=message.from_user.id,
            timeout=120,
        )
    except ListenerTimeout:
        await message.reply("Timed out. Please try again.")
        return

    await code_msg.reply(f"Code received: {code_msg.text}")

client.run()

Shortcuts for the two most common listener types:

client.message_listen    # UpdateListener[Message]
client.callback_listen   # UpdateListener[CallbackQuery]

ask()

ask() is the high-level wrapper around listeners. It sends (or edits) a message and then suspends until a matching reply arrives — all in one await.

# Send a new message, then wait for reply
answer = await client.ask(
    chat_id=chat_id,
    text="Choose an option:",
    reply_markup=keyboard,
    listen_user_id=user_id,
    timeout=30,
)

# Edit an existing message, then wait for a callback query
callback = await client.ask(
    chat_id=chat_id,
    text="Updated — choose again:",
    message_id=sent_msg.id,
    listen_user_id=user_id,
    timeout=30,
    update_type=CallbackQuery,
)

Parameters:

Parameter Description
chat_id Target chat
text Message text
message_id If provided, edits the message instead of sending a new one
listen_user_id Filter the awaited update by user
listen_message_id Filter the awaited update by message
timeout Seconds to wait before raising ListenerTimeout
update_type Update type to wait for — determines the return type (default: Message)
meta Arbitrary metadata attached to the listener

Raises:

  • ListenerTimeout — no reply arrived within timeout seconds
  • ListenerCancelled — the listener was cancelled while waiting

Coordinators

A UpdateCoordinated acquires a distributed lock before processing an update. This ensures the same update is handled by exactly one session when the bot runs on multiple servers simultaneously.

from functools import partial
from redis.asyncio import Redis
from pyroflow import Client, MessageCoordinated, RedisUpdateCoordinator


client = Client("my_session")
redis = Redis(db=client.name)
coordinator_factory = partial(RedisUpdateCoordinator, redis)
coordinated = MessageCoordinated(coordinator_factory)
client.register_coordinated(coordinated)

client.run()

Supported backends:

Backend Extra
Redis pip install pyroflow[redis]

Lock states:

  • HANDLED — at least one handler completed without error; lock is released and other sessions skip the update.
  • None — no handler ran successfully; lock is released so another session may retry.

Histories

A UpdateHistory records which handlers ran successfully for each update. This enables features like back buttons that replay or inspect previous processing steps.

from pyroflow import Client, MessageHistory

client = Client("my_session")
client.register_history(MessageHistory())

client.run()

They can also be removed at runtime:

await client.unregister_listener(Message)
await client.unregister_coordinated(Message)
await client.unregister_history(Message)

Error handling

from pyroflow.errors import ListenerTimeout, ListenerCancelled

try:
    reply = await client.ask(chat_id, "Your input?", listen_user_id=uid, timeout=30)
except ListenerTimeout:
    await client.send_message(chat_id, "You took too long. Try again.")
except ListenerCancelled:
    await client.send_message(chat_id, "Session was cancelled.")

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyroflow-0.1.3.tar.gz (30.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyroflow-0.1.3-py3-none-any.whl (47.1 kB view details)

Uploaded Python 3

File details

Details for the file pyroflow-0.1.3.tar.gz.

File metadata

  • Download URL: pyroflow-0.1.3.tar.gz
  • Upload date:
  • Size: 30.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pyroflow-0.1.3.tar.gz
Algorithm Hash digest
SHA256 202b19e91fae7575211244d67c1522a71d5b1b1c19cd76fbfb00bb68eec1fead
MD5 9b6ad936f1e39ccb6673015a1ab4be2d
BLAKE2b-256 4227dc845ff19de672e27c2f335dcf18d4c27c400c9272907e04975a0a20f405

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyroflow-0.1.3.tar.gz:

Publisher: pypi_release.yml on eeeob/pyroflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyroflow-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: pyroflow-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 47.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pyroflow-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 519937d8464c8dfadc5b809038471032de99f0f946d90ecd1503f26372f8a40e
MD5 05a3ba962f1294ebd6b4da9ebc7e5030
BLAKE2b-256 e16228a34c561434c6f5d559984c8607fdf1fdaaa303f15d1523341a6b0ad8f6

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyroflow-0.1.3-py3-none-any.whl:

Publisher: pypi_release.yml on eeeob/pyroflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page