Skip to main content

Remote AMQP mock

Project description

AMQP Mock

Codecov PyPI PyPI - Downloads Python Version

Installation

pip3 install amqp-mock

Overview

Test Publishing

from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]

Full code available here: ./examples/publish_example.py

Test Consuming

from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED

Full code available here: ./examples/consume_example.py

Mock Server

Start server

import asyncio

from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock


async def run() -> None:
    """Serve the mock's HTTP (8080) and AMQP (5672) endpoints until cancelled."""
    # Both servers are built on the same Storage instance.
    shared_storage = Storage()
    servers = (
        HttpServer(shared_storage, port=8080),
        AmqpServer(shared_storage, port=5672),
    )
    async with create_amqp_mock(*servers):
        # Nothing ever resolves this future, so the coroutine parks here and
        # the context manager stays open until the task is cancelled.
        await asyncio.Future()

asyncio.run(run())

Publish message

POST /queues/{queue}/messages

{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}
HTTP

$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)

Get queue message history

GET /queues/{queue}/messages/history

HTTP

$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
#                  queue='test_queue',
#                  status=MessageStatus.ACKED>
# ]

Get exchange messages

GET /exchanges/{exchange}/messages

HTTP

$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]

Delete exchange messages

DELETE /exchanges/{exchange}/messages

HTTP

$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")

Reset

DELETE /

HTTP

$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for amqp-mock, version 0.2.0
Filename, size File type Python version Upload date Hashes
Filename, size amqp_mock-0.2.0-py3-none-any.whl (19.9 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size amqp-mock-0.2.0.tar.gz (17.1 kB) File type Source Python version None Upload date Hashes View

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page