Structured concurrency and parallelism for asyncio, threading, and multiprocessing using queues.

osiiso

Structured task queues for Python across asyncio, threads, and processes.

osiiso gives you one compact queue API for three execution backends:

  • AsyncQueue for coroutine-heavy I/O and async integrations.
  • ThreadQueue for blocking I/O, synchronous SDKs, filesystem work, and SQLite writes.
  • ProcessQueue for CPU-heavy work that benefits from separate subprocesses.

It has no runtime dependencies, ships a py.typed marker for type checkers, and is built around a predictable workflow: submit tasks, apply options, run the queue, then inspect handles and a RunSummary.

Why osiiso

  • Shared API across async, thread, and process execution.
  • Priority scheduling where lower priority numbers run first.
  • Retries with optional delay and exponential backoff.
  • Per-task timeouts and queue-level run timeouts.
  • Graceful shutdown with must_complete task protection.
  • Batch workflows with submit(), map(), and group().
  • Awaitable async handles and blocking sync handles.
  • Structured RunSummary and immutable TaskResult records.
  • Lifecycle hooks for on_start, on_complete, and on_retry.
  • Optional uvloop integration through osiiso.run().
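
The priority rule in the list above ("lower priority numbers run first") can be illustrated with a small standard-library sketch. This is not osiiso's scheduler: PriorityBuffer is a hypothetical name, and the first-in, first-out tie-break between equal priorities is an assumption. The default priority of 3 matches the value documented for TaskOptions.

```python
import heapq
import itertools
from typing import Any


class PriorityBuffer:
    """Hypothetical illustration: pop items in ascending priority order."""

    def __init__(self) -> None:
        self._heap: list = []
        # A running counter keeps insertion order for equal priorities (assumed tie-break).
        self._counter = itertools.count()

    def push(self, item: Any, priority: int = 3) -> None:
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def pop(self) -> Any:
        _priority, _order, item = heapq.heappop(self._heap)
        return item

    def __len__(self) -> int:
        return len(self._heap)


buffer = PriorityBuffer()
buffer.push("routine-report")               # default priority 3
buffer.push("urgent-api-call", priority=0)  # lowest number, runs first
buffer.push("cleanup", priority=5)
buffer.push("second-routine")               # same priority as routine-report

order = [buffer.pop() for _ in range(len(buffer))]
print(order)
```

The counter in the middle of each heap entry also prevents Python from comparing the items themselves when two priorities are equal.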

Installation

pip install osiiso

With optional uvloop support:

pip install "osiiso[uvloop]"

The project targets Python 3.13 and newer.

Choose a queue

Workload                   Queue         Good for
-------------------------  ------------  ----------------------------------------------------------------
Coroutine-based I/O        AsyncQueue    HTTP clients, async databases, websockets, API fan-out
Blocking synchronous work  ThreadQueue   File operations, blocking SDKs, SQLite writes, sync integrations
CPU-heavy functions        ProcessQueue  Ranking, parsing, scoring, transformations, analytics

The queues intentionally look similar, so work can move between execution models with minimal changes.

Quick start

import asyncio
import osiiso


async def fetch(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"fetched {name}"


async def main():
    async with osiiso.AsyncQueue(workers=4) as q:
        q.submit(fetch, "users", priority=0)
        q.submit(fetch, "posts", retries=2, retry_delay=0.25, timeout=5)

        summary = await q.run(strict=True)
        return summary.values


print(osiiso.run(main()))

Core concepts

submit()

Use submit() for one task. It returns a handle immediately.

handle = q.submit(fetch_user, "ada", retries=3, timeout=10, name="fetch-user")

Async handles are awaitable:

result = await handle
value = handle.value()

Thread and process handles are blocking:

result = handle.wait(timeout=5)
value = handle.value()

map()

Use map() for one callable over many inputs.

q.map(download, urls, retries=2, group_id="downloads")
q.map(add, [(1, 2), (3, 4), (5, 6)], name="add")
q.map(request, [{"method": "GET", "url": "https://example.com"}])

Tuple entries are unpacked as positional arguments. Mapping entries are passed as keyword arguments.
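
The unpacking rule can be pictured with a plain-Python dispatcher. This is a sketch of the rule as described, not osiiso's code: call_with_entry is a hypothetical helper, and passing any other entry as a single positional argument is an assumption based on the download/urls example.

```python
from collections.abc import Callable, Mapping
from typing import Any


def call_with_entry(func: Callable[..., Any], entry: Any) -> Any:
    """Hypothetical helper: call func with one map() entry."""
    if isinstance(entry, Mapping):
        return func(**entry)   # mapping entries become keyword arguments
    if isinstance(entry, tuple):
        return func(*entry)    # tuple entries become positional arguments
    return func(entry)         # assumed: anything else is a single argument


def add(a: int, b: int) -> int:
    return a + b


def request(method: str, url: str) -> str:
    return f"{method} {url}"


results = [
    call_with_entry(str.upper, "users"),
    call_with_entry(add, (1, 2)),
    call_with_entry(request, {"method": "GET", "url": "https://example.com"}),
]
print(results)
```

Under this rule, a callable that expects a tuple as its single argument would need the tuple wrapped in another tuple or a mapping.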

group()

Use group() for a named batch, especially when tasks have different callables.

group = q.group(
    [
        (extract, "db"),
        (transform, raw_records),
        (load, destination),
    ],
    group_id="etl-batch-1",
)

summary = q.run()
values = group.values()

For AsyncQueue, use await group.wait() and await group.values().

Bound tasks

Bind a callable to a queue with @q.task().

async with osiiso.AsyncQueue(workers=4) as q:
    @q.task(retries=2, retry_delay=0.25, name="fetch")
    async def fetch(url: str) -> str:
        return await client.get(url)

    fetch("https://example.com")
    fetch.map(["https://example.org", "https://example.net"])

    summary = await q.run(strict=True)

Queue examples

AsyncQueue

import asyncio
import osiiso


async def fetch(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"fetched {name}"


async def main():
    async with osiiso.AsyncQueue(workers=4) as q:
        q.map(fetch, ["users", "posts", "comments"], retries=2, timeout=5)
        summary = await q.run(strict=True)
        print(summary.values)


osiiso.run(main())

ThreadQueue

import time
import osiiso


def resize(path: str) -> str:
    time.sleep(0.1)
    return f"resized {path}"


with osiiso.ThreadQueue(workers=4) as q:
    q.map(resize, ["a.png", "b.png", "c.png"], name="resize")
    summary = q.run(strict=True)

print(summary.values)

ProcessQueue

Keep process tasks importable and pickleable. Top-level functions and plain data arguments are the safest choice.

import osiiso


def score(n: int) -> int:
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    with osiiso.ProcessQueue(workers=4) as q:
        q.map(score, [10_000, 20_000, 30_000], name="score")
        summary = q.run(strict=True)

    print(summary.values)
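
Whether a callable or argument is pickleable can be checked ahead of time with the standard library. The sketch below assumes ProcessQueue relies on ordinary pickle serialization, which the advice above suggests but does not spell out. is_picklable is a hypothetical helper, not part of osiiso.

```python
import math
import pickle
from typing import Any


def is_picklable(obj: Any) -> bool:
    """Hypothetical helper: True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


checks = {
    # Importable module-level functions are pickled by reference to their name.
    "module-level function": is_picklable(math.sqrt),
    # Plain data such as numbers, strings, tuples, and lists pickles by value.
    "plain data arguments": is_picklable((10_000, "a.png", [1.5, 2.5])),
    # Lambdas have no importable name, so pickle cannot look them up.
    "lambda": is_picklable(lambda n: n * n),
}
print(checks)
```

A check like this only covers serialization. The function must also be importable in the worker process, which is why the example above keeps score at module level and guards the queue with if __name__ == "__main__".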

Task options

Task behavior can be configured inline or through an immutable TaskOptions object.

from osiiso import TaskOptions


retrying = TaskOptions(retries=3, retry_delay=0.5, backoff=2, timeout=10)
urgent = retrying.replace(priority=0, name="urgent-api-call")

q.submit(fetch, url, opts=urgent)
q.submit(fetch, other_url, retries=3, retry_delay=0.5, backoff=2)

Option         Default  Meaning
-------------  -------  ----------------------------------------------
priority       3        Lower numbers run first.
must_complete  False    Protects a task during graceful shutdown.
timeout        None     Per-task timeout in seconds.
retries        0        Retry attempts after the first failure.
retry_delay    0.0      Delay before the first retry.
backoff        1.0      Multiplier applied after each retry.
delay          None     Run after this many seconds.
run_at         None     Run at an absolute epoch timestamp.
name           None     Custom result and hook name.
group_id       None     Group label for summaries.
detached       False    Metadata flag for fire-and-forget style tasks.
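
To see how retries, retry_delay, backoff, and timeout might interact, here is a standard-library sketch. It is one reading of the option descriptions, not osiiso's implementation: retry_delays and run_with_retries are hypothetical names, the delay is assumed to grow as retry_delay * backoff ** n, and a timed-out attempt is assumed to count as a retryable failure.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Optional


def retry_delays(retries: int, retry_delay: float, backoff: float) -> list:
    """Delay before each retry: retry_delay first, then multiplied by backoff."""
    return [retry_delay * backoff**n for n in range(retries)]


async def run_with_retries(
    func: Callable[[], Awaitable[Any]],
    *,
    retries: int = 0,
    retry_delay: float = 0.0,
    backoff: float = 1.0,
    timeout: Optional[float] = None,
) -> tuple:
    """Return (value, attempts); re-raise the last error once retries run out."""
    delays = retry_delays(retries, retry_delay, backoff)
    attempt = 0
    while True:
        attempt += 1
        try:
            # wait_for applies the timeout to each attempt (None means no limit).
            return await asyncio.wait_for(func(), timeout), attempt
        except Exception:
            if attempt > retries:
                raise
            await asyncio.sleep(delays[attempt - 1])


calls = {"count": 0}


async def flaky() -> str:
    """Fail twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


schedule = retry_delays(3, 0.5, 2)
value, attempts = asyncio.run(
    run_with_retries(flaky, retries=3, retry_delay=0.01, backoff=2, timeout=1)
)
print(schedule, value, attempts)
```

With retries=3, retry_delay=0.5, and backoff=2, this reading gives waits of 0.5, 1.0, and 2.0 seconds before the three retries.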

TaskOptions rejects invalid combinations immediately. For example, delay and run_at are mutually exclusive, negative retries are rejected, and unknown submit options raise TypeError.
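
An immutable options object with eager validation can be approximated with a frozen dataclass. The sketch below is illustrative only: SketchOptions and rejected are hypothetical names, only four of the documented options are modelled, and raising ValueError for bad values is an assumption (the source only names TypeError for unknown options).

```python
import dataclasses
from typing import Any, Optional


@dataclasses.dataclass(frozen=True)
class SketchOptions:
    """Hypothetical stand-in for an immutable options record."""

    priority: int = 3
    retries: int = 0
    delay: Optional[float] = None
    run_at: Optional[float] = None

    def __post_init__(self) -> None:
        # Validation runs at construction time, so bad options fail early.
        if self.retries < 0:
            raise ValueError("retries must be >= 0")
        if self.delay is not None and self.run_at is not None:
            raise ValueError("delay and run_at are mutually exclusive")

    def replace(self, **changes: Any) -> "SketchOptions":
        # Builds a new instance, so validation runs again on the copy.
        return dataclasses.replace(self, **changes)


def rejected(**kwargs: Any) -> bool:
    """Hypothetical helper: True if these options fail validation."""
    try:
        SketchOptions(**kwargs)
    except (TypeError, ValueError):
        return True
    return False


retrying = SketchOptions(retries=3)
urgent = retrying.replace(priority=0)
print(retrying, urgent)
print(rejected(retries=-1), rejected(delay=1.0, run_at=1.0), rejected(colour="red"))
```

Because the dataclass is frozen, replace() leaves the original object untouched, which makes a shared base configuration safe to reuse across many submit calls.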

Results and errors

Every run() returns a RunSummary.

summary.ok
summary.succeeded
summary.failed
summary.cancelled
summary.timed_out
summary.values
summary.errors
summary.by_task_id()
summary.by_name()
summary.by_group()
summary.raise_for_errors()
summary.display()

Use strict=True when failures should raise ExecutionError after the run finishes:

summary = await q.run(strict=True)

Each task result is stored as a TaskResult with task id, name, status, value, exception, attempts, priority, timing, group id, and cancellation metadata.

Lifecycle and policies

Queues support finite and long-running modes:

q = osiiso.AsyncQueue(mode="finite", fail_policy="continue", on_exit="complete_priority")

  • mode="finite" runs pending work and exits.
  • mode="infinite" keeps workers alive until shutdown or timeout.
  • fail_policy="continue" records failures and keeps processing.
  • fail_policy="fail_first" cancels remaining eligible work after the first failure.
  • on_exit="complete_priority" lets must_complete tasks finish during graceful shutdown.
  • on_exit="cancel" cancels eligible pending and active work on timeout or forced shutdown.
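
The difference between the two fail_policy values can be shown with a deliberately simplified, sequential sketch. It is not how osiiso schedules work: run_sequentially is a hypothetical function, there are no workers or priorities, and the status labels are illustrative strings rather than the library's own values.

```python
from collections.abc import Callable
from typing import Any


def run_sequentially(tasks: list, fail_policy: str = "continue") -> list:
    """Hypothetical runner: return (status, value_or_error) for each task."""
    records = []
    failed = False
    for task in tasks:
        if failed and fail_policy == "fail_first":
            # After the first failure, remaining work is cancelled, not run.
            records.append(("cancelled", None))
            continue
        try:
            records.append(("succeeded", task()))
        except Exception as exc:
            # Under "continue" the failure is recorded and the loop moves on.
            records.append(("failed", exc))
            failed = True
    return records


def ok() -> str:
    return "ok"


def boom() -> str:
    raise RuntimeError("boom")


continued = [status for status, _ in run_sequentially([ok, boom, ok], "continue")]
stopped = [status for status, _ in run_sequentially([ok, boom, ok], "fail_first")]
print(continued)
print(stopped)
```

In both cases the failure is recorded instead of raised immediately, which mirrors the idea of inspecting a summary after the run.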

Hooks give you a simple integration point for logging, metrics, and tracing:

def completed(result: osiiso.TaskResult) -> None:
    print(result.name, result.status, result.duration)


q = osiiso.ThreadQueue(on_complete=completed)
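
A hook can also be a callable object that accumulates metrics. The sketch below only relies on the name, status, and duration attributes used in the example above. StatusMetrics is a hypothetical class, the records are SimpleNamespace stand-ins rather than real TaskResult objects, and the status strings are invented for the demonstration.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any


class StatusMetrics:
    """Hypothetical on_complete hook that counts results by status."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()
        self.total_duration = 0.0

    def __call__(self, result: Any) -> None:
        # str() keeps this working whether status is a string or an enum member.
        self.counts[str(result.status)] += 1
        # Treat a missing duration as zero (defensive assumption).
        self.total_duration += result.duration or 0.0


metrics = StatusMetrics()

# Stand-in records; a real run would pass TaskResult objects to the hook.
for record in (
    SimpleNamespace(name="resize", status="succeeded", duration=0.25),
    SimpleNamespace(name="resize", status="succeeded", duration=0.5),
    SimpleNamespace(name="resize", status="failed", duration=0.25),
):
    metrics(record)

print(dict(metrics.counts), metrics.total_duration)
```

Assuming the queue accepts any callable for on_complete, an instance like metrics could be passed in place of the completed function. With ThreadQueue the hook may be called from worker threads, so a production version would likely need a lock around the updates.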

Examples

Run the compact feature gallery:

uv run python examples/feature_gallery.py

Run the complete Hacker News style showcase:

uv run python -m examples.hackernews_showcase --limit 6

Use the live Hacker News API:

uv run python -m examples.hackernews_showcase --limit 20 --online

The showcase uses all three backends:

  • AsyncQueue fetches feeds, items, and users.
  • ThreadQueue persists records into SQLite.
  • ProcessQueue ranks stories and computes keywords.

Documentation

The MkDocs documentation lives in docs/.

Run the docs locally:

python -m pip install -e ".[docs]"
mkdocs serve

Build the docs strictly:

mkdocs build --strict

Project docs are configured for GitHub Pages at:

https://ichinga-samuel.github.io/osiiso/

Development

Install the development dependencies:

python -m pip install -e ".[dev]"

Run tests:

uv run pytest

Run Ruff:

uv run ruff check .

Build the package:

python -m build

Build the docs with the docs extra:

uv run --extra docs mkdocs build --strict

Project layout

.
|-- src/osiiso/                  # Library source
|-- tests/                       # Unit tests for async, thread, process, and options behavior
|-- docs/                        # MkDocs documentation
|-- examples/feature_gallery.py  # Compact API showcase
|-- examples/hackernews_showcase # Complete multi-backend example project
|-- pyproject.toml               # Package metadata and tool configuration
`-- mkdocs.yml                   # Documentation site configuration

Public API

from osiiso import (
    AsyncQueue,
    ThreadQueue,
    ProcessQueue,
    TaskOptions,
    TaskHandle,
    SyncTaskHandle,
    TaskGroup,
    SyncTaskGroup,
    TaskResult,
    RunSummary,
    ExecutionError,
    ClosedError,
    OsiisoError,
    run,
)

License

osiiso is released under the MIT License.
