tg-delivery-client

AMQP publisher and facade for sending commands to a Telegram delivery service.

Requirements

  • Python ≥ 3.12
  • RabbitMQ (or any AMQP broker). To use priorities, the queue must be declared with the x-max-priority argument.

Installation

Install from PyPI:

pip install tg-delivery-client

Install the latest development version from GitHub:

pip install git+<github-url>

Or add to your pyproject.toml (Poetry):

[tool.poetry.dependencies]
tg-delivery-client = "^0.1.0"

Quick Start

import asyncio
from tg_delivery_client.facade import create_amqp_client

async def main():
    client = await create_amqp_client(
        "amqps://user:pass@host/vhost",  # AMQP URL
        "tg-delivery",                   # queue name
    )

    async with client:
        await client.send_text(123456789, "Hello!", priority=5)
        await client.send_image(123456789, "https://img/url.jpg", priority=3, text="Caption")
        await client.send_i18n_text(123456789, "greeting.hello", priority=2, i18n_args={"name": "John"})
        await client.send_image_i18n(
            123456789,
            image_url="https://img/url.jpg",
            i18n_key="promo.banner",
            priority=1,
            i18n_args={"discount": 50},
        )

asyncio.run(main())

Facade API

Creating a client

from tg_delivery_client.amqp_publisher import AmqpCommandPublisher
from tg_delivery_client.facade import TgDeliveryClient, create_amqp_client
from tg_delivery_client.options import AmqpPublisherConfig

# Helper (await it from inside a coroutine):
client = await create_amqp_client("amqp://...", "queue_name", config=AmqpPublisherConfig(...))

# Or build the publisher yourself and wrap it in the facade:
sender = AmqpCommandPublisher("amqp://...", "queue_name", cfg=AmqpPublisherConfig())
client = TgDeliveryClient(sender)

TgDeliveryClient methods

async def send_text(
    chat_id: int,
    text: str,
    priority: int,
    *,
    reply_markup: dict[str, object] | None = None,
) -> None

async def send_image(
    chat_id: int,
    image_url: str,
    priority: int,
    *,
    text: str | None = None,
    reply_markup: dict[str, object] | None = None,
) -> None

async def send_i18n_text(
    chat_id: int,
    i18n_key: str,
    priority: int,
    *,
    i18n_args: dict[str, object] | None = None,
    reply_markup: dict[str, object] | None = None,
) -> None

async def send_image_i18n(
    chat_id: int,
    image_url: str,
    i18n_key: str,
    priority: int,
    *,
    reply_markup: dict[str, object] | None = None,
    i18n_args: dict[str, object] | None = None,
) -> None

All methods build a SendCommand and call sender.send() with PublishOptions(priority=...).


Direct AMQP publisher usage

import asyncio
from tg_delivery_client.amqp_publisher import AmqpCommandPublisher
from tg_delivery_client.domain import SendCommand
from tg_delivery_client.options import PublishOptions, AmqpPublisherConfig

async def main():
    cfg = AmqpPublisherConfig(ensure_queue=True, max_priority=10)
    pub = AmqpCommandPublisher("amqps://user:pass@host/vhost", "tg-delivery", cfg=cfg)
    await pub.connect()
    try:
        cmd = SendCommand(chat_id=1, text="hi")
        await pub.send(cmd, opts=PublishOptions(priority=3, headers={"x-tag": "demo"}))
    finally:
        # Close the connection even if publishing fails.
        await pub.close()

asyncio.run(main())

Configuration

AmqpPublisherConfig

from dataclasses import dataclass, field
from tg_delivery_client.serializers import JsonCommandSerializer
from tg_delivery_client.retry import RetryPolicy

@dataclass(slots=True)
class AmqpPublisherConfig:
    ensure_queue: bool = True                 # declare queue on connect
    max_priority: int = 10                    # x-max-priority argument
    # default_factory gives each config its own instances instead of shared defaults
    serializer: JsonCommandSerializer = field(default_factory=JsonCommandSerializer)
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)  # backoff on ConnectionClosed/ChannelClosed

PublishOptions

from collections.abc import Mapping
from dataclasses import dataclass

@dataclass(slots=True)
class PublishOptions:
    priority: int
    headers: Mapping[str, object] | None = None

RetryPolicy

from tg_delivery_client.retry import RetryPolicy

rp = RetryPolicy(
    attempts=5,
    base_delay=0.2,
    max_delay=2.0,
    multiplier=2.0,
    jitter=True,
)
# pass it into AmqpPublisherConfig(retry_policy=rp)

Command format

from dataclasses import dataclass, field

@dataclass(slots=True)
class SendCommand:
    chat_id: int
    text: str | None = None
    i18n_key: str | None = None
    i18n_args: dict[str, object] = field(default_factory=dict)
    reply_markup: dict[str, object] | None = None
    image_url: str | None = None

Serialization

Default: JSON (JsonCommandSerializer).

from typing import Protocol
from tg_delivery_client.domain import SendCommand

class CommandSerializer(Protocol):
    content_type: str       # e.g. a MIME type
    content_encoding: str   # e.g. a character encoding
    def serialize(self, cmd: SendCommand) -> bytes: ...

To use a different wire format, implement this protocol and pass an instance as AmqpPublisherConfig(serializer=...).


Tests & Lint

pytest -q
ruff check .

See pyproject.toml for tool settings.


License

MIT. See LICENSE.
