Service with Message Queue Python library
Message Queue Based Service
from mq4hemc import HemcMessage, HemcService
HemcService
- a class that runs a separate thread which takes messages from the message queue and passes them to a user callback for processing.
It has thread-safe methods to add messages to the queue synchronously
(wait until processed) and asynchronously.
HemcMessage
- a dataclass to inherit from when defining a custom message.
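The pattern described above can be sketched with the standard library alone: a worker thread drains a `queue.Queue` and hands each item to a callback, an asynchronous send only enqueues, and a synchronous send also waits on a per-message `threading.Event`. `MiniService` and `_Envelope` are hypothetical names used for illustration; this is not how mq4hemc itself is implemented.

```python
import queue
import threading
from typing import Any, Callable, Optional


class _Envelope:
    """Wraps a message with an optional completion event (hypothetical helper)."""

    def __init__(self, message: Any, wait: bool):
        self.message = message
        self.done: Optional[threading.Event] = threading.Event() if wait else None
        self.reply: Any = None


class MiniService(threading.Thread):
    """Illustrative stand-in for a queue-based service; not the mq4hemc code."""

    _STOP = object()  # sentinel that tells the worker loop to exit

    def __init__(self, process_cb: Callable[[Any], Any]):
        super().__init__(daemon=True)
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._process_cb = process_cb

    def run(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            item.reply = self._process_cb(item.message)
            if item.done is not None:
                item.done.set()  # wake up the synchronous sender

    def send_async(self, message: Any) -> None:
        # Enqueue and return immediately.
        self._queue.put(_Envelope(message, wait=False))

    def send_sync(self, message: Any) -> Any:
        # Enqueue, then block until the worker has processed this message.
        envelope = _Envelope(message, wait=True)
        self._queue.put(envelope)
        envelope.done.wait()
        return envelope.reply

    def stop(self) -> None:
        self._queue.put(self._STOP)


processed = []


def record(message):
    processed.append(message)
    return message.upper()


service = MiniService(record)
service.start()
service.send_async("a")
service.send_async("b")
reply = service.send_sync("c")  # returns only after "a" and "b" are done
service.stop()
service.join()
```

Because the queue is first-in, first-out and there is a single worker, the synchronous call cannot return before the earlier asynchronous messages have been handled.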
Usage
Define a custom class for the messages:
from dataclasses import dataclass, field

@dataclass
class BigHemcMessage(HemcMessage):
    payload: dict = field(default_factory=dict)
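The `field(default_factory=dict)` default matters because it creates a fresh dict for every message. The sketch below shows this with a stand-in base class so that it runs without the library; `BaseMessage` and its single `type` field are assumptions about the shape of `HemcMessage`, not its actual definition.

```python
from dataclasses import dataclass, field


@dataclass
class BaseMessage:
    """Hypothetical stand-in for HemcMessage; the real class may differ."""
    type: str = ""


@dataclass
class BigMessage(BaseMessage):
    # default_factory builds a new dict per instance, so payloads are not shared
    payload: dict = field(default_factory=dict)


first = BigMessage(type="test0")
second = BigMessage(type="test1")
first.payload["key"] = "value0"  # does not affect second.payload
```

Writing `payload: dict = {}` instead is rejected by `dataclasses` with a `ValueError`, since a mutable default would be shared between instances.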
Create a service that runs in a separate thread and processes messages from the message queue:
service = HemcService()
Register a callback function to process messages from the message queue:
import logging
import time

# Logger name taken from the sample output further down.
logger = logging.getLogger("test_mq4hemc")

def process_cb(item: HemcMessage):
    if hasattr(item, 'payload') and item.payload is not None:
        # Simulate processing time
        time.sleep(1)
        logger.info(f"Processed message '{item.type}', payload: {item.payload}")
    return item.type

service.register_process_cb(process_cb)
The callback can also be passed to the constructor:
service = HemcService(process_cb)
Start the service:
service.start()
Send messages from another thread asynchronously (without waiting until the message is processed):
for i in range(3):
    message = BigHemcMessage()
    message.type = f"test{i}"
    message.payload = {"key": f"value{i}"}
    logger.info(f"Send {message.type} and do not wait for reply.")
    status = service.send_async_msg(message)
Send a message from another thread synchronously (wait until the message is processed) and get the callback's return value as the reply:
message = BigHemcMessage()
message.type = "test_sync"
message.payload = {"key": "value"}
logger.info(f"Now send {message.type} and wait for reply...")
status = service.send_sync_msg(message)
logger.info(f"Message {message.type} processed, reply: {status}")
Stop the service:
service.stop()
service.join()
logger.info("Service stopped.")
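This code produces the following output. Note the test_sync timestamps: the synchronous send is issued at 14:39:28 but returns at 14:39:32, because the message waits behind the three asynchronous messages already in the queue, each of which takes one second to process.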
2024-05-02 14:39:28 - test_mq4hemc - INFO - Send test0 and do not wait for reply.
2024-05-02 14:39:28 - test_mq4hemc - INFO - Send test1 and do not wait for reply.
2024-05-02 14:39:28 - test_mq4hemc - INFO - Send test2 and do not wait for reply.
2024-05-02 14:39:28 - test_mq4hemc - INFO - Now send test_sync and wait for reply...
2024-05-02 14:39:29 - test_mq4hemc - INFO - Processed message 'test0'
2024-05-02 14:39:30 - test_mq4hemc - INFO - Processed message 'test1'
2024-05-02 14:39:31 - test_mq4hemc - INFO - Processed message 'test2'
2024-05-02 14:39:32 - test_mq4hemc - INFO - Processed message 'test_sync'
2024-05-02 14:39:32 - test_mq4hemc - INFO - Message test_sync processed, reply: test_sync
2024-05-02 14:39:32 - test_mq4hemc - INFO - Service stopped.
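The timing in this log can be reproduced without the library. The sketch below uses a single-worker `ThreadPoolExecutor` as a stand-in for the service thread (a swapped-in technique, not the mq4hemc API) and a shortened delay; `process`, `DELAY`, and the variable names are illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.05  # seconds; stands in for the 1 s sleep in the callback


def process(name: str) -> str:
    time.sleep(DELAY)
    return name


start = time.monotonic()
with ThreadPoolExecutor(max_workers=1) as worker:
    # Fire-and-forget submissions, like the asynchronous sends.
    for i in range(3):
        worker.submit(process, f"test{i}")
    submitted_after = time.monotonic() - start
    # Waiting on the result mirrors the synchronous send.
    reply = worker.submit(process, "test_sync").result()
    replied_after = time.monotonic() - start
```

Here `submitted_after` is close to zero, while `replied_after` is roughly four times `DELAY`: three queued messages plus the synchronous one.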
Yocto recipes
Use one of the following BitBake recipes (python3-mq4hemc_0.0.20.bb)
to install the mq4hemc package:
From github.com:
DESCRIPTION = "Message Queue Based Service"
HOMEPAGE = "https://github.com/oshevchenko/mq4hemc"
LICENSE = "MIT"
LIC_FILES_CHKSUM = "file://LICENSE;md5=bd6d2057bbfc3f9442bc4dfd9a7c4580"
SRC_URI = "git://github.com/oshevchenko/mq4hemc.git;protocol=https"
SRCREV = "75a10cfbf3e16c6a2491b2295d4d4d63f4a952f1"
S = "${WORKDIR}/git"
inherit setuptools3
do_configure:prepend() {
    cp ${S}/setup_git.txt ${S}/setup.py
}
From PyPI:
DESCRIPTION = "Message Queue Based Service"
HOMEPAGE = "https://github.com/oshevchenko/mq4hemc"
LICENSE = "MIT"
LIC_FILES_CHKSUM = "file://LICENSE;md5=bd6d2057bbfc3f9442bc4dfd9a7c4580"
PYPI_PACKAGE = "mq4hemc"
SRC_URI[md5sum] = "5317fbdc5cc857b70f622005730ef66f"
inherit pypi setuptools3
do_configure:prepend() {
    cp ${S}/setup_pypi.txt ${S}/setup.py
}
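When a recipe is adapted to a different release, the `LIC_FILES_CHKSUM` and `SRC_URI[md5sum]` values have to be recomputed from the LICENSE file and the source archive. A small helper for that might look like the sketch below; `hex_digest_of_file` is a hypothetical name and not part of mq4hemc or BitBake.

```python
import hashlib
from pathlib import Path


def hex_digest_of_file(path, algorithm: str = "md5", chunk_size: int = 65536) -> str:
    """Return the hex digest of a file, read in chunks to keep memory use low."""
    digest = hashlib.new(algorithm)
    with Path(path).open("rb") as stream:
        # iter() with a sentinel stops when read() returns empty bytes (EOF)
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

For example, `hex_digest_of_file("LICENSE")` gives the value for the `md5=` part of `LIC_FILES_CHKSUM`, and passing `"sha256"` as the algorithm allows a downloaded archive to be checked against a published SHA256 digest.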
File details
Details for the file mq4hemc-0.2.0.tar.gz.
File metadata
- Download URL: mq4hemc-0.2.0.tar.gz
- Upload date:
- Size: 13.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.8.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | 6a9882c3e80c1ce7721668fad7a1582af7c30c50e345ed52c9cad51136c0f54b
MD5 | 6006401f83c4c0cb133db4fe61a5e1d6
BLAKE2b-256 | 47ab7b3dfe08adb54c7b7744f0e4563191a848286f45478388260bb2eaed6dbb
File details
Details for the file mq4hemc-0.2.0-py3-none-any.whl.
File metadata
- Download URL: mq4hemc-0.2.0-py3-none-any.whl
- Upload date:
- Size: 7.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.8.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | 3d79305c31a5dc846810e60434fbe26e50c467511948d8910ff0003038b3d109
MD5 | 32a68ff36aa8f9b73f9689a1f85e48a9
BLAKE2b-256 | 1754639f1b3e1c012126ebd49af6622640db15176b32771dd4276b28925a463d