Automation agent toolkit for the Ondemand platform

ondemand-ai

Python SDK for building automations on the Ondemand platform.

Provides OndemandWorker (a Temporal worker wrapper for Cloud Run Jobs), WorkflowReporter (step tree management queryable via the Temporal Query API), structured logging, R2 artifact storage, and human-in-the-loop approval helpers.

Installation

# Full install with Temporal worker support
pip install "ondemand-ai[worker]"   # quotes keep shells such as zsh from expanding the brackets

# Shared utilities only (logging, R2 storage, artifacts, approvals)
pip install ondemand-ai

Requirements: Python 3.9+

Quick Start

A minimal automation with one workflow and one activity:

# workflows.py
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import process_data

from ondemand.worker import WorkflowReporter


@workflow.defn
class MyWorkflow:
    def __init__(self):
        self.reporter = WorkflowReporter()

    @workflow.query
    def get_progress(self) -> dict:
        return self.reporter.to_dict()

    @workflow.run
    async def run(self, inputs: dict) -> dict:
        # Define the step tree
        self.reporter.add_step("extract", "Extract Data")
        self.reporter.add_step("validate", "Validate Data", parent="extract")

        # Execute activity
        self.reporter.start_step("extract")
        result = await workflow.execute_activity(
            process_data,
            inputs,
            start_to_close_timeout=timedelta(minutes=30),
        )

        # Apply updates returned by the activity
        self.reporter.apply_updates(result.get("step_updates", []))
        self.reporter.complete_step("extract")

        return result

# activities.py
from temporalio import activity
from ondemand.shared import get_logger

logger = get_logger(__name__)


@activity.defn
async def process_data(inputs: dict) -> dict:
    logger.section("Processing Data")

    with logger.timed("Loading files"):
        data = load(inputs["file"])  # load() stands in for your own file-loading function

    logger.success(f"Processed {len(data)} records")

    return {
        "count": len(data),
        "step_updates": [
            {"step_id": "validate", "status": "completed"},
            {"record": {"step_id": "extract", "id": "file1.pdf", "status": "success", "message": "OK"}},
            {"log": "All records validated"},
        ],
    }

# main.py
from ondemand.worker import OndemandWorker
from workflows import MyWorkflow
from activities import process_data

worker = OndemandWorker(name="my-automation")
worker.register_workflow(MyWorkflow)
worker.register_activity(process_data)

if __name__ == "__main__":
    worker.run()

Modules

ondemand.worker.OndemandWorker

Connects to Temporal, registers workflows and activities, polls a task queue, and shuts down after an idle timeout. Cloud Run Jobs are billed per second, so an idle worker exits instead of continuing to run.

from ondemand.worker import OndemandWorker

worker = OndemandWorker(name="my-worker")

# Register via decorators
@worker.workflow
class MyWorkflow: ...

@worker.activity
async def my_activity(inputs: dict) -> dict: ...

# Or register explicitly
worker.register_workflow(MyWorkflow)
worker.register_activity(my_activity)

# Start polling (blocking call, runs asyncio event loop)
worker.run()

Behavior:

  • Reads configuration from environment variables (see below)
  • Captures all stdout/stderr for console log upload
  • Exits gracefully on SIGINT/SIGTERM
  • Exits after WORKER_IDLE_TIMEOUT seconds with no work (default: 300s)
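The idle-timeout rule above can be pictured as a timer that resets whenever work arrives. The sketch below is an illustration only, not the SDK's implementation: `IdleTimer`, `touch`, and `expired` are hypothetical names, and the clock is injectable so the logic can be exercised without waiting.

```python
import time


class IdleTimer:
    """Hypothetical helper: tracks how long a worker has gone without work."""

    def __init__(self, timeout_seconds: float = 300.0, clock=time.monotonic):
        self.timeout_seconds = timeout_seconds
        self._clock = clock
        self._last_activity = clock()

    def touch(self) -> None:
        """Record that a task has just started or finished."""
        self._last_activity = self._clock()

    def expired(self) -> bool:
        """Return True once no work has been seen for timeout_seconds."""
        return self._clock() - self._last_activity >= self.timeout_seconds
```

A polling loop would call `touch()` around each task and stop once `expired()` returns True.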

ondemand.worker.WorkflowReporter

Manages a step tree with records, logs, and artifacts. Lives inside the workflow class and is exposed to the portal via @workflow.query.

Step Management

reporter = WorkflowReporter()

# Build the step tree
reporter.add_step("extract", "Extract Data")
reporter.add_step("parse", "Parse Files", parent="extract")
reporter.add_step("classify", "Classify")

# Track progress
reporter.start_step("extract")      # logs "▶ Extract Data" at INFO
reporter.complete_step("extract")    # logs "✓ Extract Data" at SUCCESS
reporter.fail_step("classify", "API timeout")  # logs "✗ Classify: API timeout" at ERROR
reporter.warn_step("parse")         # marks step as completed with warnings
reporter.skip_step("classify")      # marks step as skipped

Step statuses: pending, running, completed, failed, warning, skipped

Records

Attach individual item results to a step (e.g., one file processed, one transaction classified):

reporter.add_record(
    step_id="extract",
    record_id="invoice_001.pdf",
    status="success",        # "success", "warning", "failed"
    message="Processed OK",
    metadata={"pages": 3, "total": 1500.00},
)

Logs

reporter.log("Downloading 42 files...", level="INFO", module="Downloader")
# module defaults to the current step's title if omitted

Console log format: timestamp - module - LEVEL - message

Colors in the portal UI:

Level                      Color
ERROR                      Red
WARNING                    Amber
SUCCESS                    Green
Lines starting with ####   Cyan
Everything else            Gray
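To make the console format and the color rules concrete, here is a minimal sketch using only the standard `logging` module. `CONSOLE_FORMAT` and `portal_color` are hypothetical names, and the precedence (level first, then the `####` prefix that `logger.section` produces) is an assumption; the portal's real rendering logic is not shown in this document.

```python
import logging

# Format string matching "timestamp - module - LEVEL - message"
CONSOLE_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def portal_color(level: str, message: str) -> str:
    """Pick a display color following the table above (hypothetical helper)."""
    by_level = {"ERROR": "red", "WARNING": "amber", "SUCCESS": "green"}
    if level in by_level:
        return by_level[level]
    # Assumption: section headers are identified by their "####" prefix.
    if message.startswith("####"):
        return "cyan"
    return "gray"
```

Passing `CONSOLE_FORMAT` to `logging.Formatter` yields lines in the same shape as the console log.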

Artifacts

Register files uploaded to R2 so they appear in the portal:

reporter.add_artifact(
    name="relatorio.xlsx",
    r2_key="artifacts/run-123/relatorio.xlsx",
    size=45_000,
    mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)

Batch Updates from Activities

Activities cannot modify workflow state directly. Instead, they return a list of updates that the workflow applies:

# In the activity — return updates
return {
    "result": "...",
    "step_updates": [
        {"step_id": "parse", "status": "running", "timestamp": "2026-03-31T12:00:00Z"},
        {"step_id": "parse", "status": "completed", "timestamp": "2026-03-31T12:00:05Z"},
        {"record": {"step_id": "parse", "id": "file1.pdf", "status": "success", "message": "OK"}},
        {"log": "Parsed 150 records"},
        {"artifact": {"name": "output.csv", "r2_key": "artifacts/run-123/output.csv", "size": 1024}},
    ],
}

# In the workflow — apply them
result = await workflow.execute_activity(my_activity, inputs, ...)
self.reporter.apply_updates(result.get("step_updates", []))
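The update list mixes four shapes: step status changes, records, logs, and artifacts. The sketch below shows one way such a list could be dispatched onto plain dictionaries. It is an assumption about the general idea, not the body of `WorkflowReporter.apply_updates`; `new_state` and `apply_updates_sketch` are hypothetical names.

```python
from typing import Any


def new_state() -> dict[str, Any]:
    """Empty container for the sketch (hypothetical shape)."""
    return {"steps": {}, "records": [], "logs": [], "artifacts": []}


def apply_updates_sketch(state: dict[str, Any], updates: list[dict[str, Any]]) -> None:
    """Apply each update in order, dispatching on which key it carries."""
    for update in updates:
        if "record" in update:
            state["records"].append(update["record"])
        elif "log" in update:
            state["logs"].append(update["log"])
        elif "artifact" in update:
            state["artifacts"].append(update["artifact"])
        elif "step_id" in update:
            # Later status updates for the same step overwrite earlier ones.
            state["steps"][update["step_id"]] = update["status"]
        # Updates with an unrecognized shape are ignored in this sketch.
```

Because updates are applied in order, a "running" entry followed by a "completed" entry for the same step leaves the step completed.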

State Export

@workflow.query
def get_progress(self) -> dict:
    return self.reporter.to_dict()

Returns a dict with status, current_step, steps (flat list), step_tree (nested), logs, and artifacts. The portal polls this via the Temporal Query API.
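The difference between the flat `steps` list and the nested `step_tree` can be illustrated with a small conversion. The field names `id`, `parent`, and `children` are assumptions made for this sketch; the exact keys returned by `to_dict()` are not documented here.

```python
def build_step_tree(steps: list[dict]) -> list[dict]:
    """Nest a flat list of steps under their parents (hypothetical field names)."""
    nodes = {step["id"]: {**step, "children": []} for step in steps}
    roots = []
    for step in steps:
        node = nodes[step["id"]]
        parent_node = nodes.get(step.get("parent"))
        if parent_node is None:
            # No parent, or an unknown parent: treat the step as top-level.
            roots.append(node)
        else:
            parent_node["children"].append(node)
    return roots
```

With the three steps from the Step Management example, "parse" ends up as the only child of "extract", and "classify" stays at the top level.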

ondemand.shared.logging

Custom logger with a SUCCESS level (25, between INFO and WARNING) and helpers for structured output.

from ondemand.shared import get_logger

logger = get_logger(__name__)

logger.info("Processing started")
logger.success("All files uploaded")        # SUCCESS level, green in portal
logger.section("Phase 2: Classification")   # logs "#### Phase 2: Classification", cyan in portal
logger.step("Extract", "ABC Corp")          # logs "[Extract] ABC Corp"
logger.divider()                            # logs "============..."
logger.summary("Results", {"total": 42, "errors": 0})

with logger.timed("Uploading files"):
    upload()
# logs "#### Uploading files" on entry
# logs "SUCCESS - Uploading files completed in 3.2s" on exit
# logs "ERROR - Uploading files FAILED after 3.2s" on exception
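For readers curious how a SUCCESS level and a timed block can be built on the standard library, here is a minimal sketch. It is not the SDK's `OndemandLogger`: the module-level `timed` function and the `SUCCESS` constant are stand-ins written for illustration, using only `logging`, `time`, and `contextlib`.

```python
import logging
import time
from contextlib import contextmanager

SUCCESS = 25  # sits between INFO (20) and WARNING (30)
logging.addLevelName(SUCCESS, "SUCCESS")


@contextmanager
def timed(logger: logging.Logger, label: str):
    """Log a section header, then the outcome with the elapsed time."""
    start = time.monotonic()
    logger.info("#### %s", label)
    try:
        yield
    except Exception:
        logger.error("%s FAILED after %.1fs", label, time.monotonic() - start)
        raise  # the caller still sees the original exception
    else:
        logger.log(SUCCESS, "%s completed in %.1fs", label, time.monotonic() - start)
```

Re-raising after logging keeps the failure visible to the caller, so a wrapped activity still fails as it normally would.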

ondemand.shared.r2_storage

Upload and download files from Cloudflare R2 (S3-compatible). Uses boto3 under the hood.

from ondemand.shared import get_r2_client, download_input_files, upload_task_artifacts
from pathlib import Path

# Direct client usage
r2 = get_r2_client()
r2.upload_file(Path("output.xlsx"), "artifacts/run-123/output.xlsx")
r2.download_file("inputs/uuid/data.csv", Path("./downloads/data.csv"))
r2.copy_object("inputs/uuid/data.csv", "artifacts/run-123/inputs/data.csv")

# Download all file-type inputs from a workflow's input dict
downloaded = download_input_files(
    inputs={"planilha": "inputs/uuid/data.xlsx", "empresa": "ABC"},
    dest_dir=Path("./downloads"),
    run_id="run-123",                # copies to artifacts/ for portal visibility
)
# downloaded == {"planilha": Path("./downloads/data.xlsx")}

# Upload a task's output directory
uploaded = upload_task_artifacts(
    task_output_dir=Path("output/run-123/classify"),
    run_id="run-123",
    task_name="classify",
    exclude=["console.txt"],
)
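Since R2 is S3-compatible, a plain boto3 client can be pointed at it through the R2_* environment variables listed under Environment Variables. The sketch below shows that wiring under stated assumptions: `r2_client_kwargs` and `make_r2_client` are hypothetical helpers, not part of this package, and `get_r2_client()` may configure its client differently.

```python
import os


def r2_client_kwargs(env=os.environ) -> dict:
    """Build boto3 client keyword arguments from the R2_* variables."""
    return {
        "service_name": "s3",
        "endpoint_url": env["R2_ENDPOINT"],
        "aws_access_key_id": env["R2_ACCESS_KEY"],
        "aws_secret_access_key": env["R2_SECRET_KEY"],
        "region_name": "auto",  # region value commonly used with R2
    }


def make_r2_client(env=os.environ):
    """Create an S3 client that talks to R2."""
    import boto3  # imported lazily so the helper above works without boto3

    return boto3.client(**r2_client_kwargs(env))
```

With such a client, `client.upload_file("output.xlsx", os.environ["R2_BUCKET"], "artifacts/run-123/output.xlsx")` uploads a local file. Note that the raw boto3 call takes the bucket name explicitly, unlike the wrapper methods shown above.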

ondemand.shared.approval

Pause a workflow and wait for human approval (HITL pattern).

from ondemand import request_approval

approval_url, rejection_url = request_approval(
    message="3 discrepancies found. Review?",
    data={"total": 15000, "items": ["NF-001", "NF-002", "NF-003"]},
    show_buttons=True,      # show approve/reject buttons in portal UI
    timeout_days=7,         # auto-reject after 7 days (default)
)

# Send notification however you want (email, Slack, WhatsApp, etc.)
send_email(to="reviewer@client.com", body=f"Approve: {approval_url}")

Behavior:

  • Synchronous call -- sends a webhook to the portal and gets tokenized URLs back
  • After calling, the activity/step should exit normally
  • The Temporal workflow pauses automatically (the worker slot is freed)
  • If approved, the next step executes
  • If rejected, remaining steps are cancelled
  • Raises ApprovalRequestError if the portal is unreachable after 3 retries
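The retry rule in the last bullet can be sketched generically. This is not the SDK's code: `ApprovalRequestFailed` is a stand-in for `ApprovalRequestError`, `call_with_retries` and its `send` callable are hypothetical, and "3 retries" is read here as three attempts in total, which is an assumption.

```python
import time


class ApprovalRequestFailed(Exception):
    """Stand-in for the SDK's ApprovalRequestError in this sketch."""


def call_with_retries(send, attempts: int = 3, delay_seconds: float = 0.0):
    """Call send() until it succeeds or the attempts are exhausted."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return send()
        except ConnectionError as exc:
            last_error = exc
            if attempt < attempts:
                time.sleep(delay_seconds)  # pause before the next attempt
    raise ApprovalRequestFailed(
        f"portal unreachable after {attempts} attempts"
    ) from last_error
```

Callers of `request_approval` can treat `ApprovalRequestError` the same way this sketch treats its stand-in: catch it in the activity and decide whether to fail the step or fall back to another notification path.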

ondemand.shared.artifacts

Manage output directories and pass data between workflow steps.

from ondemand.shared import (
    set_run_id, get_run_id, get_run_info,
    get_output_dir, get_base_output_dir,
    save_artifact, load_artifact,
)

set_run_id("run-123")

# Per-task output: output/run-123/extract/
output_dir = get_output_dir("extract")

# Shared output: output/run-123/
base_dir = get_base_output_dir()

# Save/load JSON artifacts
save_artifact({"companies": [...]}, "companies.json")
data = load_artifact("companies.json", task="extract")

# Run context
info = get_run_info()  # RunInfo(run_id, process_code, organization_id, started_at)
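The directory layout in the comments above (output/run-123/extract/ per task, output/run-123/ shared) can be reproduced with `pathlib` and `json`. The sketch below illustrates that layout only: `task_output_dir`, `save_json`, and `load_json` are hypothetical names, and the package's own `save_artifact` and `load_artifact` may behave differently.

```python
import json
from pathlib import Path


def task_output_dir(base: Path, run_id: str, task: str) -> Path:
    """Return <base>/<run_id>/<task>/, creating it if needed."""
    path = base / run_id / task
    path.mkdir(parents=True, exist_ok=True)
    return path


def save_json(data, path: Path) -> Path:
    """Write data as UTF-8 JSON and return the path written."""
    path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    return path


def load_json(path: Path):
    """Read back a JSON file written by save_json."""
    return json.loads(path.read_text(encoding="utf-8"))
```

Writing in one task's directory and reading from it in a later step is how data would move between steps under this layout.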

Environment Variables

The platform sets these variables when running on Cloud Run. For local development, set them manually or use a .env file.

Variable                   Required  Description
TEMPORAL_ADDRESS           Yes       Temporal server address (e.g., temporal.example.com:7233)
TEMPORAL_NAMESPACE         Yes       Temporal namespace (typically the org code)
TEMPORAL_QUEUE             Yes       Task queue name (typically the process code)
ONDEMAND_APP_URL           No        API base URL for webhook callbacks
SUPERVISOR_WEBHOOK_SECRET  No        Auth token for webhook calls
WORKER_NAME                No        Worker name (default: ondemand-worker)
WORKER_MAX_CONCURRENT      No        Max concurrent activities (default: 1)
WORKER_IDLE_TIMEOUT        No        Seconds to wait before exiting if idle (default: 300)
R2_ENDPOINT                No        Cloudflare R2 endpoint URL
R2_ACCESS_KEY              No        R2 access key ID
R2_SECRET_KEY              No        R2 secret access key
R2_BUCKET                  No        R2 bucket name
ONDEMAND_RUN_ID            No        Current run UUID
ONDEMAND_PROCESS_CODE      No        Process code for the current run
ONDEMAND_ORGANIZATION_ID   No        Organization ID for the current run
ONDEMAND_WEBHOOK_URL       No        Webhook URL (required for request_approval)
ONDEMAND_WEBHOOK_SECRET    No        Webhook auth secret
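For local development it can help to check the required variables before starting a worker. The sketch below validates the three required names and applies the documented defaults. `load_worker_config` is a hypothetical helper written for this example; `OndemandWorker` reads the environment itself and may handle missing values differently.

```python
import os

REQUIRED = ("TEMPORAL_ADDRESS", "TEMPORAL_NAMESPACE", "TEMPORAL_QUEUE")
DEFAULTS = {
    "WORKER_NAME": "ondemand-worker",
    "WORKER_MAX_CONCURRENT": "1",
    "WORKER_IDLE_TIMEOUT": "300",
}


def load_worker_config(env=os.environ) -> dict:
    """Collect worker settings, failing early if a required one is missing."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    config = {name: env[name] for name in REQUIRED}
    for name, default in DEFAULTS.items():
        config[name] = env.get(name, default)
    # Numeric settings arrive as strings, so convert them once here.
    config["WORKER_MAX_CONCURRENT"] = int(config["WORKER_MAX_CONCURRENT"])
    config["WORKER_IDLE_TIMEOUT"] = int(config["WORKER_IDLE_TIMEOUT"])
    return config
```

Calling `load_worker_config()` at the top of main.py would surface a missing TEMPORAL_* variable as a single clear error rather than a connection failure later on.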

Package Structure

ondemand/
├── __init__.py                # Top-level exports (request_approval, ApprovalRequestError)
├── worker/
│   ├── __init__.py            # Exports: OndemandWorker, WorkflowReporter
│   ├── base.py                # OndemandWorker — Temporal connection, polling, idle timeout
│   └── reporter.py            # WorkflowReporter — step tree, records, logs, artifacts
└── shared/
    ├── __init__.py            # Re-exports all shared utilities
    ├── approval.py            # request_approval() for HITL workflows
    ├── artifacts.py           # save_artifact, load_artifact, output dirs, RunInfo
    ├── logging.py             # OndemandLogger with SUCCESS level, section/step/timed helpers
    └── r2_storage.py          # R2StorageClient, download/upload utilities

Publishing

# Bump version in pyproject.toml, then:
python -m build
python -m twine upload dist/*

Requires a PyPI API token configured in ~/.pypirc or via TWINE_PASSWORD.

License

Apache 2.0
