Асинхронная клиент-серверная библиотека с файловой очередью на стороне клиента.
Project description
Асинхронная клиент-серверная библиотека с файловой очередью на стороне клиента
Почтовый голубь - библиотека для взаимодействия с приложениями на уровне протоколов ZMQ. Библиотека обеспечивает стабильное отправления сообщений и их получения. При потери связи с сервером клиент накапливает файловую очередь, а после восстановления связи отправляет все сообщения получателю через сервер переадресаций. В каждом клиенте находится свой сервер переадресаций, который способен запустиься для поддержания рассылки сообщений. Для безопасного общения по сети можно настроить сертификаты с двумя ключами. Также поддерживается шифрование сообщений на уровне клиента, что делает не доступным сообщения со стороны сервера. Данная библиотека является асинхронной, клиент не ждет мгновенного подтверждения. Когда сообщение посылается получателю, отправитель еще имеет копию сообщения, которое при случае может снова отправиться.
- Установка
- Создание клиента с файловой очередью
- Ожидаем получения сообщений
- Отправить и забыть
- Отправить и ждать ответа
- Запасной сервер переадресаций
- Клиент без файловой очереди
- Клиент-серверная синхронизация
- Пользовательская файловая очередь
- Безопасность и изолированность
- CURVE аутентификация
- Аутентификация и изолированность
Установка
pip install mail-pigeon
Создание клиента с файловой очередью
Использование синхронного клиента.
from pathlib import Path
from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox
name = 'app1'
path = Path(__file__).parent / name # где будут скапливаться файлы на отправку
fb = FilesBox(str(path)) # очередь писем на отправку
client = MailClient(
name_client=name,
is_master=True,
out_queue=fb
)
client.wait_server() # ожидает запуска сервера
Параметры MailClient:
name_client: Название клиента латиницей без пробелов. Оно пригодится для отправки сообщений другим участникам.host_server: Адрес клиента мастера. По умолчанию -27.0.0.1.port_server: Порт подключения. По умолчанию -5555.is_master: Будет ли этот клиент сервером. Если у вас несколько приложений, то только один может быть сервером переадресаций. Либо можно указать значениеNone, что скажет клиенту, запустить свой сервер, если нет другого. Если установитьTrue, но сервер уже запущен в другом приложение, то будет ошибка. В этом случе в других приложениях нужно указатьFalse.out_queue: Очередь писем на отправку. По умолчанию используется очередь внути памяти процесса. Также можно определить свой класс очереди, к примеру очеред через редис.cert_dir: Путь до сертификата или до пустой директории для генерации ключа.
Параметры FilesBox:
folder: Путь до директории с очерелью сообщений.
Использование асинхронного клиента.
from anyio import Path as AsyncPath
from mail_pigeon import AsyncMailClient
from mail_pigeon.queue import AsyncFilesBox
name = 'app1'
path = AsyncPath(__file__).parent / name # где будут скапливаться файлы на отправку
fb = AsyncFilesBox(str(path)) # очередь писем на отправку
client = AsyncMailClient(
name_client=name,
is_master=True,
out_queue=fb
)
await client.wait_server() # ожидает запуска сервера
Параметры AsyncMailClient:
name_client: Название клиента латиницей без пробелов. Оно пригодится для отправки сообщений другим участникам.host_server: Адрес клиента мастера. По умолчанию -27.0.0.1.port_server: Порт подключения. По умолчанию -5555.is_master: Будет ли этот клиент сервером. Если у вас несколько приложений, то только один может быть сервером переадресаций на хосте. Либо можно указать значениеNone, что скажет клиенту, запустить свой сервер, если нет другого. Если установитьTrue, но сервер уже запущен в другом приложение, то будет ошибка. В этом случе в других приложениях нужно указатьFalse.out_queue: Очередь писем на отправку. По умолчанию используется очередь внути памяти процесса. Также можно определить свой класс очереди, к примеру очеред через редис.cert_dir: Путь до сертификата или до пустой директории для генерации ключа.
Параметры AsyncFilesBox:
folder: Путь до директории с очередью сообщений.
Вне зависимости какой клиент будет использоваться, письма отправлются асинхронно в любом клиенте. Почти все методы, что есть в синхронном варианте клиента, присутствуют и в асинхронном клиенте.
Ожидаем получения сообщений
...
while True:
msg = client.get()
print('')
print(f'key: {msg.key}') # ИД сообщения в очереди отправителя
print(f'sender: {msg.sender}') # отправитель сообщения
print(f'recipient: {msg.recipient}') # получатель сообщения
print(f'content: {msg.content}') # контент
print('===========')
Метод .get() можно использовать как с блокировкой, так и с временной блокировкой. Когда метод используется с временной блокировкой timeout = 3 в секундах, то если результата не будет, метод вернет пустоту None. При успешном получение ответа, данные запроса будут находится в msg.content.
Отправить и забыть
Данный метод отправляет текст в другое приложение с названием клиента app2. Метод .send() способен отправить сообщение и про него забыть. Другая сторона должна ожидать сообщение через метод .get().
client.send(recipient='app2', content='hello world')
Параметры метода .send():
recipient: Получатель.content: Содержимое.wait: Ожидать ли получения ответа от запроса.
Отправить и ждать ответа
Если есть потребность в ожидания ответа от другого приложения, то последовательность действий будет такая:
- Приложение
app1посылает запрос и ждет ответа.
# app1
content = 'hello'
msg = client.send(recipient='app2', content=content, wait=True)
if msg is None:
return
content = f'{content} {msg.content}'
print(f'key: {msg.key}') # ИД сообщения в очереди у отправителя
print(f'sender: {msg.sender}') # отправитель сообщения
print(f'recipient: {msg.recipient}') # получатель сообщения
print(f'content: {content}') # контент 'hello world'
print('===========')
Может быть случай, когда от нас ушло сообщение, но на той стороне клиент выпал с ошибкой, и чтобы не ждать вечно потерянного сообщения, можно выставить на случай потере timeout. Если не использовать timeout, то как только на текущий клиент придет уведомление, что другой клиент отключился, то ожидание само прекратиться и client.send вернет пустоту. Также ожидание прекращается, если отключается сервер. Это тоже вернет пустоту. Такое ожидание может быть полезным, если нужно определить состояние связи с удаленным клиентом.
- Приложение
app2получает запрос, обрабатывает его и посылает ответ с таким же ключом как в запросе. Вот как это выглядит:
# app2
while True:
msg = client.get()
print('')
print(f'key: {msg.key}') # ИД сообщения в очереди у отправителя
print(f'sender: {msg.sender}') # отправитель сообщения
print(f'recipient: {msg.recipient}') # получатель сообщения
print(f'content: {msg.content}') # контент 'hello'
print('===========')
# обработка запроса
# ...
# и отправка
client.send(recipient=msg.sender, content='world') # под капотом используется ключ msg.key
- Или так... Приложение
app2получает запрос, обрабатывает его и посылает ответ с таким же ключом, но при помощи другого потока.
# app2
from queue import Queue
q = Queue()
# thread 1
while True:
msg = client.get()
print('')
print(f'key: {msg.key}') # ИД сообщения в очереди у отправителя
print(f'sender: {msg.sender}') # из другого приложения
print(f'recipient: {msg.recipient}') # здесь название нашего приложения
print(f'content: {msg.content}') # контент 'hello'
print('===========')
q.put(msg)
# обработка запроса и посылаем на отправку в другой поток через очередь
# thread 2
while True:
# Получаем из очереди и отправляем
msg = q.get()
client.send(recipient=msg.sender, content='world') # здесь тоже будет использован ключ из thread 1
Запасной сервер переадресаций
Предположим у вас есть клиент-сервер, который занимается переадресацией. Но если это приложение упадет или остановиться, то связь между клиентами будет нарушена. В этом случае можно сделать так, чтобы было возможно запустить дополнительный сервер в клиенте.
Приложение 1.
from pathlib import Path
from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox
name = 'app1'
path = Path(__file__).parent / name # очередь писем на отправку
client = MailClient(
name_client=name,
is_master=None,
out_queue=FilesBox(str(path))
)
client.wait_server() # ожидает запуска сервера
Приложение 2.
from pathlib import Path
from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox
name = 'app2'
path = Path(__file__).parent / name # очередь писем на отправку
client = MailClient(
name_client=name,
is_master=None,
out_queue=FilesBox(str(path))
)
client.wait_server() # ожидает запуска сервера
Когда установлен атрибут is_master=None, то это говорит клиенту, что запустить сервер, если нет другого. Если приложение app2 упадет, то будет запушен сервер внутри app1. Эти сервера должны находится на одном хосте.
Клиент без файловой очереди
Если нет необходимости в сохранение сообщений, когда приложение падает или выключается, то можно упустить создание очереди.
from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox
name = 'app1'
client = MailClient(
name_client=name,
is_master=None,
)
client.wait_server() # ожидает запуска сервера
В этом случае будет применена очеред внутри самого клиента. Она находится в памяти процесса.
Клиент-серверная синхронизация
У каждого клиента имеется список имен других клиентов. Только если клиент знает об участнике, он может отправлять ему сообщения. Об этом заботиться сервер, который обовещает всех клиентов, кто присоединяется, а кто уходит. Сервер в свою очеред поддерживает связь со всеми клиентами, и если один клиент перестает отвечать, то сервер его отключает и всех уведомляет об его уходе.
Пользовательская файловая очередь
Для создания своей очереди, к примеру через редис, есть специальный базовый класс BaseQueue или BaseAsyncQueue. Понадобиться определить следующие методы.
from typing import List
from mail_pigeon.queue import BaseQueue
class SimpleBox(BaseQueue):
def __init__(self, timeout_processing: int = None):
super().__init__(timeout_processing)
self._simple_box = {}
def _init_live_queue(self) -> List[str]:
"""Инициализация очереди при создание экземпляра.
Returns:
List[str]: Список.
"""
return []
def _remove_data(self, key: str):
"""Удаляет данные одного элемента.
Args:
key (str): Ключ.
"""
if key in self._simple_box:
del self._simple_box[key]
def _read_data(self, key: str) -> str:
"""Чтение данных по ключу.
Args:
key (str): Название.
Returns:
str: Прочитанные данные.
"""
return self._simple_box[key]
def _save_data(self, key: str, value: str):
"""Сохраняет данные.
Args:
value (str): Ключ.
value (str): Значение.
"""
self._simple_box[key] = value
В этих методах вам не нужно заботиться о конкурентности данных. Просто опишите откуда читать и куда сохранять.
Безопасность и изолированность
У библиотеки есть возможность отправлять сообщения в зашифрованном виде. В этом случае с таким клиентом можно будет общаться только при наличие общего пароля. Такие данные отправлются на сервер в зашифрованном виде. Единственное, что не шифруется это системные команды синхронизации между клиентом и сервером. Данный тип шифрование можно использовать, если мы не хотим, чтобы на сервере могли прочитать сообщение предназначенное другому клиенту.
from mail_pigeon import MailClient
from mail_pigeon.security import TypesEncryptors
# app1
encript1 = TypesEncryptors.HMAC('admin1')
# этот клиент выступает как сервер переадресаций
client1 = MailClient('app1', is_master=True, encryptor=encryptor1)
client1.wait_server() # ожидает запуска сервера
# app2
encript2 = TypesEncryptors.HMAC('admin2')
client2 = MailClient('app2', is_master=False, encryptor=encryptor2)
client2.wait_server() # ожидает запуска сервера
# app3
client3 = MailClient('app3', is_master=False, encryptor=encryptor2)
client3.wait_server() # ожидает запуска сервера
Несмотря на то, что сервер переадресаций находится в app1, но app2 и app3 могут общаться между собой, а app1 не может с ними общаться. Даже если app1 получить их сообщения из своего сервера в ручную, он не сможет их расшифровать. Это дает изолированность между отдельными группами и безопасное отправление сообщений.
CURVE аутентификация
Это сертификат с двумя ключами. Все сообщения между клиентами и сервером будут в защифрованном виде. Любой клиент который имеет публичный ключ сервера сможет соединиться с ним. В данном случае даже системные сообщения синхронизации будут в зашифрованном виде. При использование только этого метода защиты сообщения могут быть прочитаны только на сервере.
from mail_pigeon import MailClient
# app1
client1 = MailClient('app1', is_master=True, cert_dir='/certificate')
client1.wait_server() # ожидает запуска сервера
Для app1 будет создана директория по пути cert_dir. Для client1 там будет храниться приватный и публичный ключ сервера, а также приватный и публичный ключ текущего клиента.
Чтобы запустить второй клиент, нам потребуется публичный ключ сервера app1. Его нужно будет скопировать самостоятельно и положить в директорию для второго клиента.
from mail_pigeon import MailClient
# app2
client2 = MailClient('app2', is_master=False, cert_dir='/certificate2')
client2.wait_server() # ожидает запуска сервера
Аутентификация и изолированность
Это комбинированный подход к защите информации. В этом случае сообщения смогут прочитать только клиенты, которые имееют и публичный ключ сервера и пароль от группы.
from mail_pigeon import MailClient
from mail_pigeon.security import TypesEncryptors
# app1
encript1 = TypesEncryptors.HMAC('admin')
client1 = MailClient('app1', is_master=True, cert_dir='/certificate', encryptor=encryptor1)
client1.wait_server() # ожидает запуска сервера
Внешние ссылки
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mail_pigeon-1.6.15-py3-none-any.whl.
File metadata
- Download URL: mail_pigeon-1.6.15-py3-none-any.whl
- Upload date:
- Size: 44.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
320948e80ac09452ff5d09800f9cffc4bd3f493510cb79c018b9d784b4e1ab15
|
|
| MD5 |
edac4989b7af42e22b7694420199c08a
|
|
| BLAKE2b-256 |
a88e5ba4fb23e0466e52d70a8b3e38943a0a7fcb2cc611312ea7b61787fa0a6e
|
Provenance
The following attestation bundles were made for mail_pigeon-1.6.15-py3-none-any.whl:
Publisher:
build-wheel.yml on AntonGlyzin/mail_pigeon
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mail_pigeon-1.6.15-py3-none-any.whl -
Subject digest:
320948e80ac09452ff5d09800f9cffc4bd3f493510cb79c018b9d784b4e1ab15 - Sigstore transparency entry: 786861119
- Sigstore integration time:
-
Permalink:
AntonGlyzin/mail_pigeon@f2b33b08b039e64245a02ba7e15b0bbb242f6a35 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/AntonGlyzin
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
build-wheel.yml@f2b33b08b039e64245a02ba7e15b0bbb242f6a35 -
Trigger Event:
push
-
Statement type: