
Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem


UiPath Runtime


Runtime abstractions and contracts for the UiPath Python SDK.

Overview

uipath-runtime provides the foundational interfaces and base contracts for building agent runtimes in the UiPath ecosystem. It defines the protocols that all runtime implementations must follow and provides utilities for execution context, event streaming, tracing, structured error handling, durable execution, and human-in-the-loop interactions.

This package is typically used as a dependency by higher-level UiPath SDKs. Use it directly only if you are building a custom runtime implementation.

Installation

uv add uipath-runtime

Developer Tools

Check out uipath-dev - an interactive terminal application for building, testing, and debugging UiPath Python runtimes, agents, and automation scripts.

Runtime Protocols

All runtimes implement the UiPathRuntimeProtocol (or one of its sub-protocols):

  • get_schema() — defines input and output JSON schemas.
  • execute(input, options) — executes the runtime logic and returns a UiPathRuntimeResult.
  • stream(input, options) — optionally streams runtime events for real-time monitoring.
  • dispose() — releases resources when the runtime is no longer needed.

Any class that structurally implements these methods satisfies the protocol.

from typing import Any, AsyncGenerator, Optional

from uipath.runtime import (
    UiPathRuntimeResult,
    UiPathRuntimeStatus,
    UiPathRuntimeSchema,
    UiPathRuntimeEvent,
    UiPathExecuteOptions,
    UiPathStreamOptions,
)
from uipath.runtime.events import UiPathRuntimeStateEvent


class MyRuntime:
    """Example runtime implementing the UiPath runtime protocols."""

    async def get_schema(self) -> UiPathRuntimeSchema:
        return UiPathRuntimeSchema(
            input={
                "type": "object",
                "properties": {"message": {"type": "string"}},
                "required": ["message"],
            },
            output={
                "type": "object",
                "properties": {"message": {"type": "string"}},
                "required": ["message"],
            },
        )

    async def execute(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathExecuteOptions] = None,
    ) -> UiPathRuntimeResult:
        message = (input or {}).get("message", "")
        return UiPathRuntimeResult(
            output={'message': 'Hello from MyRuntime'},
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def stream(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathStreamOptions] = None,
    ) -> AsyncGenerator[UiPathRuntimeEvent, None]:
        yield UiPathRuntimeStateEvent(payload={"status": "starting"})
        yield UiPathRuntimeResult(
            output={"completed": True},
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def dispose(self) -> None:
        pass
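
The structural typing mentioned above can be illustrated with the standard library alone. The sketch below does not use uipath-runtime; RuntimeLike, EchoRuntime, and NotARuntime are hypothetical names introduced only to show how a class can satisfy a protocol without inheriting from it.

```python
import asyncio
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class RuntimeLike(Protocol):
    """Hypothetical stand-in for a runtime protocol (not the library's class)."""

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]: ...

    async def dispose(self) -> None: ...


class EchoRuntime:
    """Does not inherit from RuntimeLike, yet satisfies it structurally."""

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        return {"echo": input or {}}

    async def dispose(self) -> None:
        pass


class NotARuntime:
    """Missing dispose(), so it does not satisfy the protocol."""

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        return {}


async def run(runtime: RuntimeLike, payload: dict[str, Any]) -> dict[str, Any]:
    # Execute, then always release resources, mirroring the execute/dispose lifecycle.
    try:
        return await runtime.execute(payload)
    finally:
        await runtime.dispose()


# isinstance() on a runtime_checkable protocol only checks that the methods exist.
is_runtime = isinstance(EchoRuntime(), RuntimeLike)
is_not_runtime = isinstance(NotARuntime(), RuntimeLike)
echo_output = asyncio.run(run(EchoRuntime(), {"message": "hi"}))
```

A runtime check like this verifies method names only, not signatures or return types; a static type checker is needed for that.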

Event Streaming

Runtimes can optionally emit real-time events during execution:

from uipath.runtime.events import (
    UiPathRuntimeStateEvent,
    UiPathRuntimeMessageEvent,
)
from uipath.runtime.result import UiPathRuntimeResult

async for event in runtime.stream({"query": "hello"}):
    if isinstance(event, UiPathRuntimeStateEvent):
        print(f"State update: {event.payload}")
    elif isinstance(event, UiPathRuntimeMessageEvent):
        print(f"Message received: {event.payload}")
    elif isinstance(event, UiPathRuntimeResult):
        print(f"Completed: {event.output}")

If a runtime doesn’t support streaming, it raises a UiPathStreamNotSupportedError.
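
One way a caller might handle a runtime without streaming support is to fall back to a plain execute call. The sketch below uses only the standard library; StreamNotSupportedError and ExecuteOnlyRuntime are hypothetical stand-ins, and the assumption that the error surfaces when iteration starts may not match the library's actual behavior.

```python
import asyncio
from typing import Any, AsyncGenerator


class StreamNotSupportedError(Exception):
    """Hypothetical stand-in for UiPathStreamNotSupportedError."""


class ExecuteOnlyRuntime:
    """Hypothetical runtime that supports execute() but not stream()."""

    async def execute(self, input: dict[str, Any]) -> dict[str, Any]:
        return {"answer": input["query"].upper()}

    async def stream(self, input: dict[str, Any]) -> AsyncGenerator[dict[str, Any], None]:
        raise StreamNotSupportedError("streaming is not available")
        yield {}  # unreachable; its presence makes this an async generator


async def run_with_fallback(
    runtime: ExecuteOnlyRuntime, input: dict[str, Any]
) -> list[dict[str, Any]]:
    events: list[dict[str, Any]] = []
    try:
        async for event in runtime.stream(input):
            events.append(event)
    except StreamNotSupportedError:
        # No streaming: run to completion and treat the result as the only event.
        events.append(await runtime.execute(input))
    return events


fallback_events = asyncio.run(
    run_with_fallback(ExecuteOnlyRuntime(), {"query": "hello"})
)
```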

Structured Error Handling

Runtime errors use a consistent, structured model:

from uipath.runtime.errors import UiPathRuntimeError, UiPathErrorCode, UiPathErrorCategory

raise UiPathRuntimeError(
    UiPathErrorCode.EXECUTION_ERROR,
    "Agent failed",
    "Failed to call external service",
    UiPathErrorCategory.USER,
)

Resulting JSON contract:

{
  "code": "Python.EXECUTION_ERROR",
  "title": "Agent failed",
  "detail": "Failed to call external service",
  "category": "User"
}
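
To make the shape of this contract concrete, here is a minimal standard-library sketch of an exception that serializes to the same four fields. StructuredError, ErrorContract, and ErrorCategory are hypothetical names, and the "Python." prefix handling and the SYSTEM category are assumptions made for illustration; this is not how UiPathRuntimeError is implemented.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum


class ErrorCategory(str, Enum):
    """Hypothetical categories; only "User" appears in the contract above."""

    USER = "User"
    SYSTEM = "System"  # assumed for illustration


@dataclass
class ErrorContract:
    code: str
    title: str
    detail: str
    category: str


class StructuredError(Exception):
    """Hypothetical exception carrying a serializable error contract."""

    def __init__(
        self,
        code: str,
        title: str,
        detail: str,
        category: ErrorCategory,
        prefix: str = "Python",
    ):
        super().__init__(title)
        # Assumption: the emitted code is "<prefix>.<code>".
        self.contract = ErrorContract(
            code=f"{prefix}.{code}",
            title=title,
            detail=detail,
            category=category.value,
        )

    def to_json(self) -> str:
        return json.dumps(asdict(self.contract), indent=2)


try:
    raise StructuredError(
        "EXECUTION_ERROR",
        "Agent failed",
        "Failed to call external service",
        ErrorCategory.USER,
    )
except StructuredError as err:
    contract_json = err.to_json()
```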

Runtime Factory

UiPathRuntimeFactoryProtocol provides a consistent contract for discovering and creating runtime instances.

Factories decouple runtime construction (configuration, dependencies) from runtime execution, allowing orchestration, discovery, reuse, and tracing across multiple types of runtimes.

from typing import Any, AsyncGenerator, Optional

from uipath.runtime import (
    UiPathRuntimeResult,
    UiPathRuntimeStatus,
    UiPathRuntimeSchema,
    UiPathExecuteOptions,
    UiPathStreamOptions,
    UiPathRuntimeProtocol,
    UiPathRuntimeFactoryProtocol
)

class MyRuntimeFactory:
    async def new_runtime(self, entrypoint: str, runtime_id: str) -> UiPathRuntimeProtocol:
        return MyRuntime()

    async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
        return []

    def discover_entrypoints(self) -> list[str]:
        return []


factory = MyRuntimeFactory()
runtime = await factory.new_runtime("example", "id")

result = await runtime.execute()
print(result.output)  # {'message': 'Hello from MyRuntime'}
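
The separation between construction and execution can be sketched without the library. In the example below, RegistryFactory and GreeterRuntime are hypothetical; the factory keeps a registry of builders keyed by entrypoint, so callers only need an entrypoint name and a runtime ID.

```python
import asyncio
from typing import Any, Callable, Optional


class GreeterRuntime:
    """Hypothetical runtime configured at construction time."""

    def __init__(self, greeting: str, runtime_id: str):
        self.greeting = greeting
        self.runtime_id = runtime_id

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        name = (input or {}).get("name", "world")
        return {"message": f"{self.greeting}, {name}", "runtime_id": self.runtime_id}

    async def dispose(self) -> None:
        pass


class RegistryFactory:
    """Hypothetical factory that maps entrypoint names to runtime builders."""

    def __init__(self) -> None:
        self._builders: dict[str, Callable[[str], GreeterRuntime]] = {}

    def register(self, entrypoint: str, builder: Callable[[str], GreeterRuntime]) -> None:
        self._builders[entrypoint] = builder

    def discover_entrypoints(self) -> list[str]:
        return sorted(self._builders)

    async def new_runtime(self, entrypoint: str, runtime_id: str) -> GreeterRuntime:
        try:
            builder = self._builders[entrypoint]
        except KeyError:
            raise ValueError(f"Unknown entrypoint: {entrypoint}") from None
        return builder(runtime_id)


registry = RegistryFactory()
registry.register("hello", lambda rid: GreeterRuntime("Hello", rid))
registry.register("bye", lambda rid: GreeterRuntime("Goodbye", rid))


async def demo() -> dict[str, Any]:
    runtime = await registry.new_runtime("hello", "run-1")
    try:
        return await runtime.execute({"name": "Ada"})
    finally:
        await runtime.dispose()


entrypoints = registry.discover_entrypoints()
greeting_output = asyncio.run(demo())
```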

Execution Context

UiPathRuntimeContext manages configuration, file I/O, and logs across runtime execution. It can read JSON input files, capture all stdout/stderr logs, and automatically write output and result files when execution completes.

from uipath.runtime import UiPathRuntimeContext, UiPathRuntimeResult, UiPathRuntimeStatus

with UiPathRuntimeContext(input_file="input.json", result_file="result.json", logs_file="execution.log") as ctx:
    ctx.result = await runtime.execute(ctx.input)
# On exit: the result and logs are written automatically to the configured files

When execution fails, the context:

  • Writes a structured error contract to the result file.
  • Re-raises the original exception.
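
The write-then-re-raise behavior described above can be sketched with a plain context manager. ResultFileContext is a hypothetical stand-in, and the payload layout (the "status" field and the "System" category) is assumed for illustration rather than taken from UiPathRuntimeContext.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Optional


class ResultFileContext:
    """Hypothetical stand-in for a runtime context; not the library's class."""

    def __init__(self, result_file: Path):
        self.result_file = result_file
        self.result: Optional[dict[str, Any]] = None

    def __enter__(self) -> "ResultFileContext":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc is None:
            payload: dict[str, Any] = {"status": "successful", "output": self.result}
        else:
            # Assumed payload layout: a structured error in place of the output.
            payload = {
                "status": "faulted",
                "error": {
                    "code": "Python.EXECUTION_ERROR",
                    "title": type(exc).__name__,
                    "detail": str(exc),
                    "category": "System",
                },
            }
        self.result_file.write_text(json.dumps(payload))
        return False  # never swallow the exception; it propagates to the caller


with tempfile.TemporaryDirectory() as tmp:
    ok_file = Path(tmp) / "ok.json"
    with ResultFileContext(ok_file) as ctx:
        ctx.result = {"message": "done"}
    ok_payload = json.loads(ok_file.read_text())

    failed_file = Path(tmp) / "failed.json"
    reraised = False
    try:
        with ResultFileContext(failed_file):
            raise RuntimeError("external service unavailable")
    except RuntimeError:
        reraised = True
    failed_payload = json.loads(failed_file.read_text())
```

Returning False from `__exit__` is what lets the original exception propagate after the file has been written.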

Execution Runtime

UiPathExecutionRuntime wraps any runtime with tracing, telemetry, and log collection capabilities. When running multiple runtimes in the same process, this wrapper ensures each execution's spans and logs are properly isolated and captured.

graph TB
    TM[TraceManager<br/>Shared across all runtimes]

    FACTORY[Factory]

    RT[Runtime]

    EXE[ExecutionRuntime<br/>exec-id: exec-id]

    %% Factory creates runtimes
    FACTORY -->|new_runtime| RT

    %% Runtimes wrapped by ExecutionRuntime
    RT -->|wrapped by| EXE

    %% TraceManager shared with all
    TM -.->|shared| EXE

    %% Execution captures spans to TraceManager
    EXE -->|captures spans| TM

    %% Styling
    style TM fill:#e1f5ff,stroke:#0277bd,stroke-width:3px
    style FACTORY fill:#f3e5f5
    style RT fill:#fff3e0
    style EXE fill:#e8f5e9

from uipath.core import UiPathTraceManager
from uipath.runtime import UiPathExecutionRuntime

trace_manager = UiPathTraceManager()
runtime = MyRuntime()
executor = UiPathExecutionRuntime(
    runtime,
    trace_manager,
    root_span="my-runtime",
    execution_id="exec-123",
)

result = await executor.execute({"message": "hello"})
spans = trace_manager.get_execution_spans("exec-123")  # captured spans
logs = executor.log_handler.buffer  # captured logs
print(result.output)  # {'message': 'Hello from MyRuntime'}
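
To illustrate what per-execution log isolation can look like when several executions share one process, here is a standard-library sketch built on contextvars and logging. It is one possible technique, not a description of how UiPathExecutionRuntime works internally; ExecutionLogHandler, current_execution_id, and the "sketch.runtime" logger name are all hypothetical.

```python
import asyncio
import contextvars
import logging

# Each asyncio task gets its own copy of the context, so this value is per execution.
current_execution_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "current_execution_id", default=""
)


class ExecutionLogHandler(logging.Handler):
    """Hypothetical handler that buffers only records from its own execution."""

    def __init__(self, execution_id: str):
        super().__init__()
        self.execution_id = execution_id
        self.buffer: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        if current_execution_id.get() == self.execution_id:
            self.buffer.append(record.getMessage())


async def run_isolated(execution_id: str, steps: int) -> list[str]:
    logger = logging.getLogger("sketch.runtime")
    handler = ExecutionLogHandler(execution_id)
    logger.addHandler(handler)
    token = current_execution_id.set(execution_id)
    try:
        for step in range(steps):
            logger.info("%s step %d", execution_id, step)
            await asyncio.sleep(0)  # yield so the two executions interleave
    finally:
        current_execution_id.reset(token)
        logger.removeHandler(handler)
    return handler.buffer


async def demo() -> list[list[str]]:
    logging.getLogger("sketch.runtime").setLevel(logging.INFO)
    # gather() runs each coroutine in its own task, each with its own context copy.
    return await asyncio.gather(run_isolated("exec-a", 2), run_isolated("exec-b", 2))


logs_a, logs_b = asyncio.run(demo())
```

Both handlers are attached to the same logger while the executions interleave, yet each buffer ends up containing only its own execution's messages.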

Example: Runtime Orchestration

This example demonstrates an orchestrator runtime that receives a UiPathRuntimeFactoryProtocol, creates child runtimes through it, and executes each one via UiPathExecutionRuntime, all within a single shared UiPathTraceManager.

Orchestrator Runtime

from typing import Any, Optional, AsyncGenerator

from uipath.core import UiPathTraceManager
from uipath.runtime import (
    UiPathExecutionRuntime,
    UiPathRuntimeContext,
    UiPathRuntimeResult,
    UiPathRuntimeStatus,
    UiPathExecuteOptions,
    UiPathStreamOptions,
    UiPathRuntimeProtocol,
    UiPathRuntimeFactoryProtocol,
)


class ChildRuntime:
    """A simple child runtime that echoes its name and input."""

    def __init__(self, name: str):
        self.name = name

    async def get_schema(self):
        return None

    async def execute(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathExecuteOptions] = None,
    ) -> UiPathRuntimeResult:
        payload = input or {}
        return UiPathRuntimeResult(
            output={
                "runtime": self.name,
                "input": payload,
            },
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def stream(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathStreamOptions] = None,
    ) -> AsyncGenerator[UiPathRuntimeResult, None]:
        yield await self.execute(input, options)

    async def dispose(self) -> None:
        pass


class ChildRuntimeFactory:
    """Factory that creates ChildRuntime instances."""

    async def new_runtime(self, entrypoint: str, runtime_id: str) -> UiPathRuntimeProtocol:
        return ChildRuntime(name=entrypoint)

    async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
        return []

    def discover_entrypoints(self) -> list[str]:
        return []


class OrchestratorRuntime:
    """A runtime that orchestrates multiple child runtimes via a factory."""

    def __init__(
        self,
        factory: UiPathRuntimeFactoryProtocol,
        trace_manager: UiPathTraceManager,
    ):
        self.factory = factory
        self.trace_manager = trace_manager

    async def get_schema(self):
        return None

    async def execute(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathExecuteOptions] = None,
    ) -> UiPathRuntimeResult:
        payload = input or {}
        child_inputs: list[dict[str, Any]] = payload.get("children", [])
        child_results: list[dict[str, Any]] = []

        for i, child_input in enumerate(child_inputs):
            # Use the factory to create a new child runtime
            child_runtime = await self.factory.new_runtime(entrypoint=f"child-{i}", runtime_id=f"child-{i}")

            # Wrap child runtime with tracing + logs
            execution_id = f"child-{i}"
            executor = UiPathExecutionRuntime(
                delegate=child_runtime,
                trace_manager=self.trace_manager,
                root_span=f"child-span-{i}",
                execution_id=execution_id,
            )

            # Execute child runtime
            result = await executor.execute(child_input, options=options)
            child_results.append(result.output or {})
            child_spans = self.trace_manager.get_execution_spans(execution_id)  # Captured spans
            # Dispose the child runtime when finished
            await child_runtime.dispose()

        return UiPathRuntimeResult(
            output={
                "main": True,
                "children": child_results,
            },
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def stream(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathStreamOptions] = None,
    ) -> AsyncGenerator[UiPathRuntimeResult, None]:
        yield await self.execute(input, options)

    async def dispose(self) -> None:
        pass


# Example usage
async def main() -> None:
    trace_manager = UiPathTraceManager()
    factory = ChildRuntimeFactory()
    options = UiPathExecuteOptions()

    with UiPathRuntimeContext(job_id="main-job-001") as ctx:
        runtime = OrchestratorRuntime(factory=factory, trace_manager=trace_manager)

        input_data = {
            "children": [
                {"message": "hello from child 1"},
                {"message": "hello from child 2"},
            ]
        }

        ctx.result = await runtime.execute(input=input_data, options=options)
        print(ctx.result.output)

# Output:
# {
#   "main": True,
#   "children": [
#       {"runtime": "child-0", "input": {"message": "hello from child 1"}},
#       {"runtime": "child-1", "input": {"message": "hello from child 2"}}
#   ]
# }
