Skip to main content

Python telegram API adapter for FastAPI and asyncio

Project description

A library for simultaneous operation of a large number of telegram bots with a single code base.

Python - telegram API adapter. Based in Pydantic model libraries

You need to implement a web interface for a webhook yourself. I recommend FastAPI.

pip install megabot

  1. Fill in the dispatcher tokens in a convenient way for you in the format dict(int(bot_id), str(token)
  2. Define callback, message, and command handlers
  3. Connect the dispatcher to processing incoming webhooks.
  4. Done!

The filtering of the callbacks was taken from aiogram, the syntax is identical

example main.py

from megabot.dispatcher import Dispatcher
from megabot.poster import MessageService
from megabot.utils.callback_data import CallbackData

menu_cat = CallbackData('cat', 'id') # Callback fabric 
message_service = MessageService()

async def hello_word(callback: CallbackQuery):
    print(callback.data)
    await message_service.callback_answer(self.dp.tokens[bot_id], callback.id)
    
dp = Dispatcher()

bot_id = 9***9
token = '9***9:AA*****AA'
dp.tokens[bot_id] = token

dp.register_handler_callback(hello_word, callback_filter=menu_cat.filter())

webhook router file:

from main import dp

async def data_router(self, bot_id: int, data: dict) -> NoReturn:
    if not(dp.tokens.get(bot_id, None)):
        dp.tokens[bot_id] = token

    await dp.message_router(bot_id, data)

for local testing: NOT FOR PROD!!! add to main.py

from megabot.poster import get_updates
offset = 0

async def check_updates():
    global offset
    while True:
        result, error = await get_updates(token, offset)
        if result:
            result_list = result['result']
            if len(result_list) > 0:
                print(result)
                offset = result_list[-1]['update_id']+1
            await dp.message_router(bot_id, result)
        else:
            print(error)
        await asyncio.sleep(1)


await def start_bot()
    asyncio.create_task(check_updates())

add start_bot() to main file web_hooks

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

megabot-0.0.6.tar.gz (12.2 kB view hashes)

Uploaded Source

Built Distribution

megabot-0.0.6-py3-none-any.whl (16.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page