asyncio-simple-mq
Project description
SimpleMQ
SimpleMQ - простой MQ, написанный на питоне с использованием стримов из библиотеки asyncio для написания сервера и сокетов из библиотеки socket для реализации постоянного соединения сервер-подписчик
Hello world
Publisher (Издатель) - отсылает сообщение на сервер Stream (Стрим) - Некоторое отдельное пространство для хранения сообщений. Сообщения могут храниться только внутри стримов, но при этом на сервере может быть создано множество стримов. Издатель отправляет сообщение в стрим. Подписчик в свою очередь пытается получить сообщение из стрима. Follower (Подписчик) - устанавливает постоянное TCP соединение с сервером и ждет пока в стриме появятся сообщения
В hello world примере мы рассмотрим только одного издателя, один стрим и одного подписчика
- Создаем конфигурационный файл settings.yaml с данными для запуска сервера, удобнее его положить рядом с server.py, который мы создадим далее
host:
port:
# пример
host: localhost
port: 9090
- Создаем server.py
# server.py
from simplemq.server.server import run_server
if __name__ == '__main__':
run_server(settings_yml_filepath='settings.yaml')
- Создаем follower.py
# follower.py
from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
bind = Bind(route_string='hello_queue')
follower = Follower(connection=connection, bind=bind)
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('hello_queue')
def handle_message(message):
from time import sleep
sleep(5)
print('message_body: ', message.message_body)
print('message_handled')
with follower.session():
for message in follower.get_messages():
handle_message(message=message)
follower.ack_message(message)
- Создаем publisher.py
# publisher.py
from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
MESSAGE_BODY = 'hello world'
bind = Bind(route_string='hello_queue')
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('hello_queue')
publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=MESSAGE_BODY)
Тут появляются новые определения:
- Bind - являются роутером, то есть - в случае с издателем говорит в очередь с каким наименованием (значение аргумента routing_string) отправлять сообщение, в случае подписчика - из какой очереди ждать сообщения.
- Cursor - объект, который не участвует в обмене сообщениями, он нужен для утилити функций: создать стрим; посмотреть все стримы, которые есть на сервере и тд.
Демонстрация работы:
В этом примере мы сначала подключили подписчика, а только потом начали отправлять сообщения в стрим издателем. Мы также можем сначала заполнить стрим сообщениями и только потом включить подписчика, в таком случае он сразу получить сообщения, а не будет их ждать.
Потверждение сообщений Когда подписчик читает сообщение со стрима оно удаляется со стрима и переходит в PEL (Pending entry list). После ACK сообщения от подписчика об обработке этого сообщения оно исчезает и от туда. Есть 2 способа потвердить обработку сообщения:
- самому, вызвав у инстанса Follower метод ack_message
with follower.session():
for message in follower.get_messages():
handle_message(message=message)
follower.ack_message(message)
- передать True в аргумент auto_ack. В таком случае на сервер придет потверждение об обработке сообщения еще до вызова handle_message
with follower.session():
for message in follower.get_messages(auto_ack=True):
handle_message(message=message)
Worker
Подписчики являются воркерами. При чтении сообщения подписчиком это сообщение пропадает из стрима, поэтому второй подписчик, который подписан на тот же стрим его не прочитает. Запустим двух подписчиков, только один из них будет обрабывать сообщение 100 секунд, а второй 2. Пока первый подписчик обработывает сообщение (в нашем случае просто спит) второй подписчик может читать входящие сообщения.
# first_follower.py
from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
bind = Bind(route_string='hello_queue')
follower = Follower(connection=connection, bind=bind)
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('hello_queue')
def handle_message(message):
from time import sleep
sleep(100)
print('message_body: ', message.message_body)
print('message_handled')
with follower.session():
for message in follower.get_messages():
handle_message(message=message)
follower.ack_message(message)
# second_follower.py
from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
bind = Bind(route_string='hello_queue')
follower = Follower(connection=connection, bind=bind)
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('hello_queue')
def handle_message(message):
from time import sleep
sleep(2)
print('message_body: ', message.message_body)
print('message_handled')
with follower.session():
for message in follower.get_messages():
handle_message(message=message)
follower.ack_message(message)
Демонстрация работы:
Routing
У Bind есть два режима роутинга:
- Direct - режим по умолчанию, который мы использовали раньше. Издатель отправляет сообщения в стрим с наименованием равным значению аргумента routing_key. Подписчик читает сообщения из стрима с наименованием равным значению аргумента routing_key.
- REGEX_BASED - Издатель отправляет сообщения во все стримы, название которых удовлетворяет regex выражению из routing_key. Подписчик читает сообщения из всех стримов, название которых содержит удовлетворяет regex выражению из routing_key.
- RegexBased Routing cо стороны издателя
# publisher
from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind, BindTypes
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
message_body = 'hello world'
bind = Bind(route_string='.+_world', bind_type=BindTypes.REGEX_BASED)
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('hello_queue')
publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=message_body)
# first_follower.py
from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
bind = Bind(route_string='new_world')
follower = Follower(connection=connection, bind=bind)
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('new_world')
def handle_message(message):
from time import sleep
sleep(3)
print('message_body: ', message.message_body)
print('message_handled')
with follower.session():
for message in follower.get_messages():
handle_message(message=message)
follower.ack_message(message)
# second_follower.py
from simplemq.follower import Follower
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
bind = Bind(route_string='old_world')
follower = Follower(connection=connection, bind=bind)
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('old_world')
def handle_message(message):
from time import sleep
sleep(3)
print('message_body: ', message.message_body)
print('message_handled')
with follower.session():
for message in follower.get_messages():
handle_message(message=message)
follower.ack_message(message)
Демонстрация работы:
- RegexBased Routing cо стороны подписчика
# first_publisher.py
from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
message_body = 'hello world'
bind = Bind(route_string='old_world')
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('old_world')
publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=message_body)
# second_publisher.py
from simplemq.publisher.publisher import SocketBasedPublisher
from simplemq.bind import Bind
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
message_body = 'hello world'
bind = Bind(route_string='new_world')
cursor = connection.cursor()
with cursor.session():
cursor.create_stream('new_world')
publisher = SocketBasedPublisher(connection=connection, bind=bind)
publisher.send_message(message_body=message_body)
# follower
from simplemq.follower import Follower
from simplemq.bind import Bind, BindTypes
from simplemq.connection import Connection, ConnectionConfig
connection_config = ConnectionConfig(
host='localhost',
port=9090,
)
connection = Connection(connection_config=connection_config)
bind = Bind(route_string='.+world', bind_type=BindTypes.REGEX_BASED)
follower = Follower(connection=connection, bind=bind)
def handle_message(message):
from time import sleep
sleep(3)
print('message_body: ', message.message_body)
print('message_handled')
with follower.session():
for message in follower.get_messages():
handle_message(message=message)
follower.ack_message(message)
Демонстрация работы:
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file asyncio-simple-mq-1.0.0.tar.gz
.
File metadata
- Download URL: asyncio-simple-mq-1.0.0.tar.gz
- Upload date:
- Size: 14.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | b21a29c1cd79725aa8ae18e3cbc11af7c3f6bd3f6ee348293b1dc2e59cfb86ac |
|
MD5 | de85d92bea723a5d3210b5ac908d1ac6 |
|
BLAKE2b-256 | 38350613589225596c67a16d9f91cf19118d0d82ace779299cc58c6e3bd990fd |