AMQP publisher and a facade for sending commands to the Telegram delivery service.
Project description
tg-delivery-client
AMQP publisher and facade for sending commands to a Telegram delivery service.
Requirements
- Python ≥ 3.12
- RabbitMQ (or any AMQP broker). To use priorities your queue must support
x-max-priority.
Installation
Install from PyPI:
pip install tg-delivery-client
Install the latest development version from GitHub:
pip install git+<github-url>
Or add to your pyproject.toml (Poetry):
[tool.poetry.dependencies]
tg-delivery-client = "^0.1.0"
Quick Start
import asyncio
from tg_delivery_client.facade import create_amqp_client
async def main():
client = await create_amqp_client(
"amqps://user:pass@host/vhost", # AMQP URL
"tg-delivery", # queue name
)
async with client:
await client.send_text(123456789, "Hello!", priority=5)
await client.send_image(123456789, "https://img/url.jpg", priority=3, text="Caption")
await client.send_i18n_text(123456789, "greeting.hello", priority=2, i18n_args={"name": "John"})
await client.send_image_i18n(
123456789,
image_url="https://img/url.jpg",
i18n_key="promo.banner",
priority=1,
i18n_args={"discount": 50},
)
asyncio.run(main())
Facade API
Creating a client
from tg_delivery_client.facade import TgDeliveryClient, create_amqp_client
from tg_delivery_client.options import AmqpPublisherConfig
# Helper:
client = await create_amqp_client("amqp://...", "queue_name", config=AmqpPublisherConfig(...))
# Or manually:
from tg_delivery_client.amqp_publisher import AmqpCommandPublisher
sender = AmqpCommandPublisher("amqp://...", "queue_name", cfg=AmqpPublisherConfig())
client = TgDeliveryClient(sender)
TgDeliveryClient methods
async def send_text(
chat_id: int,
text: str,
priority: int,
*,
reply_markup: dict[str, object] | None = None,
) -> None
async def send_image(
chat_id: int,
image_url: str,
priority: int,
*,
text: str | None = None,
reply_markup: dict[str, object] | None = None,
) -> None
async def send_i18n_text(
chat_id: int,
i18n_key: str,
priority: int,
*,
i18n_args: dict[str, object] | None = None,
reply_markup: dict[str, object] | None = None,
) -> None
async def send_image_i18n(
chat_id: int,
image_url: str,
i18n_key: str,
priority: int,
*,
reply_markup: dict[str, object] | None = None,
i18n_args: dict[str, object] | None = None,
) -> None
All methods build a SendCommand and call sender.send() with PublishOptions(priority=...).
Direct AMQP publisher usage
import asyncio
from tg_delivery_client.amqp_publisher import AmqpCommandPublisher
from tg_delivery_client.domain import SendCommand
from tg_delivery_client.options import PublishOptions, AmqpPublisherConfig
async def main():
cfg = AmqpPublisherConfig(ensure_queue=True, max_priority=10)
pub = AmqpCommandPublisher("amqps://user:pass@host/vhost", "tg-delivery", cfg=cfg)
await pub.connect()
cmd = SendCommand(chat_id=1, text="hi")
await pub.send(cmd, opts=PublishOptions(priority=3, headers={"x-tag": "demo"}))
await pub.close()
asyncio.run(main())
Configuration
AmqpPublisherConfig
from dataclasses import dataclass
from tg_delivery_client.serializers import JsonCommandSerializer
from tg_delivery_client.retry import RetryPolicy
@dataclass(slots=True)
class AmqpPublisherConfig:
ensure_queue: bool = True # declare queue on connect
max_priority: int = 10 # x-max-priority argument
serializer: JsonCommandSerializer = JsonCommandSerializer()
retry_policy: RetryPolicy = RetryPolicy() # backoff on ConnectionClosed/ChannelClosed
PublishOptions
from dataclasses import dataclass
from typing import Mapping
@dataclass(slots=True)
class PublishOptions:
priority: int
headers: Mapping[str, object] | None = None
RetryPolicy
from tg_delivery_client.retry import RetryPolicy
rp = RetryPolicy(
attempts=5,
base_delay=0.2,
max_delay=2.0,
multiplier=2.0,
jitter=True,
)
# pass it into AmqpPublisherConfig(retry_policy=rp)
Command format
from dataclasses import dataclass, field
@dataclass(slots=True)
class SendCommand:
chat_id: int
text: str | None = None
i18n_key: str | None = None
i18n_args: dict[str, object] = field(default_factory=dict)
reply_markup: dict[str, object] | None = None
image_url: str | None = None
Serialization
Default: JSON (JsonCommandSerializer).
from typing import Protocol
class CommandSerializer(Protocol):
content_type: str
content_encoding: str
def serialize(self, cmd: SendCommand) -> bytes: ...
Implement your own and pass it to AmqpPublisherConfig.serializer if needed.
Tests & Lint
pytest -q
ruff check .
See pyproject.toml for tool settings.
License
MIT. See LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file tg_delivery_client-0.1.0.tar.gz.
File metadata
- Download URL: tg_delivery_client-0.1.0.tar.gz
- Upload date:
- Size: 8.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9b0513a918f3d158edd1d709de184873843bc9611d21f9bd8daa73fd0a4b3a12
|
|
| MD5 |
6f75d0dce0b9e360998e40bf35f306b4
|
|
| BLAKE2b-256 |
6be6e5b2bc9dc57c77efec1011c1071e2c6d677bc4900d6d5411ae3dee9274b7
|
File details
Details for the file tg_delivery_client-0.1.0-py3-none-any.whl.
File metadata
- Download URL: tg_delivery_client-0.1.0-py3-none-any.whl
- Upload date:
- Size: 8.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b5a5f705e619e704c7b0a3aaa974b8ed0c8ab502278b776af9b594566e5a38d3
|
|
| MD5 |
ab38e2207ad6f8e2d814f23ec472fd3c
|
|
| BLAKE2b-256 |
c55a6233e0e7c32c8ff88bdb7042ecda47fbd92b3488c401f0a92c5b71c67fde
|