Remote AMQP mock
Project description
AMQP Mock
Installation
pip3 install amqp-mock
Overview
Test Publishing
from amqp_mock import create_amqp_mock
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Publish message via "system under test"
publish_message([1, 2, 3], "exchange")
# 3. Test message has been published
messages = await mock.client.get_exchange_messages("exchange")
assert messages[0].value == [1, 2, 3]
Full code available here: ./examples/publish_example.py
Test Consuming
from amqp_mock import create_amqp_mock, Message, MessageStatus
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Mock next message
await mock.client.publish_message("queue", Message([1, 2, 3]))
# 3. Consume message via "system under test"
consume_message("queue")
# 4. Test message has been consumed
history = await mock.client.get_queue_message_history("queue")
assert history[0].status == MessageStatus.ACKED
Full code available here: ./examples/consume_example.py
Mock Server
Start server
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock
async def run() -> None:
storage = Storage()
http_server = HttpServer(storage, port=8080)
amqp_server = AmqpServer(storage, port=5672)
async with create_amqp_mock(http_server, amqp_server):
await asyncio.Future()
asyncio.run(run())
or via docker
docker run -p 8080:80 -p 5672:5672 tsv1/amqp-mock
Publish message
POST /queues/{queue}/messages
{
"id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
"value": [1, 2, 3],
"exchange": "",
"routing_key": "",
"properties": null
}
HTTP
$ http POST localhost/queues/test_queue/messages \
value:='[1, 2, 3]' \
exchange=test_exchange
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient, Message
mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
Get queue message history
GET /queues/{queue}/messages/history
HTTP
$ http GET localhost/queues/test_queue/messages/history
HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8
[
{
"message": {
"exchange": "test_exchange",
"id": "94459a41-9119-479a-98c9-80bc9dabb719",
"properties": null,
"routing_key": "",
"value": [1, 2, 3]
},
"queue": "test_queue",
"status": "ACKED"
}
]
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
# <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
# queue='test_queue',
# status=MessageStatus.ACKED>
# ]
Get exchange messages
GET /exchanges/{exchange}/messages
HTTP
$ http GET localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8
[
{
"exchange": "test_exchange",
"id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
"properties": {
"app_id": "",
"cluster_id": "",
"content_encoding": "",
"content_type": "",
"correlation_id": "",
"delivery_mode": 1,
"expiration": "",
"headers": null,
"message_id": "5ec9024c74eca2e419fd7e29f7be846c",
"message_type": "",
"priority": null,
"reply_to": "",
"timestamp": null,
"user_id": ""
},
"routing_key": "",
"value": [1, 2, 3]
}
]
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
# <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]
Delete exchange messages
DELETE /exchanges/{exchange}/messages
HTTP
$ http DELETE localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
Reset
DELETE /
HTTP
$ http DELETE localhost/
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.reset()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
amqp_mock-0.6.2.tar.gz
(19.5 kB
view details)
Built Distribution
amqp_mock-0.6.2-py3-none-any.whl
(18.7 kB
view details)
File details
Details for the file amqp_mock-0.6.2.tar.gz
.
File metadata
- Download URL: amqp_mock-0.6.2.tar.gz
- Upload date:
- Size: 19.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.11.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d8496686a1d657f19a474dfe1fecbb12aeb071e8384c7234380c7f61cf7d85d7 |
|
MD5 | f0a97354d3d7641ec5a1ec658524f493 |
|
BLAKE2b-256 | 058e8eef013c56efeab660c289eae69d65918f5dfe1282425bf554514ee0955e |
File details
Details for the file amqp_mock-0.6.2-py3-none-any.whl
.
File metadata
- Download URL: amqp_mock-0.6.2-py3-none-any.whl
- Upload date:
- Size: 18.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.11.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8d1f422777bdef0300da9fffba8faad02d55ada84ff09fde7423d52c5ff91a69 |
|
MD5 | de93b1aa7b03ece9bda1f841dab5da98 |
|
BLAKE2b-256 | 402db039637bb1a6b28d17e11739589e2d62b7cd959f1e81f7641552616fa13b |