Remote AMQP mock
Project description
AMQP Mock
Installation
pip3 install amqp-mock
Overview
Test Publishing
from amqp_mock import create_amqp_mock
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Publish message via "system under test"
publish_message([1, 2, 3], "exchange")
# 3. Test message has been published
messages = await mock.client.get_exchange_messages("exchange")
assert messages[0].value == [1, 2, 3]
Full code available here: ./examples/publish_example.py
Test Consuming
from amqp_mock import create_amqp_mock, Message, MessageStatus
# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
# 2. Mock next message
await mock.client.publish_message("queue", Message([1, 2, 3]))
# 3. Consume message via "system under test"
consume_message("queue")
# 4. Test message has been consumed
history = await mock.client.get_queue_message_history("queue")
assert history[0].status == MessageStatus.ACKED
Full code available here: ./examples/consume_example.py
Mock Server
Start server
import asyncio
from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock
async def run() -> None:
storage = Storage()
http_server = HttpServer(storage, port=8080)
amqp_server = AmqpServer(storage, port=5672)
async with create_amqp_mock(http_server, amqp_server):
await asyncio.Future()
asyncio.run(run())
or via docker
docker run -p 8080:80 -p 5672:5672 tsv1/amqp-mock
Publish message
POST /queues/{queue}/messages
{
"id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
"value": [1, 2, 3],
"exchange": "",
"routing_key": "",
"properties": null
}
HTTP
$ http POST localhost/queues/test_queue/messages \
value:='[1, 2, 3]' \
exchange=test_exchange
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient, Message
mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
Get queue message history
GET /queues/{queue}/messages/history
HTTP
$ http GET localhost/queues/test_queue/messages/history
HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8
[
{
"message": {
"exchange": "test_exchange",
"id": "94459a41-9119-479a-98c9-80bc9dabb719",
"properties": null,
"routing_key": "",
"value": [1, 2, 3]
},
"queue": "test_queue",
"status": "ACKED"
}
]
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
# <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
# queue='test_queue',
# status=MessageStatus.ACKED>
# ]
Get exchange messages
GET /exchanges/{exchange}/messages
HTTP
$ http GET localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8
[
{
"exchange": "test_exchange",
"id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
"properties": {
"app_id": "",
"cluster_id": "",
"content_encoding": "",
"content_type": "",
"correlation_id": "",
"delivery_mode": 1,
"expiration": "",
"headers": null,
"message_id": "5ec9024c74eca2e419fd7e29f7be846c",
"message_type": "",
"priority": null,
"reply_to": "",
"timestamp": null,
"user_id": ""
},
"routing_key": "",
"value": [1, 2, 3]
}
]
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
# <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]
Delete exchange messages
DELETE /exchanges/{exchange}/messages
HTTP
$ http DELETE localhost/exchanges/test_exchange/messages
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
Reset
DELETE /
HTTP
$ http DELETE localhost/
HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient
mock_client = AmqpMockClient()
await mock_client.reset()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file amqp_mock-0.6.2.tar.gz.
File metadata
- Download URL: amqp_mock-0.6.2.tar.gz
- Upload date:
- Size: 19.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d8496686a1d657f19a474dfe1fecbb12aeb071e8384c7234380c7f61cf7d85d7
|
|
| MD5 |
f0a97354d3d7641ec5a1ec658524f493
|
|
| BLAKE2b-256 |
058e8eef013c56efeab660c289eae69d65918f5dfe1282425bf554514ee0955e
|
File details
Details for the file amqp_mock-0.6.2-py3-none-any.whl.
File metadata
- Download URL: amqp_mock-0.6.2-py3-none-any.whl
- Upload date:
- Size: 18.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8d1f422777bdef0300da9fffba8faad02d55ada84ff09fde7423d52c5ff91a69
|
|
| MD5 |
de93b1aa7b03ece9bda1f841dab5da98
|
|
| BLAKE2b-256 |
402db039637bb1a6b28d17e11739589e2d62b7cd959f1e81f7641552616fa13b
|