Structured concurrency and parallelism for asyncio, threading, and multiprocessing using queues.
osiiso
Structured task queues for Python across asyncio, threads, and processes.
osiiso gives you one compact queue API for three execution backends:
- AsyncQueue for coroutine-heavy I/O and async integrations.
- ThreadQueue for blocking I/O, synchronous SDKs, filesystem work, and SQLite writes.
- ProcessQueue for CPU-heavy work that benefits from separate subprocesses.
It is dependency-free at runtime, typed with py.typed, and built around a predictable workflow: submit tasks, apply options, run the queue, then inspect handles and a RunSummary.
Contents
- Why osiiso
- Installation
- Choose a queue
- Quick start
- Core concepts
- Task options
- Results and errors
- Examples
- Documentation
- Development
- Community
Why osiiso
- Shared API across async, thread, and process execution.
- Priority scheduling where lower priority numbers run first.
- Retries with optional delay and exponential backoff.
- Per-task timeouts and queue-level run timeouts.
- Graceful shutdown with must_complete task protection.
- Batch workflows with submit(), map(), and group().
- Awaitable async handles and blocking sync handles.
- Structured RunSummary and immutable TaskResult records.
- Lifecycle hooks for on_start, on_complete, and on_retry.
- Optional uvloop integration through osiiso.run().
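To make the priority rule concrete, here is a minimal stdlib-only sketch of a pending-task buffer in which lower priority numbers pop first. It is not osiiso's implementation: the PriorityBuffer class is hypothetical, and the first-in, first-out tie-breaking between equal priorities is an assumption of this sketch. The default priority of 3 matches the Task options table.

```python
import heapq
import itertools


class PriorityBuffer:
    """Toy pending-task buffer: lower priority numbers pop first."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        # Monotonic counter used as a tie-breaker, so equal priorities pop
        # in submission order (an assumption of this sketch).
        self._counter = itertools.count()

    def push(self, name: str, priority: int = 3) -> None:
        heapq.heappush(self._heap, (priority, next(self._counter), name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


buf = PriorityBuffer()
buf.push("report")               # default priority 3
buf.push("urgent", priority=0)   # lowest number, so it runs first
buf.push("cleanup", priority=5)
buf.push("email")                # default priority 3, submitted after "report"

order = [buf.pop() for _ in range(len(buf))]
print(order)  # ['urgent', 'report', 'email', 'cleanup']
```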
Installation
pip install osiiso
With optional uvloop support:
pip install "osiiso[uvloop]"
The project targets Python 3.13 and newer.
Choose a queue
| Workload | Queue | Good for |
|---|---|---|
| Coroutine-based I/O | AsyncQueue | HTTP clients, async databases, websockets, API fan-out |
| Blocking synchronous work | ThreadQueue | File operations, blocking SDKs, SQLite writes, sync integrations |
| CPU-heavy functions | ProcessQueue | Ranking, parsing, scoring, transformations, analytics |
The queues intentionally look similar, so work can move between execution models with minimal changes.
Quick start
import asyncio
import osiiso

async def fetch(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"fetched {name}"

async def main():
    async with osiiso.AsyncQueue(workers=4) as q:
        q.submit(fetch, "users", priority=0)
        q.submit(fetch, "posts", retries=2, retry_delay=0.25, timeout=5)
        summary = await q.run(strict=True)
        return summary.values

print(osiiso.run(main()))
Core concepts
submit()
Use submit() for one task. It returns a handle immediately.
handle = q.submit(fetch_user, "ada", retries=3, timeout=10, name="fetch-user")
Async handles are awaitable:
result = await handle
value = handle.value()
Thread and process handles are blocking:
result = handle.wait(timeout=5)
value = handle.value()
map()
Use map() for one callable over many inputs.
q.map(download, urls, retries=2, group_id="downloads")
q.map(add, [(1, 2), (3, 4), (5, 6)], name="add")
q.map(request, [{"method": "GET", "url": "https://example.com"}])
Tuple entries are unpacked as positional arguments. Mapping entries are passed as keyword arguments.
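The unpacking rule can be pictured with a small stdlib-only sketch. The helper call_with_entry is hypothetical and is not part of osiiso. It only mirrors the behavior described above, plus the assumption (suggested by the download example) that any other entry is passed as a single positional argument.

```python
from collections.abc import Callable, Mapping
from typing import Any


def call_with_entry(fn: Callable[..., Any], entry: Any) -> Any:
    """Call fn with one map() entry, following the rule described above."""
    if isinstance(entry, tuple):
        return fn(*entry)      # tuple -> positional arguments
    if isinstance(entry, Mapping):
        return fn(**entry)     # mapping -> keyword arguments
    return fn(entry)           # anything else -> one positional argument (assumed)


def add(a: int, b: int) -> int:
    return a + b


def request(method: str, url: str) -> str:
    return f"{method} {url}"


def download(url: str) -> str:
    return f"downloaded {url}"


print(call_with_entry(add, (1, 2)))
print(call_with_entry(request, {"method": "GET", "url": "https://example.com"}))
print(call_with_entry(download, "https://example.org"))
```

One consequence of this rule is that a callable which expects a single tuple argument needs that tuple wrapped in another tuple, for example ((1, 2),).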
group()
Use group() for a named batch, especially when tasks have different callables.
group = q.group(
    [
        (extract, "db"),
        (transform, raw_records),
        (load, destination),
    ],
    group_id="etl-batch-1",
)
summary = q.run()
values = group.values()
For AsyncQueue, use await group.wait() and await group.values().
Bound tasks
Bind a callable to a queue with @q.task().
async with osiiso.AsyncQueue(workers=4) as q:
    @q.task(retries=2, retry_delay=0.25, name="fetch")
    async def fetch(url: str) -> str:
        return await client.get(url)

    fetch("https://example.com")
    fetch.map(["https://example.org", "https://example.net"])
    summary = await q.run(strict=True)
Queue examples
AsyncQueue
import asyncio
import osiiso

async def fetch(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"fetched {name}"

async def main():
    async with osiiso.AsyncQueue(workers=4) as q:
        q.map(fetch, ["users", "posts", "comments"], retries=2, timeout=5)
        summary = await q.run(strict=True)
        print(summary.values)

osiiso.run(main())
ThreadQueue
import time
import osiiso

def resize(path: str) -> str:
    time.sleep(0.1)
    return f"resized {path}"

with osiiso.ThreadQueue(workers=4) as q:
    q.map(resize, ["a.png", "b.png", "c.png"], name="resize")
    summary = q.run(strict=True)
    print(summary.values)
ProcessQueue
Keep process tasks importable and pickleable. Top-level functions and plain data arguments are the safest choice.
import osiiso

def score(n: int) -> int:
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    with osiiso.ProcessQueue(workers=4) as q:
        q.map(score, [10_000, 20_000, 30_000], name="score")
        summary = q.run(strict=True)
        print(summary.values)
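A quick way to check whether a callable or argument is safe to hand to a process-based backend is to try pickling it first. The helper below is a hypothetical, stdlib-only sketch and not part of osiiso. It assumes the process backend moves work between processes with pickle, as Python's multiprocessing does.

```python
import math
import pickle
import threading


def is_picklable(obj: object) -> bool:
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(is_picklable(math.factorial))      # importable module-level function
print(is_picklable((10_000, "a.png")))   # plain data arguments
print(is_picklable(lambda n: n * n))     # lambdas cannot be looked up by name
print(is_picklable(threading.Lock()))    # OS-level resources do not pickle
```

The first two checks succeed and the last two fail, which is why top-level functions and plain data are the safest choice for process tasks.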
Task options
Task behavior can be configured inline or through an immutable TaskOptions object.
from osiiso import TaskOptions
retrying = TaskOptions(retries=3, retry_delay=0.5, backoff=2, timeout=10)
urgent = retrying.replace(priority=0, name="urgent-api-call")
q.submit(fetch, url, opts=urgent)
q.submit(fetch, other_url, retries=3, retry_delay=0.5, backoff=2)
| Option | Default | Meaning |
|---|---|---|
| priority | 3 | Lower numbers run first. |
| must_complete | False | Protects a task during graceful shutdown. |
| timeout | None | Per-task timeout in seconds. |
| retries | 0 | Retry attempts after the first failure. |
| retry_delay | 0.0 | Delay before the first retry. |
| backoff | 1.0 | Multiplier applied after each retry. |
| delay | None | Run after this many seconds. |
| run_at | None | Run at an absolute epoch timestamp. |
| name | None | Custom result and hook name. |
| group_id | None | Group label for summaries. |
| detached | False | Metadata flag for fire-and-forget style tasks. |
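To see how retries, retry_delay, and backoff might combine, the sketch below computes the wait before each retry. It is a hypothetical helper, not osiiso's code, and it assumes the multiplier compounds geometrically: the first retry waits retry_delay seconds, and each later retry waits backoff times longer than the previous one.

```python
def retry_delays(retries: int, retry_delay: float = 0.0, backoff: float = 1.0) -> list[float]:
    """Wait (in seconds) before each retry, assuming geometric backoff."""
    return [retry_delay * backoff**attempt for attempt in range(retries)]


print(retry_delays(3, retry_delay=0.5, backoff=2))  # [0.5, 1.0, 2.0]
print(retry_delays(2, retry_delay=0.25))            # [0.25, 0.25]
print(retry_delays(0))                              # []
```

Under that assumption, the default backoff of 1.0 gives a constant delay between retries.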
TaskOptions validates its values immediately and rejects invalid combinations. For example, delay and run_at are mutually exclusive, negative retries are rejected, and unknown submit options raise TypeError.
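The pattern of an immutable options object that validates itself and supports replace() can be sketched with a frozen dataclass. SketchOptions below is a hypothetical stand-in covering only four of the options. It is not osiiso's TaskOptions, and the use of ValueError for invalid values is an assumption of this sketch.

```python
import dataclasses
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchOptions:
    """Hypothetical stand-in for an immutable, self-validating options object."""

    priority: int = 3
    retries: int = 0
    delay: Optional[float] = None
    run_at: Optional[float] = None

    def __post_init__(self) -> None:
        # Runs on every construction, including copies made by replace().
        if self.retries < 0:
            raise ValueError("retries must be >= 0")
        if self.delay is not None and self.run_at is not None:
            raise ValueError("delay and run_at are mutually exclusive")

    def replace(self, **changes: object) -> "SketchOptions":
        # Returns a new validated copy; the original stays unchanged.
        return dataclasses.replace(self, **changes)


base = SketchOptions(retries=3)
urgent = base.replace(priority=0)
print(base.priority, urgent.priority, urgent.retries)  # 3 0 3
```

Because the copy goes through the constructor again, an unknown keyword passed to replace() raises TypeError and an invalid combination is rejected just as it would be on first construction.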
Results and errors
Every run() returns a RunSummary.
summary.ok
summary.succeeded
summary.failed
summary.cancelled
summary.timed_out
summary.values
summary.errors
summary.by_task_id()
summary.by_name()
summary.by_group()
summary.raise_for_errors()
summary.display()
Use strict=True when failures should raise ExecutionError after the run finishes:
summary = await q.run(strict=True)
Each task result is stored as a TaskResult with task id, name, status, value, exception, attempts, priority, timing, group id, and cancellation metadata.
Lifecycle and policies
Queues support finite and long-running modes:
q = osiiso.AsyncQueue(mode="finite", fail_policy="continue", on_exit="complete_priority")
- mode="finite" runs pending work and exits.
- mode="infinite" keeps workers alive until shutdown or timeout.
- fail_policy="continue" records failures and keeps processing.
- fail_policy="fail_first" cancels remaining eligible work after the first failure.
- on_exit="complete_priority" lets must_complete tasks finish during graceful shutdown.
- on_exit="cancel" cancels eligible pending and active work on timeout or forced shutdown.
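The difference between the two fail policies can be illustrated with a single-worker, stdlib-only simulation. The run_all function and its status strings are hypothetical and are not osiiso's scheduler or its actual status values. The sketch only shows the effect on work that has not started yet.

```python
from collections.abc import Callable


def run_all(tasks: list[Callable[[], object]], fail_policy: str = "continue") -> list[str]:
    """Run tasks in order and report one status string per task."""
    if fail_policy not in ("continue", "fail_first"):
        raise ValueError(f"unknown fail_policy: {fail_policy!r}")
    statuses: list[str] = []
    failed = False
    for task in tasks:
        if failed and fail_policy == "fail_first":
            statuses.append("cancelled")   # skipped after the first failure
            continue
        try:
            task()
        except Exception:
            statuses.append("failed")      # recorded, never re-raised here
            failed = True
        else:
            statuses.append("succeeded")
    return statuses


def ok() -> str:
    return "done"


def boom() -> None:
    raise RuntimeError("simulated failure")


print(run_all([ok, boom, ok], fail_policy="continue"))    # ['succeeded', 'failed', 'succeeded']
print(run_all([ok, boom, ok], fail_policy="fail_first"))  # ['succeeded', 'failed', 'cancelled']
```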
Hooks give you a simple integration point for logging, metrics, and tracing:
def completed(result: osiiso.TaskResult) -> None:
    print(result.name, result.status, result.duration)
q = osiiso.ThreadQueue(on_complete=completed)
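A hook does not have to be a plain function. The sketch below uses a callable object that aggregates durations per task name, which is one way to feed metrics. DurationStats and StubResult are hypothetical names. StubResult stands in for TaskResult so the sketch runs without osiiso, and it carries only the three attributes used in the hook above.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class StubResult:
    """Stand-in with the attributes the hook reads: name, status, duration."""

    name: str
    status: str
    duration: float


class DurationStats:
    """Callable hook that accumulates total duration and count per task name."""

    def __init__(self) -> None:
        self.totals: defaultdict[str, float] = defaultdict(float)
        self.counts: defaultdict[str, int] = defaultdict(int)

    def __call__(self, result) -> None:
        self.totals[result.name] += result.duration
        self.counts[result.name] += 1

    def mean(self, name: str) -> float:
        return self.totals[name] / self.counts[name]


stats = DurationStats()
stats(StubResult("resize", "succeeded", 0.25))
stats(StubResult("resize", "succeeded", 0.75))
print(stats.mean("resize"))  # 0.5
```

Assuming hooks accept any callable that takes a result, an instance like stats could be passed as on_complete=stats in place of the completed function.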
Examples
Run the compact feature gallery:
uv run python examples/feature_gallery.py
Run the complete Hacker News style showcase:
uv run python -m examples.hackernews_showcase --limit 6
Use the live Hacker News API:
uv run python -m examples.hackernews_showcase --limit 20 --online
The showcase uses all three backends:
- AsyncQueue fetches feeds, items, and users.
- ThreadQueue persists records into SQLite.
- ProcessQueue ranks stories and computes keywords.
Documentation
The MkDocs documentation lives in docs/.
Run the docs locally:
python -m pip install -e ".[docs]"
mkdocs serve
Build the docs strictly:
mkdocs build --strict
Project docs are configured for GitHub Pages at:
https://ichinga-samuel.github.io/osiiso/
Development
Install the development dependencies:
python -m pip install -e ".[dev]"
Run tests:
uv run pytest
Run Ruff:
uv run ruff check .
Build the package:
python -m build
Build the docs with the docs extra:
uv run --extra docs mkdocs build --strict
Community
- Read the contribution guide before opening larger pull requests.
- Check the changelog for release history and upcoming changes.
- Use support guidance for questions, bug reports, and feature requests.
- Report vulnerabilities through the security policy, not public issues.
- Follow the code of conduct when participating in project spaces.
Project layout
.
|-- src/osiiso/ # Library source
|-- tests/ # Unit tests for async, thread, process, and options behavior
|-- docs/ # MkDocs documentation
|-- examples/feature_gallery.py # Compact API showcase
|-- examples/hackernews_showcase # Complete multi-backend example project
|-- pyproject.toml # Package metadata and tool configuration
`-- mkdocs.yml # Documentation site configuration
Public API
from osiiso import (
    AsyncQueue,
    ThreadQueue,
    ProcessQueue,
    TaskOptions,
    TaskHandle,
    SyncTaskHandle,
    TaskGroup,
    SyncTaskGroup,
    TaskResult,
    RunSummary,
    ExecutionError,
    ClosedError,
    OsiisoError,
    run,
)
License
osiiso is released under the MIT License.