Skip to main content

Conversation-oriented Pyrogram extension with per-update listeners and multi-session coordination

Project description

pyroflow

PyPI version Python versions License: MIT

Conversation-oriented Pyrogram extension with per-update listeners and multi-session coordination.

pyroflow builds on top of Pyrogram / Kurigram to replace the handler-based model with a conversation-first API — await a specific reply from a specific user instead of wiring up global handlers and managing state machines by hand.

pip install pyroflow

For Redis-backed coordination:

pip install pyroflow[redis]

Why pyroflow?

Pyrogram fires your handler for every incoming update of a given type. The moment you need a back-and-forth conversation you end up writing state machines, storing user_id → step in a dict, and hoping two updates don't race each other.

pyroflow solves all three problems:

Problem pyroflow solution
Waiting for a specific reply UpdateListenerawait the next update from a user
Duplicate processing across multiple bot sessions UpdateCoordinated — distributed lock per update
Replaying or inspecting previous handler steps UpdateHistory — per-update-type handler record

Installation

Minimum requirements: Python 3.9+

pip install pyroflow          # core
pip install pyroflow[redis]   # + Redis coordinator support
pip install pyroflow[dev]     # + development tools (hatch, twine)

Quick start

from pyroflow import Client, MessageListener

client = Client("my_session")
client.register_listener(MessageListener())

@client.on_message()
async def on_start(client, message):
    if message.text != "/start":
        return

    answer = await message.ask(
        chat_id=message.chat.id,
        text="What is your name?",
        listen_user_id=message.from_user.id,
        timeout=60,
    )
    await answer.reply(f"Hello, {answer.text}!")

client.run()

Core concepts

Listeners

A UpdateListener is a typed queue bound to a Pyrogram update type. Any coroutine can await the next matching update from a specific user or chat. An update claimed by a listener never reaches the normal handler pipeline.

from pyroflow import Client, MessageListener
from pyroflow.errors import ListenerTimeout

client = Client("my_session")
client.register_listener(MessageListener())

@client.on_message()
async def on_confirm(client, message):
    if message.text != "/confirm":
        return

    await message.reply("Send your confirmation code:")

    try:
        code_msg = await client.message_listen(
            chat_id=message.chat.id,
            user_id=message.from_user.id,
            timeout=120,
        )
    except ListenerTimeout:
        await message.reply("Timed out. Please try again.")
        return

    await code_msg.reply(f"Code received: {code_msg.text}")

client.run()

Shortcuts for the two most common listener types:

client.message_listen    # UpdateListener[Message]
client.callback_listen   # UpdateListener[CallbackQuery]

ask()

ask() is the high-level wrapper around listeners. It sends (or edits) a message and then suspends until a matching reply arrives — all in one await.

# Send a new message, then wait for reply
answer = await client.ask(
    chat_id=chat_id,
    text="Choose an option:",
    reply_markup=keyboard,
    listen_user_id=user_id,
    timeout=30,
)

# Edit an existing message, then wait for a callback query
callback = await client.ask(
    chat_id=chat_id,
    text="Updated — choose again:",
    message_id=sent_msg.id,
    listen_user_id=user_id,
    timeout=30,
    update_type=CallbackQuery,
)

Parameters:

Parameter Description
chat_id Target chat
text Message text
message_id If provided, edits the message instead of sending a new one
listen_user_id Filter the awaited update by user
listen_message_id Filter the awaited update by message
timeout Seconds to wait before raising ListenerTimeout
update_type Update type to wait for — determines the return type (default: Message)
meta Arbitrary metadata attached to the listener

Raises:

  • ListenerTimeout — no reply arrived within timeout seconds
  • ListenerCancelled — the listener was cancelled while waiting

Coordinators

A UpdateCoordinated acquires a distributed lock before processing an update. This ensures the same update is handled by exactly one session when the bot runs on multiple servers simultaneously.

from functools import partial
from redis.asyncio import Redis
from pyroflow import Client, MessageCoordinated, RedisUpdateCoordinator


client = Client("my_session")
redis = Redis(db=client.name)
coordinator_factory = partial(RedisUpdateCoordinator, redis)
coordinated = MessageCoordinated(coordinator_factory)
client.register_coordinated(coordinated)

client.run()

Supported backends:

Backend Extra
Redis pip install pyroflow[redis]

Lock states:

  • HANDLED — at least one handler completed without error; lock is released and other sessions skip the update.
  • None — no handler ran successfully; lock is released so another session may retry.

Histories

A UpdateHistory records which handlers ran successfully for each update. This enables features like back buttons that replay or inspect previous processing steps.

from pyroflow import Client, MessageHistory

client = Client("my_session")
client.register_history(MessageHistory())

client.run()

They can also be removed at runtime:

await client.unregister_listener(Message)
await client.unregister_coordinated(Message)
await client.unregister_history(Message)

Error handling

from pyroflow.errors import ListenerTimeout, ListenerCancelled

try:
    reply = await client.ask(chat_id, "Your input?", listen_user_id=uid, timeout=30)
except ListenerTimeout:
    await client.send_message(chat_id, "You took too long. Try again.")
except ListenerCancelled:
    await client.send_message(chat_id, "Session was cancelled.")

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyroflow-0.1.2.tar.gz (30.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyroflow-0.1.2-py3-none-any.whl (47.1 kB view details)

Uploaded Python 3

File details

Details for the file pyroflow-0.1.2.tar.gz.

File metadata

  • Download URL: pyroflow-0.1.2.tar.gz
  • Upload date:
  • Size: 30.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pyroflow-0.1.2.tar.gz
Algorithm Hash digest
SHA256 de20b3dc97e742001d0630f0756deea45ce95d322bf84d839c635a6eb1b62919
MD5 bfb24c5ec5513c321b34f8f771c19bed
BLAKE2b-256 01b929f17f98f53840cf0f7c29f96ce6cbf48359593941f37ef68a686c0c969f

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyroflow-0.1.2.tar.gz:

Publisher: pypi_release.yml on eeeob/pyroflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyroflow-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: pyroflow-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 47.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pyroflow-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 31d979dfc710e589aebe9d482e5cd269fc29b99964622f2fd2affef562d8b8ca
MD5 0e02ba25f905c8dbac9fc767bdc4d371
BLAKE2b-256 2a578ba52e897fccb390dc522a79f6cc1b4bfbc21803dddb9839af6ed8f9c2b8

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyroflow-0.1.2-py3-none-any.whl:

Publisher: pypi_release.yml on eeeob/pyroflow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page