Skip to main content

Mixed sync-async queue to interoperate between asyncio tasks and classic threads

Project description

https://travis-ci.com/aio-libs/janus.svg?branch=master https://codecov.io/gh/aio-libs/janus/branch/master/graph/badge.svg https://img.shields.io/pypi/v/janus.svg Chat on Gitter

Mixed sync-async queue, supposed to be used for communicating between classic synchronous (threaded) code and asynchronous (in terms of asyncio) one.

Like Janus god the queue object from the library has two faces: synchronous and asynchronous interface.

Synchronous is fully compatible with standard queue, asynchronous one follows asyncio queue design.

Usage example (Python 3.7+)

import asyncio
import janus


def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    queue = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()


asyncio.run(main())

Usage example (Python 3.5 and 3.6)

import asyncio
import janus

loop = asyncio.get_event_loop()


def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q):
    for i in range(100):
        val = await async_q.get()
        assert val == i
        async_q.task_done()


async def main():
    queue = janus.Queue()
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()

try:
    loop.run_until_complete(main())
finally:
    loop.close()

Communication channels

aio-libs google group: https://groups.google.com/forum/#!forum/aio-libs

Feel free to post your questions and ideas here.

gitter chat https://gitter.im/aio-libs/Lobby

License

janus library is offered under Apache 2 license.

Thanks

The library development is sponsored by DataRobot (https://datarobot.com)

Changes

0.6.1 (2020-10-26)

  • Raise RuntimeError on queue.join() after queue closing. #295
  • Replace timeout type from Optional[int] to Optional[float] #267

0.6.0 (2020-10-10)

  • Drop Python 3.5, the minimal supported version is Python 3.6
  • Support Python 3.9
  • Refomat with black

0.5.0 (2020-04-23)

  • Remove explicit loop arguments and forbid creating queues outside event loops #246

0.4.0 (2018-07-28)

  • Add py.typed macro #89
  • Drop python 3.4 support and fix minimal version python3.5.3 #88
  • Add property with that indicates if queue is closed #86

0.3.2 (2018-07-06)

  • Fixed python 3.7 support #97

0.3.1 (2018-01-30)

  • Fixed bug with join() in case tasks are added by sync_q.put() #75

0.3.0 (2017-02-21)

  • Expose unfinished_tasks property #34

0.2.4 (2016-12-05)

  • Restore tarball deploying

0.2.3 (2016-07-12)

  • Fix exception type

0.2.2 (2016-07-11)

  • Update asyncio.async() to use asyncio.ensure_future() #6

0.2.1 (2016-03-24)

  • Fix python setup.py test command #4

0.2.0 (2015-09-20)

  • Support Python 3.5

0.1.5 (2015-07-24)

  • Use loop.time() instead of time.monotonic()

0.1.1 (2015-06-12)

  • Fix some typos in README and setup.py
  • Add addtional checks for loop closing
  • Mention DataRobot

0.1.0 (2015-06-11)

  • Initial release

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for janus, version 0.6.1
Filename, size File type Python version Upload date Hashes
Filename, size janus-0.6.1-py3-none-any.whl (11.3 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size janus-0.6.1.tar.gz (19.2 kB) File type Source Python version None Upload date Hashes View

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring DigiCert DigiCert EV certificate Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page