Generic asynchronous message channel with routing by predicates

message-channel

This library provides a message channel object that extracts particular messages from a mass of messages. It works like GROUP BY in SQL or groupBy in ReactiveX, but for an asynchronous reader.

Installation

pip install python-message-channel

Usage

For example, assume that you have a stream of strings whose messages are prefixed by a, b, ... e, and you'd like to split off subchannels for messages prefixed by b or d, as sketched below for b.

=============================================
---------------------------------> a:foo
--------------------+
--------------------|------------> c:foo
--------------------|------------> d:foo
--------------------|------------> e:foo
====================|========================
channel             |
                   =|========================
                    +------------> b:foo
                   ==========================
                   subchannel `m.startswith('b')`
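
The routing in the diagram can be sketched with the standard library alone. This is only a conceptual illustration and not how the library is implemented; `route`, `demo`, and the queue names are hypothetical. Messages that satisfy the predicate go to one queue, everything else goes to another.

```python
import asyncio


async def route(source, predicate, matched, rest):
    """Move every queued message from source into matched or rest."""
    while not source.empty():
        message = await source.get()
        # Messages accepted by the predicate leave the main flow.
        target = matched if predicate(message) else rest
        target.put_nowait(message)


def drain(queue):
    """Collect everything currently in a queue, in order."""
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


async def demo():
    source, matched, rest = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    for prefix in "abcde":
        source.put_nowait(f"{prefix}:foo")

    await route(source, lambda m: m.startswith("b"), matched, rest)
    return drain(matched), drain(rest)


sub_messages, channel_messages = asyncio.run(demo())
print(sub_messages)      # messages routed to the "subchannel"
print(channel_messages)  # messages left on the "channel"
```

Unlike this one-shot sketch, the library keeps routing while the channel is open, so messages can arrive at any time.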

This library is a tool for handling such situations. First, create a Channel instance from a stream reader; you can then receive messages with the channel.recv() method. In this example, we use asyncio.Queue as the stream.

import asyncio

from message_channel import Channel

async def main():
    # Create original stream
    stream = asyncio.Queue()

    # Create stream reader
    async def reader():
        return await stream.get()

    # Create stream channel
    async with Channel(reader) as channel:
        stream.put_nowait('a:foo')
        stream.put_nowait('b:foo')
        stream.put_nowait('c:foo')
        stream.put_nowait('d:foo')
        stream.put_nowait('e:foo')
        assert (await channel.recv()) == 'a:foo'
        assert (await channel.recv()) == 'b:foo'
        assert (await channel.recv()) == 'c:foo'
        assert (await channel.recv()) == 'd:foo'
        assert (await channel.recv()) == 'e:foo'


if __name__ == '__main__':
    asyncio.run(main())
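
The reader above is just a zero-argument coroutine function that returns one message per call. Assuming that is all Channel requires (the example suggests so, but it is not confirmed here), a reader could also be built from other sources. The sketch below wraps an async generator; `make_reader` and `ticker` are hypothetical names, not part of the library.

```python
import asyncio


def make_reader(messages):
    """Return a zero-argument coroutine function yielding one message per call."""
    iterator = messages.__aiter__()

    async def reader():
        # Raises StopAsyncIteration once the source is exhausted.
        return await iterator.__anext__()

    return reader


async def ticker():
    """Hypothetical message source standing in for a socket or subprocess."""
    for prefix in "abc":
        await asyncio.sleep(0)
        yield f"{prefix}:foo"


async def demo():
    reader = make_reader(ticker())
    return [await reader() for _ in range(3)]


received = asyncio.run(demo())
print(received)
```

How Channel reacts when the reader raises on exhaustion is not covered here; check the API documentation before relying on it.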

You can also split the channel with the channel.split() method, passing a predicate function (called a predicator here), like this:

    async with Channel(reader) as channel:
        def predicator(m):
            return m.startswith('b:')

        async with channel.split(predicator) as sub:
            stream.put_nowait('a:foo')
            stream.put_nowait('b:foo')
            stream.put_nowait('c:foo')
            stream.put_nowait('d:foo')
            stream.put_nowait('e:foo')
            # sub receives the messages that start with 'b:'
            assert (await sub.recv()) == 'b:foo'
            # channel (the original) receives all other messages
            assert (await channel.recv()) == 'a:foo'
            assert (await channel.recv()) == 'c:foo'
            assert (await channel.recv()) == 'd:foo'
            assert (await channel.recv()) == 'e:foo'
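
A predicator is an ordinary function from a message to a boolean, so it can cover several prefixes at once. The following is a small sketch; `prefixed_by` is a hypothetical helper, not part of the library. It relies on `str.startswith` accepting a tuple of prefixes.

```python
def prefixed_by(*prefixes):
    """Build a predicate that accepts messages starting with any given prefix."""
    def predicator(message):
        # str.startswith accepts a tuple and matches any of its items.
        return message.startswith(prefixes)
    return predicator


is_b_or_d = prefixed_by("b:", "d:")
messages = ["a:foo", "b:foo", "c:foo", "d:foo", "e:foo"]
selected = [m for m in messages if is_b_or_d(m)]
print(selected)
```

Presumably such a predicate could be passed to channel.split() in place of the single-prefix predicator shown above.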

API documentation

https://fixpoint.github.io/python-message-channel/

powered by pdoc.

License

Distributed under the terms of the MIT License

Download files

Source Distribution

python-message-channel-0.2.1.tar.gz (5.2 kB)

Uploaded Source

Built Distribution

python_message_channel-0.2.1-py3-none-any.whl (5.4 kB)

Uploaded Python 3

File details

Details for the file python-message-channel-0.2.1.tar.gz.

File metadata

  • Download URL: python-message-channel-0.2.1.tar.gz
  • Size: 5.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.4 CPython/3.9.1 Linux/5.4.0-1036-azure

File hashes

Hashes for python-message-channel-0.2.1.tar.gz

Algorithm    Hash digest
SHA256       c64d1903545208d60366dd91f6f9c0cfdcfc09851cb35f860cc4c004e87b6beb
MD5          631dc4ea7ff10a55218c775322056b45
BLAKE2b-256  594cbeb72ceaae1be379a6717bf37a274fc566be401541d2deff33deace6858f
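
A downloaded file can be checked against a listed SHA256 digest with the standard library. The sketch below is one way to do that; `sha256_matches` is a hypothetical helper, and the demo runs on a temporary sample file rather than the real archive so that it is self-contained.

```python
import hashlib
import tempfile
from pathlib import Path


def sha256_matches(path, expected_hex):
    """Return True if the file's SHA256 digest equals expected_hex."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # Read in chunks so large files do not need to fit in memory.
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest() == expected_hex.lower()


with tempfile.TemporaryDirectory() as folder:
    sample = Path(folder) / "sample.bin"
    sample.write_bytes(b"example payload")
    expected = hashlib.sha256(b"example payload").hexdigest()
    ok = sha256_matches(sample, expected)
    bad = sha256_matches(sample, "0" * 64)

print(ok, bad)
```

For the real archive, pass the path of the downloaded python-message-channel-0.2.1.tar.gz and the SHA256 value listed for it.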

File details

Details for the file python_message_channel-0.2.1-py3-none-any.whl.

File hashes

Hashes for python_message_channel-0.2.1-py3-none-any.whl

Algorithm    Hash digest
SHA256       8f02486585c7a25a55a73f202f2e026d3d9f4d9b6c9c5f0ebc933f6f272f79e1
MD5          2b269b0b5f418c2d29479f62fff6dcb6
BLAKE2b-256  2b2f2d35c2dfcf4af463f906d2ae564d33c1cfc142ea0e616f0e7fac8ab4d756
