Skip to main content

asyncio-simple-mq

Project description

SimpleMQ

SimpleMQ - простой MQ, написанный на питоне с использованием стримов из библиотеки asyncio для написания сервера и сокетов из библиотеки socket для реализации постоянного соединения сервер-подписчик

  1. Hello world
  2. Worker
  3. Routing

Hello world

Publisher (Издатель) - отсылает сообщение на сервер Stream (Стрим) - Некоторое отдельное пространство для хранения сообщений. Сообщения могут храниться только внутри стримов, но при этом на сервере может быть создано множество стримов. Издатель отправляет сообщение в стрим. Подписчик в свою очередь пытается получить сообщение из стрима. Follower (Подписчик) - устанавливает постоянное TCP соединение с сервером и ждет пока в стриме появятся сообщения

В hello world примере мы рассмотрим только одного издателя, один стрим и одного подписчика alt

  1. Создаем конфигурационный файл settings.yaml с данными для запуска сервера, удобнее его положить рядом с server.py, который мы создадим далее
host:
port:

# пример
host: localhost
port: 9090
  1. Создаем server.py
# server.py

from simplemq.server.server import run_server

if __name__ == '__main__':
    run_server(settings_yml_filepath='settings.yaml')
  1. Создаем follower.py
# follower.py

from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

bind = Bind(route_string='hello_queue')
follower = Follower(connection=connection, bind=bind)

cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('hello_queue')


def handle_message(message):
    from time import sleep
    sleep(5)
    print('message_body: ', message.message_body)
    print('message_handled')


with follower.session():
    for message in follower.get_messages():
        handle_message(message=message)
        follower.ack_message(message)
  1. Создаем publisher.py
# publisher.py

from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

MESSAGE_BODY = 'hello world'

bind = Bind(route_string='hello_queue')
cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('hello_queue')

publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=MESSAGE_BODY)

Тут появляются новые определения:

  1. Bind - являются роутером, то есть - в случае с издателем говорит в очередь с каким наименованием (значение аргумента routing_string) отправлять сообщение, в случае подписчика - из какой очереди ждать сообщения.
  2. Cursor - объект, который не участвует в обмене сообщениями, он нужен для утилити функций: создать стрим; посмотреть все стримы, которые есть на сервере и тд.

Демонстрация работы: alt

В этом примере мы сначала подключили подписчика, а только потом начали отправлять сообщения в стрим издателем. Мы также можем сначала заполнить стрим сообщениями и только потом включить подписчика, в таком случае он сразу получить сообщения, а не будет их ждать.

Потверждение сообщений Когда подписчик читает сообщение со стрима оно удаляется со стрима и переходит в PEL (Pending entry list). После ACK сообщения от подписчика об обработке этого сообщения оно исчезает и от туда. Есть 2 способа потвердить обработку сообщения:

  • самому, вызвав у инстанса Follower метод ack_message
with follower.session():
    for message in follower.get_messages():
        handle_message(message=message)
        follower.ack_message(message)
  • передать True в аргумент auto_ack. В таком случае на сервер придет потверждение об обработке сообщения еще до вызова handle_message
with follower.session():
    for message in follower.get_messages(auto_ack=True):
        handle_message(message=message)

Worker

alt

Подписчики являются воркерами. При чтении сообщения подписчиком это сообщение пропадает из стрима, поэтому второй подписчик, который подписан на тот же стрим его не прочитает. Запустим двух подписчиков, только один из них будет обрабывать сообщение 100 секунд, а второй 2. Пока первый подписчик обработывает сообщение (в нашем случае просто спит) второй подписчик может читать входящие сообщения.

# first_follower.py

from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

bind = Bind(route_string='hello_queue')
follower = Follower(connection=connection, bind=bind)

cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('hello_queue')


def handle_message(message):
    from time import sleep
    sleep(100)
    print('message_body: ', message.message_body)
    print('message_handled')


with follower.session():
    for message in follower.get_messages():
        handle_message(message=message)
        follower.ack_message(message)
# second_follower.py

from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

bind = Bind(route_string='hello_queue')
follower = Follower(connection=connection, bind=bind)

cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('hello_queue')


def handle_message(message):
    from time import sleep
    sleep(2)
    print('message_body: ', message.message_body)
    print('message_handled')


with follower.session():
    for message in follower.get_messages():
        handle_message(message=message)
        follower.ack_message(message)

Демонстрация работы: alt

Routing

У Bind есть два режима роутинга:

  • Direct - режим по умолчанию, который мы использовали раньше. Издатель отправляет сообщения в стрим с наименованием равным значению аргумента routing_key. Подписчик читает сообщения из стрима с наименованием равным значению аргумента routing_key.
  • REGEX_BASED - Издатель отправляет сообщения во все стримы, название которых удовлетворяет regex выражению из routing_key. Подписчик читает сообщения из всех стримов, название которых содержит удовлетворяет regex выражению из routing_key.
  1. RegexBased Routing cо стороны издателя alt
# publisher

from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind, BindTypes
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

message_body = 'hello world'

bind = Bind(route_string='.+_world', bind_type=BindTypes.REGEX_BASED)
cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('hello_queue')

publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=message_body)
# first_follower.py

from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

bind = Bind(route_string='new_world')
follower = Follower(connection=connection, bind=bind)

cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('new_world')


def handle_message(message):
    from time import sleep
    sleep(3)
    print('message_body: ', message.message_body)
    print('message_handled')


with follower.session():
    for message in follower.get_messages():
        handle_message(message=message)
        follower.ack_message(message)
# second_follower.py

from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

bind = Bind(route_string='old_world')
follower = Follower(connection=connection, bind=bind)

cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('old_world')


def handle_message(message):
    from time import sleep
    sleep(3)
    print('message_body: ', message.message_body)
    print('message_handled')


with follower.session():
    for message in follower.get_messages():
        handle_message(message=message)
        follower.ack_message(message)

Демонстрация работы: alt

  1. RegexBased Routing cо стороны подписчика alt
# first_publisher.py

from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

message_body = 'hello world'

bind = Bind(route_string='old_world')
cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('old_world')

publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=message_body)
# second_publisher.py

from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

message_body = 'hello world'

bind = Bind(route_string='new_world')
cursor = connection.cursor()
with cursor.session():
    cursor.create_stream('new_world')

publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=message_body)
# follower

from simplemq.follower import Follower
from simplemq.bind import Bind, BindTypes
from simplemq.connection import Connection, ConnectionConfig

connection_config = ConnectionConfig(
    host='localhost',
    port=9090,
)

connection = Connection(connection_config=connection_config)

bind = Bind(route_string='.+world', bind_type=BindTypes.REGEX_BASED)
follower = Follower(connection=connection, bind=bind)


def handle_message(message):
    from time import sleep
    sleep(3)
    print('message_body: ', message.message_body)
    print('message_handled')


with follower.session():
    for message in follower.get_messages():
        handle_message(message=message)
        follower.ack_message(message)

Демонстрация работы: alt

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio-simple-mq-1.0.0.tar.gz (14.0 kB view details)

Uploaded Source

File details

Details for the file asyncio-simple-mq-1.0.0.tar.gz.

File metadata

  • Download URL: asyncio-simple-mq-1.0.0.tar.gz
  • Upload date:
  • Size: 14.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.0

File hashes

Hashes for asyncio-simple-mq-1.0.0.tar.gz
Algorithm Hash digest
SHA256 b21a29c1cd79725aa8ae18e3cbc11af7c3f6bd3f6ee348293b1dc2e59cfb86ac
MD5 de85d92bea723a5d3210b5ac908d1ac6
BLAKE2b-256 38350613589225596c67a16d9f91cf19118d0d82ace779299cc58c6e3bd990fd

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page