aio_pubsub
##########

1. Installation
===============

.. code-block:: shell

    pip install aiopubsub-py3
    pip install "aiopubsub-py3[redis]"

2. Examples
===========

- 2.1 Publish

.. code-block:: python

    import asyncio

    from aiopubsub import Pubsub, PubsubRole


    async def main():
        # Open the Redis backend in the publisher role for the duration of the block.
        async with Pubsub(Pubsub.REDIS, port=16379, namespace="cs", role=PubsubRole.PUB) as pub:
            count = await pub.publish("foo", {"test": 1})
            print(count)
            count = await pub.publish("foo", {"test": 2})
            print(count)
            count = await pub.publish("foo", {"test": 3})
            print(count)


    asyncio.run(main())

- 2.2 Subscribe

.. code-block:: python

    import asyncio

    from aiopubsub import Pubsub, PubsubRole


    async def print_msg(channel, msg):
        # Handler passed to listen(); receives the channel name and the message.
        print(f"sub msg: {channel} -- {msg}")


    async def main():
        channels = ["foo"]
        async with Pubsub(Pubsub.REDIS, port=16379, namespace="cs", role=PubsubRole.SUB) as sub:
            await sub.subscribe(*channels)
            async for k in sub.listen(handler=print_msg):
                print(k)
            await sub.unsubscribe(*channels)


    asyncio.run(main())

- 2.3 Pattern subscribe

.. code-block:: python

    import asyncio

    from aiopubsub import Pubsub, PubsubRole


    async def print_msg(channel, msg):
        # Handler passed to listen(); receives the matched channel name and the message.
        print(f"psub msg: {channel} -- {msg}")


    async def main():
        channels = ["foo*"]  # pattern rather than an exact channel name
        async with Pubsub(Pubsub.REDIS, port=16379, namespace="cs", role=PubsubRole.SUB) as psub:
            await psub.psubscribe(*channels)
            async for k in psub.listen(handler=print_msg):
                print(k)
            await psub.unsubscribe(*channels)


    asyncio.run(main())
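
To get a feel for which channel names a pattern such as ``foo*`` would cover, the following minimal sketch approximates glob-style matching with Python's standard ``fnmatch`` module. It does not use aiopubsub or Redis; ``matching_channels`` is a hypothetical helper introduced only for illustration, it ignores the ``namespace`` argument, and ``fnmatch`` differs from Redis pattern syntax in some details (for example escaping and character-class negation).

```python
from fnmatch import fnmatchcase


def matching_channels(pattern, channels):
    """Return the channel names that a glob-style pattern would match.

    Hypothetical helper: an approximation of glob matching, not part of
    the aiopubsub API.
    """
    return [name for name in channels if fnmatchcase(name, pattern)]


if __name__ == "__main__":
    published = ["foo", "foo.bar", "food", "bar", "xfoo"]
    # "foo*" matches every name that starts with "foo".
    print(matching_channels("foo*", published))  # ['foo', 'foo.bar', 'food']
```

A name such as ``xfoo`` is not matched because the pattern is anchored at the start of the channel name.
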

- 2.4 Automatic role detection (the role is decided by the first function called and cannot be switched until ``close()`` is called)

.. code-block:: python

    from aiopubsub import Pubsub


    async def print_msg(channel, msg):
        print(f"psub msg: {channel} -- {msg}")


    async def main():
        channels = ["foo*"]
        pubsub = Pubsub(Pubsub.REDIS, port=16379, namespace="cs")
        await pubsub.publish("foo", {"test": 1})  # role is set to PUB; succeeds

        await pubsub.subscribe(*channels)  # role is PUB; subscribing is not allowed and raises an exception

        await pubsub.close()  # role is released

        await pubsub.subscribe(*channels)  # role is set to SUB; succeeds

        print(pubsub.backend.role)

        await pubsub.publish("foo", {"test": 1})  # role is SUB; publishing is not allowed and raises an exception

        async with pubsub:
            await pubsub.unsubscribe(*channels)  # role is SUB; succeeds
        # leaving the async with block releases the role

        await pubsub.publish("foo", {"test": 1})  # role is set to PUB; succeeds
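
The role rule above (first call decides the role, ``close()`` releases it) can be illustrated with a small stand-alone sketch. ``RoleLockedClient`` is a hypothetical toy class written for this illustration; it is not the aiopubsub implementation, it performs no network I/O, and the use of ``RuntimeError`` is an assumption, since the exception type raised by the library is not stated here.

```python
import asyncio


class RoleLockedClient:
    """Hypothetical stand-in that only models the role rule; not aiopubsub."""

    def __init__(self):
        self.role = None  # no role until the first publish/subscribe call

    def _claim(self, wanted):
        # The first call fixes the role; a conflicting call is rejected.
        if self.role is None:
            self.role = wanted
        elif self.role != wanted:
            raise RuntimeError(f"role is {self.role}, cannot act as {wanted}")

    async def publish(self, channel, message):
        self._claim("PUB")
        return 0  # placeholder: a real backend would deliver the message

    async def subscribe(self, *channels):
        self._claim("SUB")

    async def close(self):
        self.role = None  # releasing the role allows a different one next time


async def demo():
    client = RoleLockedClient()
    events = []
    await client.publish("foo", {"test": 1})
    events.append(client.role)          # role claimed by the first call
    try:
        await client.subscribe("foo*")  # conflicts with the PUB role
    except RuntimeError:
        events.append("rejected")
    await client.close()
    await client.subscribe("foo*")      # allowed again after close()
    events.append(client.role)
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['PUB', 'rejected', 'SUB']
```

Wrapping the conflicting call in ``try``/``except`` as in ``demo`` lets the rest of a script continue after a rejected call.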
