Conversation-oriented Pyrogram extension with per-update listeners and multi-session coordination
Project description
pyroflow
Conversation-oriented Pyrogram extension with per-update listeners and multi-session coordination.
pyroflow builds on top of Pyrogram / Kurigram to replace the handler-based model with a conversation-first API — await a specific reply from a specific user instead of wiring up global handlers and managing state machines by hand.
pip install pyroflow
For Redis-backed coordination:
pip install pyroflow[redis]
Why pyroflow?
Pyrogram fires your handler for every incoming update of a given type. The moment you need a back-and-forth conversation you end up writing state machines, storing user_id → step in a dict, and hoping two updates don't race each other.
pyroflow solves all three problems:
| Problem | pyroflow solution |
|---|---|
| Waiting for a specific reply | UpdateListener — await the next update from a user |
| Duplicate processing across multiple bot sessions | UpdateCoordinated — distributed lock per update |
| Replaying or inspecting previous handler steps | UpdateHistory — per-update-type handler record |
Installation
Minimum requirements: Python 3.9+
pip install pyroflow # core
pip install pyroflow[redis] # + Redis coordinator support
pip install pyroflow[dev] # + development tools (hatch, twine)
Quick start
from pyroflow import Client, MessageListener
client = Client("my_session")
client.register_listener(MessageListener())
@client.on_message()
async def on_start(client, message):
if message.text != "/start":
return
answer = await message.ask(
chat_id=message.chat.id,
text="What is your name?",
listen_user_id=message.from_user.id,
timeout=60,
)
await answer.reply(f"Hello, {answer.text}!")
client.run()
Core concepts
Listeners
A UpdateListener is a typed queue bound to a Pyrogram update type. Any coroutine can await the next matching update from a specific user or chat. An update claimed by a listener never reaches the normal handler pipeline.
from pyroflow import Client, MessageListener
from pyroflow.errors import ListenerTimeout
client = Client("my_session")
client.register_listener(MessageListener())
@client.on_message()
async def on_confirm(client, message):
if message.text != "/confirm":
return
await message.reply("Send your confirmation code:")
try:
code_msg = await client.message_listen(
chat_id=message.chat.id,
user_id=message.from_user.id,
timeout=120,
)
except ListenerTimeout:
await message.reply("Timed out. Please try again.")
return
await code_msg.reply(f"Code received: {code_msg.text}")
client.run()
Shortcuts for the two most common listener types:
client.message_listen # UpdateListener[Message]
client.callback_listen # UpdateListener[CallbackQuery]
ask()
ask() is the high-level wrapper around listeners. It sends (or edits) a message and then suspends until a matching reply arrives — all in one await.
# Send a new message, then wait for reply
answer = await client.ask(
chat_id=chat_id,
text="Choose an option:",
reply_markup=keyboard,
listen_user_id=user_id,
timeout=30,
)
# Edit an existing message, then wait for a callback query
callback = await client.ask(
chat_id=chat_id,
text="Updated — choose again:",
message_id=sent_msg.id,
listen_user_id=user_id,
timeout=30,
update_type=CallbackQuery,
)
Parameters:
| Parameter | Description |
|---|---|
chat_id |
Target chat |
text |
Message text |
message_id |
If provided, edits the message instead of sending a new one |
listen_user_id |
Filter the awaited update by user |
listen_message_id |
Filter the awaited update by message |
timeout |
Seconds to wait before raising ListenerTimeout |
update_type |
Update type to wait for — determines the return type (default: Message) |
meta |
Arbitrary metadata attached to the listener |
Raises:
ListenerTimeout— no reply arrived withintimeoutsecondsListenerCancelled— the listener was cancelled while waiting
Coordinators
A UpdateCoordinated acquires a distributed lock before processing an update. This ensures the same update is handled by exactly one session when the bot runs on multiple servers simultaneously.
from functools import partial
from redis.asyncio import Redis
from pyroflow import Client, MessageCoordinated, RedisUpdateCoordinator
client = Client("my_session")
redis = Redis(db=client.name)
coordinator_factory = partial(RedisUpdateCoordinator, redis)
coordinated = MessageCoordinated(coordinator_factory)
client.register_coordinated(coordinated)
client.run()
Supported backends:
| Backend | Extra |
|---|---|
| Redis | pip install pyroflow[redis] |
Lock states:
HANDLED— at least one handler completed without error; lock is released and other sessions skip the update.None— no handler ran successfully; lock is released so another session may retry.
Histories
A UpdateHistory records which handlers ran successfully for each update. This enables features like back buttons that replay or inspect previous processing steps.
from pyroflow import Client, MessageHistory
client = Client("my_session")
client.register_history(MessageHistory())
client.run()
They can also be removed at runtime:
await client.unregister_listener(Message)
await client.unregister_coordinated(Message)
await client.unregister_history(Message)
Error handling
from pyroflow.errors import ListenerTimeout, ListenerCancelled
try:
reply = await client.ask(chat_id, "Your input?", listen_user_id=uid, timeout=30)
except ListenerTimeout:
await client.send_message(chat_id, "You took too long. Try again.")
except ListenerCancelled:
await client.send_message(chat_id, "Session was cancelled.")
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyroflow-0.1.0.tar.gz.
File metadata
- Download URL: pyroflow-0.1.0.tar.gz
- Upload date:
- Size: 30.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7e6fbe57bdcc814e7c2886734806b27af1e04773a8901f4a9d38d62d48715dca
|
|
| MD5 |
247d2d2c59ec6a1265b8962cb6a1b1be
|
|
| BLAKE2b-256 |
e25aff7cf81f089ce84e10cf5dae39a312875f56ebc00355dd345268aa29b379
|
Provenance
The following attestation bundles were made for pyroflow-0.1.0.tar.gz:
Publisher:
pypi_release.yml on eeeob/pyroflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyroflow-0.1.0.tar.gz -
Subject digest:
7e6fbe57bdcc814e7c2886734806b27af1e04773a8901f4a9d38d62d48715dca - Sigstore transparency entry: 1778487808
- Sigstore integration time:
-
Permalink:
eeeob/pyroflow@66ca1201878e74a8b9b8b610fe51c878a6240b9e -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/eeeob
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
pypi_release.yml@66ca1201878e74a8b9b8b610fe51c878a6240b9e -
Trigger Event:
push
-
Statement type:
File details
Details for the file pyroflow-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pyroflow-0.1.0-py3-none-any.whl
- Upload date:
- Size: 47.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
36083435da6b5e5335bed7b79d28c0e080206e7b521ac98cbe60da8b6b0008c3
|
|
| MD5 |
ee7e5687b2412efc0e96249f79dcd253
|
|
| BLAKE2b-256 |
3442eb99482bce17eb87d8a0070140e3d2d2303d34ca10d93eed363f37d92116
|
Provenance
The following attestation bundles were made for pyroflow-0.1.0-py3-none-any.whl:
Publisher:
pypi_release.yml on eeeob/pyroflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyroflow-0.1.0-py3-none-any.whl -
Subject digest:
36083435da6b5e5335bed7b79d28c0e080206e7b521ac98cbe60da8b6b0008c3 - Sigstore transparency entry: 1778488216
- Sigstore integration time:
-
Permalink:
eeeob/pyroflow@66ca1201878e74a8b9b8b610fe51c878a6240b9e -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/eeeob
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
pypi_release.yml@66ca1201878e74a8b9b8b610fe51c878a6240b9e -
Trigger Event:
push
-
Statement type: