Skip to main content

The aio rabbitmq HTTP API client.

Project description

羽箭 Yujian

一个异步的 RabbitMQ HTTP API 客户端。A asyncio Rabbitmq HTTP API client.

羽箭

写这个库的初衷是用在 Jiama 的 console 命令中,用于显示 RPC 的服务端和客户端;但找了一圈,却没有发现一个异步且支持Python3 的库,所以写了这个库。

Rabbitmq 的安装可以使用 docker 方式,具体参见官网

sudo docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management

在启用“管理插件”运行后,可以在 http://server-name:15672/api/ 查看 Rabbitmq HTTP API 的内容。

安装 Install

pip install wangong

接口 API

yujian.api.config

Rabbitmq HTTP API 接口配置字典,键定义客户端方法名称,值定义方法调用的 API 地址、请求方法、必填项、默认值等,如:

    'overview': {
        'uri': '/api/overview',
        'method': 'get',
        'option': {
            'columns': [
                'rabbitmq_version',
                'cluster_name',
                'queue_totals.messages',
                'object_totals.queues',
            ]
        },
    },
    'whoami': {'uri': '/api/whoami', 'method': 'get'},

你可以根据需要扩展此配置。

client = await Client().init('http://localhost:15672')

客户端初始化方法。

await client.close()

客户端销毁方法。

await client.declare_queue(name, vhost=None, **kwargs)

  • name str - 队列名称
  • vhost str - 队列所属的虚拟机,默认为 /
  • kwargs Any - 其他可用参数,具体参见 Rabbitmq HTTP API DOC

await client.list_queue(vhost, columns, **kwargs)

  • vhost str - 队列所属的虚拟机,默认为 /
  • columns list[str] - 返回结果中包含的列
  • kwargs Any - 其他可用参数

await delete_queue(name, vhost)

  • name str - 队列名称
  • vhost str - 队列所属的虚拟机

await client.declare_exchange(name, type, vhost, **kwargs)

  • name str - 交换机名称
  • type str - 交换机类型
  • vhost str - 交换机所属的虚拟机,默认为 /
  • kwargs Any - 其他可用参数,具体参见 Rabbitmq HTTP API DOC

await client.declare_binding(source, routing_key, destination, destination_type, vhost, **kwargs)

  • source str - 绑定的源,交换机名称
  • routing_key str - 绑定的路由键
  • destination str - 绑定的终点,交换机或队列的名称
  • destination_type str - 绑定的重点类型,exchange 或 queue
  • vhost str - 绑定所属的虚拟机,默认为 /
  • kwargs Any - 其他可用参数,具体参见 Rabbitmq HTTP API DOC

await client.publish_message(payload, routing_key, properties, exchange, vhost, **kwargs)

  • payload str - 消息内容
  • routing_key - 路由键
  • properties - 消息属性
  • exchange - 交换机
  • vhost - 虚拟机
  • kwargs - 其他可用参数,具体参见 Rabbitmq HTTP API DOC

await client.await client.invoke(act, **kwargs)

  • act str - 需要执行的动作,对应 yujian.api.config 中的键
  • kwargs Any - 需要传递的参数

client.getattr(method)

  • method str - 方法名称,对应 yujian.api.config 中的键

借助 __getattr__ 方法,你可以根据 Rabbitmq HTTP API DOC 的要求任意扩展 yujian.api.config,直接以键作为方法名在 client 对象上调用。

示例 Example

from loguru import logger

from yujian.api import Client


async def main():
    c = await Client().init('http://192.168.56.109:15672/')

    r20 = await c.whoami()
    r21 = await c.list_exchange(columns=['name'])
    r22 = await c.list_queue(
        columns=['vhost', 'name', 'node', 'messages'], sort='name', sort_reverse='true'
    )
    r23 = await c.list_user()
    r24 = await c.get_user(name='guest')
    r25 = await c.get_vhost(name='%2F')
    r26 = await c.get_permission('guest')
    r27 = await c.get_queue('test_queue_2')

    r30 = await c.invoke('declare_queue', name='test_queue')
    r31 = await c.invoke(
        'list_queue',
        columns=['vhost', 'name', 'node', 'messages'],
        sort='name',
        sort_reverse='true',
    )
    r32 = await c.invoke('delete_queue', name='test_queue')
    r33 = await c.invoke('declare_exchange', name='test_exchange', type='direct')
    r34 = await c.invoke(
        'get_message',
        queue='test_queue',
        count=5,
        ackmode='ack_requeue_true',
        encoding='auto',
    )
    r35 = await c.invoke(
        'publish_message',
        exchange='test_exchange',
        payload='ko ko ko',
        routing_key='test_queue',
        payload_encoding='string',
        properties={},
    )
    r36 = await c.invoke(
        'declare_binding',
        source='test_exchange',
        destination='test_queue',
        destination_type='queue',
        routing_key='test_queue',
    )
    r37 = await c.invoke('whoami')

    await c.close()


if __name__ == '__main__':
    config = {
        'handlers': [
            {
                'sink': sys.stdout,
                'format': '<green>{time:YYYY-MM-DD at HH:mm:ss}</green> {level} <level>{message}</level>',
            },
        ],
    }
    logger.configure(**config)

    asyncio.run(main())

License

MIT © Li zhigang

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

yujian-0.8.4.tar.gz (9.9 kB view details)

Uploaded Source

Built Distribution

yujian-0.8.4-py3-none-any.whl (7.8 kB view details)

Uploaded Python 3

File details

Details for the file yujian-0.8.4.tar.gz.

File metadata

  • Download URL: yujian-0.8.4.tar.gz
  • Upload date:
  • Size: 9.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.4

File hashes

Hashes for yujian-0.8.4.tar.gz
Algorithm Hash digest
SHA256 8f639920fdc3529fa8cd2ea089b7281b76ab87ee7880e9409fa7e95fe6247342
MD5 901d989fd79c3556126d8e1f6c1c8773
BLAKE2b-256 7cf4c3b3bbc69db08041ce08f4921fbafee3031e43f270054d6127acadcdbe2f

See more details on using hashes here.

File details

Details for the file yujian-0.8.4-py3-none-any.whl.

File metadata

  • Download URL: yujian-0.8.4-py3-none-any.whl
  • Upload date:
  • Size: 7.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.4

File hashes

Hashes for yujian-0.8.4-py3-none-any.whl
Algorithm Hash digest
SHA256 0bb08ad8d5d79a25eea5df51860b930f00fc456cfb6f442838babe443d54be00
MD5 50078b0639940bc82c6f515d17bf87fe
BLAKE2b-256 3e49dbf57914a357a4dab36f33f1c55784af2476631e3bbddd4329b9e6fa50e4

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page