Skip to main content

Асинхронная клиент-серверная библиотека с файловой очередью на стороне клиента.

Project description

Асинхронная клиент-серверная библиотека с файловой очередью на стороне клиента

Почтовый голубь - библиотека для взаимодействия с приложениями на уровне протоколов ZMQ. Библиотека обеспечивает стабильное отправления сообщений и их получения. При потери связи с сервером клиент накапливает файловую очередь, а после восстановления связи отправляет все сообщения получателю через сервер переадресаций. В каждом клиенте находится свой сервер переадресаций, который способен запустиься для поддержания рассылки сообщений. Для безопасного общения по сети можно настроить сертификаты с двумя ключами. Также поддерживается шифрование сообщений на уровне клиента, что делает не доступным сообщения со стороны сервера. Данная библиотека является асинхронной, клиент не ждет мгновенного подтверждения. Когда сообщение посылается получателю, отправитель еще имеет копию сообщения, которое при случае может снова отправиться.

  1. Установка
  2. Создание клиента с файловой очередью
  3. Ожидаем получения сообщений
  4. Отправить и забыть
  5. Отправить и ждать ответа
  6. Запасной сервер переадресаций
  7. Клиент без файловой очереди
  8. Клиент-серверная синхронизация
  9. Пользовательская файловая очередь
  10. Безопасность и изолированность
  11. CURVE аутентификация
  12. Аутентификация и изолированность

Установка

pip install mail-pigeon

Создание клиента с файловой очередью

Использование синхронного клиента.

from pathlib import Path
from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox

name = 'app1'

path = Path(__file__).parent / name # где будут скапливаться файлы на отправку
fb = FilesBox(str(path)) # очередь писем на отправку
client = MailClient(
        name_client=name, 
        is_master=True, 
        out_queue=fb
    )
client.wait_server() # ожидает запуска сервера

Параметры MailClient:

  • name_client : Название клиента латиницей без пробелов. Оно пригодится для отправки сообщений другим участникам.
  • host_server : Адрес клиента мастера. По умолчанию - 27.0.0.1.
  • port_server : Порт подключения. По умолчанию - 5555.
  • is_master : Будет ли этот клиент сервером. Если у вас несколько приложений, то только один может быть сервером переадресаций. Либо можно указать значение None, что скажет клиенту, запустить свой сервер, если нет другого. Если установить True, но сервер уже запущен в другом приложение, то будет ошибка. В этом случе в других приложениях нужно указать False.
  • out_queue : Очередь писем на отправку. По умолчанию используется очередь внути памяти процесса. Также можно определить свой класс очереди, к примеру очеред через редис.
  • cert_dir: Путь до сертификата или до пустой директории для генерации ключа.

Параметры FilesBox:

  • folder : Путь до директории с очерелью сообщений.

Использование асинхронного клиента.

from anyio import Path as AsyncPath
from mail_pigeon import AsyncMailClient
from mail_pigeon.queue import AsyncFilesBox

name = 'app1'

path = AsyncPath(__file__).parent / name # где будут скапливаться файлы на отправку
fb = AsyncFilesBox(str(path)) # очередь писем на отправку
client = AsyncMailClient(
        name_client=name, 
        is_master=True,
        out_queue=fb
    )
await client.wait_server() # ожидает запуска сервера

Параметры AsyncMailClient:

  • name_client : Название клиента латиницей без пробелов. Оно пригодится для отправки сообщений другим участникам.
  • host_server : Адрес клиента мастера. По умолчанию - 27.0.0.1.
  • port_server : Порт подключения. По умолчанию - 5555.
  • is_master : Будет ли этот клиент сервером. Если у вас несколько приложений, то только один может быть сервером переадресаций на хосте. Либо можно указать значение None, что скажет клиенту, запустить свой сервер, если нет другого. Если установить True, но сервер уже запущен в другом приложение, то будет ошибка. В этом случе в других приложениях нужно указать False.
  • out_queue : Очередь писем на отправку. По умолчанию используется очередь внути памяти процесса. Также можно определить свой класс очереди, к примеру очеред через редис.
  • cert_dir: Путь до сертификата или до пустой директории для генерации ключа.

Параметры AsyncFilesBox:

  • folder : Путь до директории с очередью сообщений.

Вне зависимости какой клиент будет использоваться, письма отправлются асинхронно в любом клиенте. Почти все методы, что есть в синхронном варианте клиента, присутствуют и в асинхронном клиенте.


Ожидаем получения сообщений

...

while True:
    msg = client.get()
    print('')
    print(f'key: {msg.key}') # ИД сообщения в очереди отправителя
    print(f'sender: {msg.sender}') # отправитель сообщения
    print(f'recipient: {msg.recipient}') # получатель сообщения
    print(f'content: {msg.content}') # контент
    print('===========')

Метод .get() можно использовать как с блокировкой, так и с временной блокировкой. Когда метод используется с временной блокировкой timeout = 3 в секундах, то если результата не будет, метод вернет пустоту None. При успешном получение ответа, данные запроса будут находится в msg.content.


Отправить и забыть

Данный метод отправляет текст в другое приложение с названием клиента app2. Метод .send() способен отправить сообщение и про него забыть. Другая сторона должна ожидать сообщение через метод .get().

client.send(recipient='app2', content='hello world')

Параметры метода .send():

  • recipient : Получатель.
  • content : Содержимое.
  • wait : Ожидать ли получения ответа от запроса.

Отправить и ждать ответа

Если есть потребность в ожидания ответа от другого приложения, то последовательность действий будет такая:

  • Приложение app1 посылает запрос и ждет ответа.
# app1
content = 'hello'
msg = client.send(recipient='app2', content=content, wait=True) 
if msg is None:
    return
content = f'{content} {msg.content}'
print(f'key: {msg.key}') # ИД сообщения в очереди у отправителя
print(f'sender: {msg.sender}') # отправитель сообщения
print(f'recipient: {msg.recipient}') # получатель сообщения
print(f'content: {content}') # контент 'hello world'
print('===========')

Может быть случай, когда от нас ушло сообщение, но на той стороне клиент выпал с ошибкой, и чтобы не ждать вечно потерянного сообщения, можно выставить на случай потере timeout. Если не использовать timeout, то как только на текущий клиент придет уведомление, что другой клиент отключился, то ожидание само прекратиться и client.send вернет пустоту. Также ожидание прекращается, если отключается сервер. Это тоже вернет пустоту. Такое ожидание может быть полезным, если нужно определить состояние связи с удаленным клиентом.

  • Приложение app2 получает запрос, обрабатывает его и посылает ответ с таким же ключом как в запросе. Вот как это выглядит:
# app2
while True:
    msg = client.get()
    print('')
    print(f'key: {msg.key}') # ИД сообщения в очереди у отправителя
    print(f'sender: {msg.sender}') # отправитель сообщения
    print(f'recipient: {msg.recipient}') # получатель сообщения
    print(f'content: {msg.content}') # контент 'hello'
    print('===========')
    # обработка запроса 
    # ...
    # и отправка
    client.send(recipient=msg.sender, content='world') # под капотом используется ключ msg.key
  • Или так... Приложение app2 получает запрос, обрабатывает его и посылает ответ с таким же ключом, но при помощи другого потока.
# app2

from queue import Queue
q = Queue()
# thread 1
while True:
    msg = client.get()
    print('')
    print(f'key: {msg.key}') # ИД сообщения в очереди у отправителя
    print(f'sender: {msg.sender}') # из другого приложения
    print(f'recipient: {msg.recipient}') # здесь название нашего приложения
    print(f'content: {msg.content}') # контент 'hello'
    print('===========')
    q.put(msg)
    # обработка запроса и посылаем на отправку в другой поток через очередь
    

# thread 2
while True:
    # Получаем из очереди и отправляем
    msg = q.get()
    client.send(recipient=msg.sender, content='world') # здесь тоже будет использован ключ из thread 1

Запасной сервер переадресаций

Предположим у вас есть клиент-сервер, который занимается переадресацией. Но если это приложение упадет или остановиться, то связь между клиентами будет нарушена. В этом случае можно сделать так, чтобы было возможно запустить дополнительный сервер в клиенте.

Приложение 1.

from pathlib import Path
from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox

name = 'app1'

path = Path(__file__).parent / name # очередь писем на отправку
client = MailClient(
        name_client=name, 
        is_master=None, 
        out_queue=FilesBox(str(path))
    )
client.wait_server() # ожидает запуска сервера

Приложение 2.

from pathlib import Path
from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox

name = 'app2'

path = Path(__file__).parent / name # очередь писем на отправку
client = MailClient(
        name_client=name, 
        is_master=None, 
        out_queue=FilesBox(str(path))
    )
client.wait_server() # ожидает запуска сервера

Когда установлен атрибут is_master=None, то это говорит клиенту, что запустить сервер, если нет другого. Если приложение app2 упадет, то будет запушен сервер внутри app1. Эти сервера должны находится на одном хосте.


Клиент без файловой очереди

Если нет необходимости в сохранение сообщений, когда приложение падает или выключается, то можно упустить создание очереди.

from mail_pigeon import MailClient
from mail_pigeon.queue import FilesBox

name = 'app1'

client = MailClient(
        name_client=name, 
        is_master=None, 
    )
client.wait_server() # ожидает запуска сервера

В этом случае будет применена очеред внутри самого клиента. Она находится в памяти процесса.


Клиент-серверная синхронизация

У каждого клиента имеется список имен других клиентов. Только если клиент знает об участнике, он может отправлять ему сообщения. Об этом заботиться сервер, который обовещает всех клиентов, кто присоединяется, а кто уходит. Сервер в свою очеред поддерживает связь со всеми клиентами, и если один клиент перестает отвечать, то сервер его отключает и всех уведомляет об его уходе.


Пользовательская файловая очередь

Для создания своей очереди, к примеру через редис, есть специальный базовый класс BaseQueue или BaseAsyncQueue. Понадобиться определить следующие методы.

from typing import List
from mail_pigeon.queue import BaseQueue


class SimpleBox(BaseQueue):
    
    def __init__(self, timeout_processing: int = None):
        super().__init__(timeout_processing)
        self._simple_box = {}

    def _init_live_queue(self) -> List[str]:
        """Инициализация очереди при создание экземпляра.

        Returns:
            List[str]: Список.
        """
        return []
            
    def _remove_data(self, key: str):
        """Удаляет данные одного элемента.

        Args:
            key (str): Ключ.
        """
        if key in self._simple_box:
            del self._simple_box[key]

    def _read_data(self, key: str) -> str:
        """Чтение данных по ключу.

        Args:
            key (str): Название.

        Returns:
            str: Прочитанные данные.
        """
        return self._simple_box[key]

    def _save_data(self, key: str, value: str):
        """Сохраняет данные.

        Args:
            value (str): Ключ.
            value (str): Значение.
        """
        self._simple_box[key] = value

В этих методах вам не нужно заботиться о конкурентности данных. Просто опишите откуда читать и куда сохранять.


Безопасность и изолированность

У библиотеки есть возможность отправлять сообщения в зашифрованном виде. В этом случае с таким клиентом можно будет общаться только при наличие общего пароля. Такие данные отправлются на сервер в зашифрованном виде. Единственное, что не шифруется это системные команды синхронизации между клиентом и сервером. Данный тип шифрование можно использовать, если мы не хотим, чтобы на сервере могли прочитать сообщение предназначенное другому клиенту.

from mail_pigeon import MailClient
from mail_pigeon.security import TypesEncryptors

# app1
encript1 = TypesEncryptors.HMAC('admin1')
# этот клиент выступает как сервер переадресаций
client1 = MailClient('app1', is_master=True, encryptor=encryptor1)
client1.wait_server() # ожидает запуска сервера

# app2
encript2 = TypesEncryptors.HMAC('admin2')
client2 = MailClient('app2', is_master=False, encryptor=encryptor2)
client2.wait_server() # ожидает запуска сервера
# app3
client3 = MailClient('app3', is_master=False, encryptor=encryptor2)
client3.wait_server() # ожидает запуска сервера

Несмотря на то, что сервер переадресаций находится в app1, но app2 и app3 могут общаться между собой, а app1 не может с ними общаться. Даже если app1 получить их сообщения из своего сервера в ручную, он не сможет их расшифровать. Это дает изолированность между отдельными группами и безопасное отправление сообщений.


CURVE аутентификация

Это сертификат с двумя ключами. Все сообщения между клиентами и сервером будут в защифрованном виде. Любой клиент который имеет публичный ключ сервера сможет соединиться с ним. В данном случае даже системные сообщения синхронизации будут в зашифрованном виде. При использование только этого метода защиты сообщения могут быть прочитаны только на сервере.

from mail_pigeon import MailClient
# app1
client1 = MailClient('app1', is_master=True, cert_dir='/certificate')
client1.wait_server() # ожидает запуска сервера

Для app1 будет создана директория по пути cert_dir. Для client1 там будет храниться приватный и публичный ключ сервера, а также приватный и публичный ключ текущего клиента.

Чтобы запустить второй клиент, нам потребуется публичный ключ сервера app1. Его нужно будет скопировать самостоятельно и положить в директорию для второго клиента.

from mail_pigeon import MailClient
# app2
client2 = MailClient('app2', is_master=False, cert_dir='/certificate2')
client2.wait_server() # ожидает запуска сервера

Аутентификация и изолированность

Это комбинированный подход к защите информации. В этом случае сообщения смогут прочитать только клиенты, которые имееют и публичный ключ сервера и пароль от группы.

from mail_pigeon import MailClient
from mail_pigeon.security import TypesEncryptors

# app1
encript1 = TypesEncryptors.HMAC('admin')
client1 = MailClient('app1', is_master=True, cert_dir='/certificate', encryptor=encryptor1)
client1.wait_server() # ожидает запуска сервера

Внешние ссылки

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mail_pigeon-1.6.15-py3-none-any.whl (44.7 kB view details)

Uploaded Python 3

File details

Details for the file mail_pigeon-1.6.15-py3-none-any.whl.

File metadata

  • Download URL: mail_pigeon-1.6.15-py3-none-any.whl
  • Upload date:
  • Size: 44.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for mail_pigeon-1.6.15-py3-none-any.whl
Algorithm Hash digest
SHA256 320948e80ac09452ff5d09800f9cffc4bd3f493510cb79c018b9d784b4e1ab15
MD5 edac4989b7af42e22b7694420199c08a
BLAKE2b-256 a88e5ba4fb23e0466e52d70a8b3e38943a0a7fcb2cc611312ea7b61787fa0a6e

See more details on using hashes here.

Provenance

The following attestation bundles were made for mail_pigeon-1.6.15-py3-none-any.whl:

Publisher: build-wheel.yml on AntonGlyzin/mail_pigeon

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page