Skip to main content

Reusable async process management library

Project description

optio-core

The core Python library for Optio. Embed it in your Python application to define, launch, cancel, and monitor long-running background tasks — with progress reporting, hierarchical child processes, cooperative cancellation, cron scheduling, and ad-hoc dynamic task creation. All backed by MongoDB for persistence. Optional Redis integration enables remote control and custom command handlers.

Installation

pip install optio-core

For Redis command bus support:

pip install optio-core[redis]

Requirements: Python 3.11+, MongoDB. Dependencies: motor>=3.3.0, apscheduler>=4.0.0a5, quaestor. Redis support: redis>=5.0.0 (optional extra).

Quick Start

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from optio_core import init, launch_and_wait, get_process, TaskInstance

async def my_task(ctx):
    for i in range(10):
        if not ctx.should_continue():
            return
        ctx.report_progress(i * 10, f"Step {i + 1}/10")
        await asyncio.sleep(1)
    ctx.report_progress(100, "Done")

async def get_tasks(services):
    return [
        TaskInstance(
            execute=my_task,
            process_id="my-task",
            name="My Task",
        ),
    ]

async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client["myapp"]

    await init(
        mongo_db=db,
        get_task_definitions=get_tasks,
    )

    await launch_and_wait("my-task")
    proc = await get_process("my-task")
    print(proc["status"]["state"])  # "done"

asyncio.run(main())

Lifecycle API

All symbols are available directly from optio_core:

import optio_core

init()

await optio_core.init(
    mongo_db: AsyncIOMotorDatabase,
    prefix: str = "optio",
    redis_url: str | None = None,
    services: dict[str, Any] | None = None,
    get_task_definitions: Callable[..., Awaitable[list[TaskInstance]]] | None = None,
) -> None

Initialize Optio. Must be called before any other function.

Parameter Type Default Description
mongo_db AsyncIOMotorDatabase required Motor async database object
prefix str "optio" Namespace for collections ({prefix}_processes) and clamator service streams (the {database}/{prefix} Redis key prefix). Override if you need to avoid name collisions in a shared database. Streams are automatically scoped by both database name and prefix, so instances using different databases won't collide even on a shared Redis server.
redis_url str | None None If None, Redis features are disabled; use direct method calls only
services dict[str, Any] | None {} Passed as ctx.services to all task execute functions
get_task_definitions Callable[..., Awaitable[list[TaskInstance]]] | None None Async function (services, metadata_filter) -> list[TaskInstance]; called on init and resync. metadata_filter is None for a full sync, or a dict of metadata key/value pairs to restrict which task definitions are regenerated. This is the most important part — this is where you declare the tasks that Optio will manage. See the TaskInstance definition under Data Types.

run()

await optio_core.run() -> None

Start the main event loop. Blocks until shutdown() is called. Starts the cron scheduler and, if Redis is configured, begins consuming commands from the Redis stream. Installs signal handlers for SIGTERM and SIGINT that trigger graceful shutdown.

Without Redis, run() simply blocks until shutdown() is called (useful for scheduler-only deployments).

shutdown()

await optio_core.shutdown() -> None

Initiate graceful shutdown. Stops the command consumer and scheduler, sets cancellation flags on all running processes, waits up to 5 seconds for them to exit, and closes the Redis connection.

State Machine

Every process has a state that follows a strict state machine. The happy path is straightforward:

idle --> scheduled --> running --> done
                          |         |
                          v         v
                        failed    idle (dismiss)
                          |
                          v
                        idle (dismiss)

Cancel path:
  scheduled --> cancelled
  running --> cancel_requested --> cancelling --> cancelled
  cancelled --> idle (dismiss)

Here is the full state diagram, including re-launch and dismiss transitions:

State Machine

States

State Group Description
idle LAUNCHABLE Initial state; ready to launch
scheduled ACTIVE, CANCELLABLE Queued by scheduler or launch command
running ACTIVE, CANCELLABLE Execute function is running
cancel_requested ACTIVE Cancel requested, waiting for executor to acknowledge
cancelling ACTIVE Executor acknowledged; cleaning up
done END, LAUNCHABLE, DISMISSABLE Completed successfully
failed END, LAUNCHABLE, DISMISSABLE Execute function raised an exception
cancelled END, LAUNCHABLE, DISMISSABLE Cancelled successfully

State Groups

ACTIVE_STATES     = {"scheduled", "running", "cancel_requested", "cancelling"}
END_STATES        = {"done", "failed", "cancelled"}
LAUNCHABLE_STATES = {"idle", "done", "failed", "cancelled"}
CANCELLABLE_STATES = {"scheduled", "running"}
DISMISSABLE_STATES = {"done", "failed", "cancelled"}

Transition Table

From To (valid)
idle scheduled
scheduled running, cancel_requested
running done, failed, cancel_requested
done scheduled, idle
failed scheduled, idle
cancel_requested cancelling
cancelling cancelled
cancelled scheduled, idle

Cooperative Cancellation

Cancellation is cooperative. When a cancel request arrives:

  1. The process state transitions to cancel_requested, then cancelling.
  2. An internal flag is set.
  3. The task function must check ctx.should_continue() periodically and return early if it returns False.
  4. Cancellation propagates to child processes automatically.

If a task never checks should_continue(), it cannot be cancelled (it will remain in cancelling state until it finishes naturally).

Scheduling

Tasks with a schedule field are registered with APScheduler as cron jobs. The schedule is a standard cron expression (5 fields: minute, hour, day-of-month, month, day-of-week).

TaskInstance(
    execute=nightly_cleanup,
    process_id="nightly-cleanup",
    name="Nightly Cleanup",
    schedule="0 2 * * *",  # 2:00 AM daily
)

Schedules are synced on init() and resync(). The scheduler starts when run() is called.

ProcessContext

ProcessContext is the sole argument to every task execute function. It is the interface your tasks use to communicate with Optio — reporting progress, checking for cancellation, spawning child processes — and also how they receive their injected dependencies and task-specific parameters.

async def execute(ctx: ProcessContext) -> None:
    ...

Properties (read-only) — What the process receives

Property Type Description
ctx.process_id str The process ID string
ctx.params dict[str, Any] Parameters from the task definition. Useful when the same execute function is shared across multiple tasks — for example, a generic data fetcher where each task targets a different source. The source ID (different for each task) is passed in via params.
ctx.metadata dict[str, Any] Metadata from the task definition. Use this for tagging your processes, which can be useful for finding or identifying them later.
ctx.services dict[str, Any] Services dict passed to init(). Use this for dependency injection.

Methods — What the process can do

report_progress()

ctx.report_progress(percent: float | None, message: str | None = None) -> None

Update progress. percent is 0-100, or None for indeterminate progress. message is an optional description of the current step (also appended to the process log). Progress writes are buffered and flushed to MongoDB at most every 100ms (configurable via the OPTIO_PROGRESS_FLUSH_INTERVAL_MS environment variable). A final flush occurs automatically when the process completes.

should_continue()

ctx.should_continue() -> bool

Returns False when cancellation has been requested. Poll this in loops to support cooperative cancellation.

mark_ephemeral()

await ctx.mark_ephemeral() -> None

Mark this process for deletion after it completes (reaches a terminal state).

run_child()

await ctx.run_child(
    execute: Callable[..., Awaitable[None]],
    process_id: str,
    name: str,
    description: str | None = None,
    params: dict[str, Any] | None = None,
    survive_failure: bool = False,
    survive_cancel: bool = False,
    on_child_progress: Callable[[list[ChildProgressInfo]], None] | None = None,
) -> str

Run a sequential child process. Blocks until the child completes. Returns the child's final state: "done", "failed", or "cancelled".

Parameter Type Default Description
execute Callable required Async function for the child task
process_id str required Unique ID for the child process
name str required Display name
description str | None None Optional description text
params dict | None None Parameters passed to the child's ctx.params
survive_failure bool False If False, raises RuntimeError when child fails
survive_cancel bool False If False, raises RuntimeError when child is cancelled
on_child_progress Callable | None None Callback for child progress updates (use progress helpers)

parallel_group()

ctx.parallel_group(
    max_concurrency: int = 10,
    survive_failure: bool = False,
    survive_cancel: bool = False,
    on_child_progress: Callable[[list[ChildProgressInfo]], None] | None = None,
) -> ParallelGroup

Create a parallel execution group. Use as an async context manager.

Parameter Type Default Description
max_concurrency int 10 Maximum number of children running concurrently
survive_failure bool False If False, raises RuntimeError on exit if any child failed
survive_cancel bool False If False, raises RuntimeError on exit if any child was cancelled
on_child_progress Callable | None None Callback for child progress updates

Inside the context, call await group.spawn(execute, process_id, name, description, params) to add children. After the context exits, group.results contains a list[ChildResult].

Child Processes

Tasks can spawn child processes that appear as a tree in the database. Children have their own state, progress, and logs.

Sequential Children

async def parent_task(ctx):
    ctx.report_progress(0, "Starting phase 1")
    state = await ctx.run_child(
        execute=phase_one,
        process_id=f"{ctx.process_id}/phase-1",
        name="Phase 1",
        params={"key": "value"},
        survive_failure=False,  # Raise if child fails (default)
        survive_cancel=False,   # Raise if child is cancelled (default)
    )
    # state is "done", "failed", or "cancelled"

    ctx.report_progress(50, "Starting phase 2")
    await ctx.run_child(
        execute=phase_two,
        process_id=f"{ctx.process_id}/phase-2",
        name="Phase 2",
    )
    ctx.report_progress(100, "Complete")

Parallel Children

async def parent_task(ctx):
    async with ctx.parallel_group(
        max_concurrency=5,
        survive_failure=True,   # Continue even if some children fail
        survive_cancel=False,
    ) as group:
        for i, item in enumerate(items):
            await group.spawn(
                execute=process_item,
                process_id=f"{ctx.process_id}/item-{i}",
                name=f"Process {item['name']}",
                params={"item": item},
            )

    # group.results is a list of ChildResult after the group completes
    failed = [r for r in group.results if r.state != "done"]
    ctx.report_progress(100, f"Done, {len(failed)} failures")

When survive_failure=False (the default), a RuntimeError is raised when the group's async context exits if any child failed or was cancelled.

Progress Reporting

Call ctx.report_progress(percent, message) from your task function:

  • percent: float from 0 to 100, or None for indeterminate progress.
  • message: Optional str describing the current step. Also appended to the process log.

Progress writes are buffered and flushed to MongoDB at most every 100ms (configurable via OPTIO_PROGRESS_FLUSH_INTERVAL_MS). A final flush occurs automatically when the process completes.

Progress Helpers

Import from optio_core.progress_helpers:

from optio_core.progress_helpers import sequential_progress, average_progress, mapped_progress

These return on_child_progress callbacks suitable for run_child() and parallel_group().

sequential_progress(ctx, total_children)

Divides parent 0-100% into equal slots for N sequential children. Each child's 0-100% maps to its 100/N% slot of the parent.

average_progress(ctx)

Parent percent = average of all children's percent. Children in terminal states (done, failed, cancelled) count as 100%.

mapped_progress(ctx, range_start, range_end)

Maps a single child's 0-100% into a sub-range of the parent. Arguments are fractions from 0.0 to 1.0.

# Child's progress maps to 0-25% of parent
on_progress = mapped_progress(ctx, 0.0, 0.25)
await ctx.run_child(execute=step_one, process_id="step-1", name="Step 1",
                    on_child_progress=on_progress)

Process Management

Now that we have discussed what the individual processes can do, let's look at what the application can do to the processes.

launch()

await optio_core.launch(process_id: str) -> None

Fire-and-forget launch. The process begins execution in a background task and the call returns immediately. The process must be in a launchable state (idle, done, failed, or cancelled).

launch_and_wait()

await optio_core.launch_and_wait(process_id: str) -> None

Launch a process and block until it reaches a terminal state (done, failed, or cancelled). Useful for scripting and tests.

cancel()

await optio_core.cancel(process_id: str) -> None

Cancel a running or scheduled process. If scheduled, it transitions directly to cancelled. If running, it transitions through cancel_requested -> cancelling, and the cancellation flag is set for cooperative cancellation.

dismiss()

await optio_core.dismiss(process_id: str) -> None

Reset a completed process back to idle. Only works on processes in a dismissable state (done, failed, or cancelled). Clears the previous run's result fields (status timestamps, progress, logs) and deletes all descendant child processes.

resync()

await optio_core.resync(clean: bool = False) -> None

Re-run the task generator and sync definitions with the database. New tasks are created, removed tasks are deleted (if idle), and metadata on existing tasks is updated without disturbing runtime state.

Parameter Type Default Description
clean bool False If True, delete all process records before re-syncing

Querying

get_process()

await optio_core.get_process(process_id: str) -> dict | None

Get a single process document by its processId string. Returns the full MongoDB document or None if not found.

list_processes()

await optio_core.list_processes(
    state: str | None = None,
    root_id: str | None = None,
    metadata: dict[str, str] | None = None,
) -> list[dict]

List processes with optional filters. Results are sorted by depth, order, then _id.

Parameter Type Default Description
state str | None None Filter by status.state (e.g., "running", "done")
root_id str | None None Filter by rootId (string; converted to ObjectId internally)
metadata dict[str, str] | None None Filter by metadata fields. Each key-value pair matches against metadata.{key} in the process document. Multiple entries are combined with AND. For example, metadata={"customer": "2"} returns all processes tagged with that customer.

Ad-hoc Processes

Ad-hoc processes are created at runtime rather than from the task generator. They are useful for one-off operations or dynamically spawned work.

adhoc_define()

await optio_core.adhoc_define(
    task: TaskInstance,
    parent_id: ObjectId | None = None,
    ephemeral: bool = False,
) -> dict

Create an ad-hoc process at runtime. Returns the MongoDB process document. The process starts in idle state.

Parameter Type Default Description
task TaskInstance required Task instance with execute function, process_id, name, etc.
parent_id ObjectId | None None If set, creates the process as a child of the given parent (by MongoDB _id)
ephemeral bool False If True, the process is automatically deleted after reaching a terminal state

adhoc_delete()

await optio_core.adhoc_delete(process_id: str) -> None

Delete an ad-hoc process and all its descendants from MongoDB. Also removes it from the internal task registry.

Example:

from optio_core import adhoc_define, launch, TaskInstance

proc = await adhoc_define(
    task=TaskInstance(
        execute=one_off_task,
        process_id="one-off-123",
        name="One-off Import",
    ),
    ephemeral=True,  # Auto-delete after completion
)

await launch("one-off-123")

Remote Control via Clamator RPC

When Redis is configured (by passing redis_url to init()), optio-core hosts a clamator RedisRpcServer at the ${database}/${prefix} key prefix and registers the optio-engine service against it. External services control processes by holding a clamator client and calling typed methods: launch, cancel, dismiss, resync, group_cancel, group_cancel_and_wait, block_launches, unblock_launches. Results are discriminated unions — ok: true carries the updated process payload; ok: false carries a typed reason field (e.g. "not-found", "not-cancellable", "launch-blocked", "no-resume-support").

The contract lives in packages/optio-contracts/src/optio-engine-to-api.ts. Generated TypeScript bindings ship from optio-api / optio-contracts; the Python ABC and Pydantic models are codegenned to src/optio_core/_generated/optio_engine.py (filename post-processed from optio-engine.py so Python's module-identifier rules accept it).

The RPC server is exposed at optio_core.rpc_server for applications that need to register additional services on the same server before calling optio_core.run():

import optio_core

await optio_core.init(mongo_db=db, redis_url=URL, prefix='myapp')
optio_core.rpc_server.register_service(my_domain_contract, MyDomainService())
await optio_core.run()

Custom command verbs are added by registering an additional clamator service against optio_core.rpc_server.

Data Types

TaskInstance

@dataclass
class TaskInstance:
    execute: Callable[..., Awaitable[None]]  # async def execute(ctx: ProcessContext) -> None
    process_id: str
    name: str
    description: str | None = None           # optional description, shown as tooltip in UI
    params: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)
    schedule: str | None = None              # cron expression, e.g. "0 3 * * *"
    special: bool = False                    # hidden from default UI views when special=True
    warning: str | None = None               # shown as confirmation prompt before launch
    cancellable: bool = True                 # whether this process can be cancelled
Field Type Description
execute Callable[..., Awaitable[None]] Async function (ctx: ProcessContext) -> None that implements the task
process_id str Unique string identifier for this process
name str Human-readable display name
description str | None Optional description text, shown as tooltip on process name in the UI
params dict[str, Any] Parameters passed to the execute function via ctx.params
metadata dict[str, Any] Application metadata stored on the process document. Use this for tagging processes so you can find them later — e.g., {"customer": "2"} to associate a task with a specific customer. See list_processes(metadata=...) for querying.
schedule str | None Cron expression (5 fields) for automatic scheduling, or None
special bool Hidden from default UI views when True
warning str | None Warning message shown as confirmation prompt before launch
cancellable bool Whether this process can be cancelled (default: True)

ChildResult

@dataclass
class ChildResult:
    process_id: str
    state: str        # "done" | "failed" | "cancelled"
    error: str | None = None

Returned in ParallelGroup.results after all children complete.

ChildProgressInfo

@dataclass
class ChildProgressInfo:
    process_id: str
    name: str
    state: str             # "scheduled" | "running" | "done" | "failed" | "cancelled"
    percent: float | None = None
    message: str | None = None

Passed to on_child_progress callbacks. Contains the current state and progress of each child in a group.

MongoDB Document Schema

Optio-core is the sole owner of the data in MongoDB — external code should treat these documents as read-only. If you need to observe process state from other services or a frontend, use optio-api, which exposes this data as a REST API with SSE streams for real-time updates.

Collection: {prefix}_processes

Field Type Description
_id ObjectId MongoDB document ID
processId string Application-defined unique identifier
name string Human-readable display name
description string | null Optional description text
params object Static parameters from TaskInstance
metadata object Arbitrary metadata; fields can be filtered via list_processes(metadata=...)
parentId ObjectId | null Parent process _id; null for root processes
rootId ObjectId Root process _id; equals _id for root processes
depth int Tree depth; 0 for root
order int Sort order among siblings
cancellable bool Whether cancel is permitted
adhoc bool True if created via adhoc_define()
ephemeral bool True if process should be deleted after completion
special bool Marks administrative/special-purpose processes
warning string | null Warning text shown before launch
status object Runtime status sub-document
status.state string Current process state
status.error string | null Error message (failed state)
status.runningSince datetime | null When execution started
status.doneAt datetime | null When process completed successfully
status.duration float | null Execution duration in seconds
status.failedAt datetime | null When process failed
status.stoppedAt datetime | null When process was cancelled
progress object Progress sub-document
progress.percent float | null 0-100, or null for indeterminate
progress.message string | null Current progress message
log array Log entries from the current/last run
log[].timestamp ISO datetime string Entry timestamp
log[].level string event | info | debug | warning | error
log[].message string Log message
log[].data object | absent Optional structured data
createdAt datetime Document creation timestamp

Configuration

Environment Variable Default Description
OPTIO_PROGRESS_FLUSH_INTERVAL_MS 100 How often (in milliseconds) buffered progress updates are flushed to MongoDB

See Also

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

optio_core-0.1.0.tar.gz (109.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

optio_core-0.1.0-py3-none-any.whl (58.7 kB view details)

Uploaded Python 3

File details

Details for the file optio_core-0.1.0.tar.gz.

File metadata

  • Download URL: optio_core-0.1.0.tar.gz
  • Upload date:
  • Size: 109.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for optio_core-0.1.0.tar.gz
Algorithm Hash digest
SHA256 5eec1849a19cea269d244fb4d1540c9fea44f42e578a2322fc673fd059161e59
MD5 fa7ddf429988f31cdd29f6a837bdafc3
BLAKE2b-256 c1d5da2a3a74eb5833250673d2c994ee2eb19f27f41d17d93bc99b8f886e83ee

See more details on using hashes here.

File details

Details for the file optio_core-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: optio_core-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 58.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for optio_core-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e9ddad4e5b5ffbb88ab7656631451c462e64a698d2b4ffe74f17292aaba28275
MD5 6a02b899f813deb8d92c12e29267402a
BLAKE2b-256 68c802e8980e7f63ce64b839f0dff237642cdae2f18d65e68ca7f1d6b6a21fa8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page