Thread-safe async-aware queue for Python

A mixed sync-async queue intended for communication between classic synchronous (threaded) code and asynchronous code, between asynchronous code running in different threads, and for any other combination you need. It is based on the queue module, built on the aiologic package, and inspired by the janus library.

Like the god Culsans, the queue object from the library has two faces: a synchronous interface and an asynchronous one. Unlike the janus library, the synchronous interface supports eventlet, gevent, and threading, while the asynchronous interface supports asyncio, trio, and anyio.

The synchronous interface is fully compatible with the standard queue module, and the asynchronous one follows the asyncio queue design.
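For orientation, the sketch below uses only the standard library to show the two API styles that the interfaces are described as following: blocking calls in the style of queue.Queue and awaitable calls in the style of asyncio.Queue. It does not import culsans; the helper names sync_style and async_style are made up for illustration.

```python
import asyncio
import queue


def sync_style():
    """Blocking style, as in the standard queue module."""
    q = queue.Queue(maxsize=2)
    q.put(1)            # blocks while the queue is full
    q.put_nowait(2)     # raises queue.Full instead of blocking
    try:
        q.put_nowait(3)
    except queue.Full:
        pass            # the queue already holds maxsize items
    items = [q.get(), q.get_nowait()]
    q.task_done()
    q.task_done()
    q.join()            # returns once every item has been marked done
    return items


async def async_style():
    """Awaitable style, as in asyncio.Queue."""
    q = asyncio.Queue(maxsize=2)
    await q.put(1)      # suspends the task while the queue is full
    q.put_nowait(2)     # raises asyncio.QueueFull instead of waiting
    try:
        q.put_nowait(3)
    except asyncio.QueueFull:
        pass
    items = [await q.get(), q.get_nowait()]
    q.task_done()
    q.task_done()
    await q.join()
    return items


sync_result = sync_style()
async_result = asyncio.run(async_style())
```

Both functions perform the same sequence of operations; only the waiting calls differ (plain calls in one, await in the other).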

Usage

Three queues are available:

  • Queue

  • LifoQueue

  • PriorityQueue

Each has two properties: sync_q and async_q.

Use the first to get the synchronous interface and the second to get the asynchronous one.
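The three queue types differ in the order in which items come back out. The sketch below shows that ordering with the standard queue module's classes of the same names. It assumes the culsans counterparts behave the same way through sync_q, which follows from the compatibility claim above but is not verified here. The helpers fill and drain are hypothetical.

```python
import queue


def fill(q):
    """Put the same three items into any queue and return it."""
    for item in (3, 1, 2):
        q.put_nowait(item)
    return q


def drain(q):
    """Remove and return every item currently in the queue."""
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items


fifo_order = drain(fill(queue.Queue()))          # first in, first out
lifo_order = drain(fill(queue.LifoQueue()))      # last in, first out
prio_order = drain(fill(queue.PriorityQueue()))  # smallest item first
```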

Example

import anyio
import culsans


def sync_run(sync_q: culsans.SyncQueue[int]) -> None:
    for i in range(100):
        sync_q.put(i)
    else:
        sync_q.join()


async def async_run(async_q: culsans.AsyncQueue[int]) -> None:
    for i in range(100):
        value = await async_q.get()

        assert value == i

        async_q.task_done()


async def main() -> None:
    queue: culsans.Queue[int] = culsans.Queue()

    async with anyio.create_task_group() as tasks:
        tasks.start_soon(anyio.to_thread.run_sync, sync_run, queue.sync_q)
        tasks.start_soon(async_run, queue.async_q)

    queue.shutdown()


anyio.run(main)

Subclasses

You can create your own queues by inheriting from existing queue classes as if you were using the queue module. For example, this is how you can create an unordered queue that contains only unique items:

from culsans import Queue


class UniqueQueue(Queue):
    def _init(self, maxsize):
        self.data = set()

    def _qsize(self):
        return len(self.data)

    def _put(self, item):
        self.data.add(item)

    def _get(self):
        return self.data.pop()


sync_q = UniqueQueue().sync_q

sync_q.put_nowait(23)
sync_q.put_nowait(42)
sync_q.put_nowait(23)

assert sync_q.qsize() == 2
assert sorted(sync_q.get_nowait() for _ in range(2)) == [23, 42]

All four of these methods are called in exclusive access mode, so you can freely create your subclasses without thinking about whether your methods are thread-safe or not.
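The standard queue module makes a similar guarantee for its own _put and _get hooks, and that can be observed directly. The sketch below is a stdlib-only illustration and does not inspect culsans internals. AuditedQueue is a hypothetical class that records whether the queue's mutex was held each time a hook ran.

```python
import queue


class AuditedQueue(queue.Queue):
    """Hypothetical stdlib subclass that records whether the lock was held."""

    def _init(self, maxsize):
        super()._init(maxsize)
        self.lock_held = []

    def _put(self, item):
        # queue.Queue.put() calls this hook while holding self.mutex
        self.lock_held.append(self.mutex.locked())
        super()._put(item)

    def _get(self):
        # queue.Queue.get() calls this hook while holding self.mutex
        self.lock_held.append(self.mutex.locked())
        return super()._get()


q = AuditedQueue()
q.put("a")
item = q.get()
lock_held = q.lock_held
```

Because the hooks run only while the lock is held, the container behind them (a set in UniqueQueue above) needs no locking of its own.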

Greenlets

Libraries such as eventlet and gevent use greenlets instead of tasks. Since they do not use async-await syntax, their code is similar to synchronous code. There are three ways that you can tell culsans that you want to use greenlets instead of threads:

  • Set aiologic.lowlevel.current_green_library_tlocal.name (for the current thread).

  • Patch the threading module (for the main thread).

  • Specify the AIOLOGIC_GREEN_LIBRARY environment variable (for all threads).

The value is the name of the library that you want to use.
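As a minimal sketch of the environment-variable option, the variable can be set from Python itself. Two assumptions are made here: the variable has to be set before culsans (and therefore aiologic) is first imported, and "gevent" is an accepted value because it is the library's name.

```python
import os

# Assumption: this must run before the first import of culsans/aiologic,
# because the variable is presumably read when the library initializes.
os.environ["AIOLOGIC_GREEN_LIBRARY"] = "gevent"

# Child processes started from here inherit the setting as well.
green_library = os.environ["AIOLOGIC_GREEN_LIBRARY"]
```

Setting the variable in the shell or the service configuration has the same effect and avoids any dependence on import order.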

Checkpoints

Sometimes it is useful for each asynchronous call to switch execution to the next task and check for cancellation and timeouts, for example when you want to distribute CPU usage across all tasks. There are two ways to do this:

  • Set aiologic.lowlevel.<library>_checkpoints_cvar (for the current context).

  • Specify the AIOLOGIC_<LIBRARY>_CHECKPOINTS environment variable (for all contexts).

The value is True or False for the first way, and a non-empty or empty string for the second.

Checkpoints are enabled by default for the trio library.
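The environment-variable form can be sketched as follows. The helper checkpoints_env is hypothetical. The sketch assumes that <LIBRARY> is the upper-cased library name (for example ASYNCIO) and that the variables are read when the library is first imported.

```python
import os


def checkpoints_env(library, enabled):
    """Hypothetical helper: build the checkpoint variable for one async library.

    Assumes <LIBRARY> in the variable name is the upper-cased library name.
    """
    name = f"AIOLOGIC_{library.upper()}_CHECKPOINTS"
    # A non-empty string enables checkpoints, an empty string disables them.
    return {name: "1" if enabled else ""}


# Enable checkpoints for asyncio and switch them off for trio.
os.environ.update(checkpoints_env("asyncio", True))
os.environ.update(checkpoints_env("trio", False))
```

For a single context, the variable named in the first bullet is the finer-grained alternative; if it is a standard contextvars.ContextVar (an assumption), it would be changed with its set() method.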

Performance

Because it is built on the aiologic package, the culsans library has speed advantages. In sync -> async tests, culsans.Queue is typically 4 times faster than janus.Queue on CPython, and 8 times faster on PyPy. However, if your application is performance sensitive and you do not need API compatibility, try aiologic queues. In the same tests they are 7 times faster on CPython and 24 times faster on PyPy, respectively.

Communication channels

GitHub Discussions: https://github.com/x42005e1f/culsans/discussions

Feel free to post your questions and ideas here.

License

The culsans library is offered under the Zero-Clause BSD license.
