Written with native Asyncio NSQ package

Project description

ansq - Async NSQ


Features

  • Full TCP wrapper
  • One connection for writer and reader
  • Self-healing: when the NSQ connection is lost, the client reconnects, re-sends the identify and auth commands, and re-subscribes to the previous topic/channel
  • Many helper methods in each class
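
The order of the self-healing steps listed above can be pictured with a small, self-contained sketch. This is not ansq's internal code: `SessionState` and `recovery_plan` are hypothetical names invented for illustration, and the returned strings are only labels for the steps the feature list mentions.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class SessionState:
    """Hypothetical record of what must be replayed after a reconnect."""
    needs_auth: bool = False
    subscription: Optional[Tuple[str, str]] = None  # (topic, channel)


def recovery_plan(state: SessionState) -> List[str]:
    """Return the ordered steps to replay once the connection is lost."""
    plan = ["connect", "identify"]
    if state.needs_auth:
        # Auth is only replayed when the session was authenticated before.
        plan.append("auth")
    if state.subscription is not None:
        # Re-subscribe to the topic/channel that was active before the drop.
        topic, channel = state.subscription
        plan.append(f"subscribe {topic} {channel}")
    return plan
```

A session that was subscribed to `test_topic`/`channel1` with auth enabled would replay all four steps in order; a plain writer connection would only reconnect and identify.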


Roadmap

  • Docs
  • Lookupd tool
  • HTTP API wrapper
  • Deflate, Snappy compressions
  • TLSv1

How to


Write and read messages:

import asyncio
from ansq import open_connection

async def main():
    nsq = await open_connection()
    print(await nsq.pub('test_topic', 'test_message'))
    # <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>
    print(await nsq.dpub('test_topic', 'test_message', 3))
    # <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>
    print(await nsq.mpub('test_topic', list('test_message')))
    # <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>

    await nsq.subscribe('test_topic', 'channel1', 2)
    processed_messages = 0
    async for message in nsq.messages():
        print('Message #{}: {}'.format(processed_messages, message))
        # Message #0: test_message
        # Message #1: t
        # Message #2: e
        # Message #3: s
        # Message #4: t
        # ...
        # Message #10: test_message
        await message.fin()
        processed_messages += 1

        if processed_messages == 10:
            break

    single_message = await nsq.wait_for_message()
    print('Single message: ' + str(single_message))
    # message.body is bytes,
    # __str__ method decodes bytes
    # Prints decoded message.body

    # It also has a detailed repr:
    # <NSQMessage id="0d406ce4661af003", body=b'e', attempts=1,
    #     timestamp=1590162134305413767, timeout=60000,
    #     initialized_at=1590162194.8242455, is_timed_out=False,
    #     is_processed=False>

    # Very long task
    # ...
    # We need to touch message or it will be timed out
    await single_message.touch()
    # Continue very long task
    # ...

    # Something went wrong in task
    # in except handler re-queue message
    await single_message.req()

    # Connection should be closed
    await nsq.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

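The touch, fin and req steps from the example above can be combined into one helper. The following is a minimal sketch, not part of ansq: `process_message`, `_keep_alive` and `touch_interval` are hypothetical names, and the only assumption about the message object is that it offers the awaitable `touch()`, `fin()` and `req()` methods used above. The 30-second default is an assumption chosen as half of the `timeout=60000` shown in the repr.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def _keep_alive(message: Any, interval: float) -> None:
    """Touch the message every ``interval`` seconds until cancelled."""
    while True:
        await asyncio.sleep(interval)
        await message.touch()


async def process_message(
    message: Any,
    handler: Callable[[Any], Awaitable[None]],
    touch_interval: float = 30.0,  # assumed: half of a 60000 ms timeout
) -> bool:
    """Run ``handler``; fin the message on success, re-queue it on failure."""
    toucher = asyncio.ensure_future(_keep_alive(message, touch_interval))
    try:
        await handler(message)
        ok = True
    except Exception:
        ok = False
    finally:
        # Stop touching before the message is finished or re-queued.
        toucher.cancel()

    if ok:
        await message.fin()
    else:
        # Something went wrong in the task: hand the message back to NSQ.
        await message.req()
    return ok
```

Inside the `async for message in nsq.messages():` loop, this could replace the bare `await message.fin()` call with `await process_message(message, handler)`, where `handler` is your own coroutine.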

Consumer:

import asyncio
from ansq import open_connection
from ansq.tcp.connection import NSQConnection

async def main(nsq: NSQConnection):
    await nsq.subscribe('test_topic', 'channel1', 2)
    while True:
        async for message in nsq.messages():
            print('Message: ' + str(message))
            # message.body is bytes,
            # __str__ method decodes bytes
            # Do something with the message...
            # Then mark it as processed
            await message.fin()

        # If you don't break the loop
        # and don't set the auto_reconnect parameter to False,
        # but you have reached this point,
        # it means that the NSQ connection is lost
        # and cannot be re-established.
        # Errors in the ansq package are logged at the ERROR level,
        # so you can see them in the console.
        print('Connection status: ' + str(nsq.status))
        # Prints one of these:
        # Connection status: ConnectionStatus.CLOSING
        # Connection status: ConnectionStatus.CLOSED

        # You can reconnect here in a try-except block
        # or just leave the function and finish the program.
        # It all depends on the design of your application.

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
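    nsq_connection = loop.run_until_complete(open_connection())

    try:
        loop.run_until_complete(main(nsq_connection))
    except KeyboardInterrupt:
        pass

    # You should close connection correctly
    loop.run_until_complete(nsq_connection.close())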

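The consumer example mentions reconnecting in a try-except block once the connection is lost for good. One way to do that is a retry loop with exponential backoff, sketched below. `connect_with_retry` and its parameters are hypothetical and not part of ansq; the sketch also assumes that a failed connection attempt raises `OSError` (for example `ConnectionRefusedError`), which may not match how ansq reports every failure.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 0.1,
) -> T:
    """Await ``connect()`` until it succeeds, doubling the delay each time."""
    for attempt in range(attempts):
        try:
            return await connect()
        except OSError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

Under these assumptions, `nsq = await connect_with_retry(open_connection)` could stand in for the plain `await open_connection()` call at the point where the consumer loop exits.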
Download files

Files for ansq, version 0.0.18

  • ansq-0.0.18.tar.gz (18.9 kB), source distribution
  • ansq-0.0.18-py3-none-any.whl (22.3 kB), wheel, Python version py3
