Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem

UiPath Runtime

Runtime abstractions and contracts for the UiPath Python SDK.

Overview

uipath-runtime provides the foundational interfaces and base contracts for building agent runtimes in the UiPath ecosystem. It defines the protocols that all runtime implementations must follow and provides utilities for execution context, event streaming, tracing, and structured error handling.

This package is typically used as a dependency by higher-level SDKs. You would use it directly only if you are building a custom runtime implementation.

Installation

uv add uipath-runtime

Runtime Protocols

All runtimes implement the UiPathRuntimeProtocol (or one of its sub-protocols):

  • get_schema() — defines input and output JSON schemas.
  • execute(input, options) — executes the runtime logic and returns a UiPathRuntimeResult.
  • stream(input, options) — optionally streams runtime events for real-time monitoring.
  • dispose() — releases resources when the runtime is no longer needed.

Any class that structurally implements these methods satisfies the protocol.

from typing import Any, AsyncGenerator, Optional

from uipath.runtime import (
    UiPathRuntimeResult,
    UiPathRuntimeStatus,
    UiPathRuntimeSchema,
    UiPathRuntimeEvent,
    UiPathExecuteOptions,
    UiPathStreamOptions,
)
from uipath.runtime.events import UiPathRuntimeStateEvent


class MyRuntime:
    """Example runtime implementing the UiPath runtime protocols."""

    async def get_schema(self) -> UiPathRuntimeSchema:
        return UiPathRuntimeSchema(
            input={
                "type": "object",
                "properties": {"message": {"type": "string"}},
                "required": ["message"],
            },
            output={
                "type": "object",
                "properties": {"result": {"type": "string"}},
                "required": ["result"],
            },
        )

    async def execute(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathExecuteOptions] = None,
    ) -> UiPathRuntimeResult:
        message = (input or {}).get("message", "")
        return UiPathRuntimeResult(
            output={'message': 'Hello from MyRuntime'},
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def stream(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathStreamOptions] = None,
    ) -> AsyncGenerator[UiPathRuntimeEvent, None]:
        yield UiPathRuntimeStateEvent(payload={"status": "starting"})
        yield UiPathRuntimeResult(
            output={"completed": True},
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def dispose(self) -> None:
        pass
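
The structural matching described above can be illustrated with the standard library alone. The sketch below is not the library's protocol definition: RuntimeLike, EchoRuntime, NotARuntime, and run are hypothetical names, and the protocol is reduced to two methods to keep the example short. It shows that a class with matching method names satisfies a typing.Protocol without inheriting from it.

```python
import asyncio
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class RuntimeLike(Protocol):
    """Hypothetical, reduced stand-in for a runtime protocol."""

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]: ...

    async def dispose(self) -> None: ...


class EchoRuntime:
    """Does not inherit from RuntimeLike; it only provides the same methods."""

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        return {"echo": input or {}}

    async def dispose(self) -> None:
        pass


class NotARuntime:
    """Lacks dispose(), so it does not match the protocol."""

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        return {}


async def run(runtime: RuntimeLike, payload: dict[str, Any]) -> dict[str, Any]:
    # Release resources even if execute() raises.
    try:
        return await runtime.execute(payload)
    finally:
        await runtime.dispose()


# isinstance() on a runtime_checkable protocol only checks that the methods exist.
matches = isinstance(EchoRuntime(), RuntimeLike)
rejects = isinstance(NotARuntime(), RuntimeLike)
output = asyncio.run(run(EchoRuntime(), {"message": "hi"}))
```

Note that the isinstance check verifies method presence only, not signatures or return types; a static type checker is what catches mismatched signatures.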

Event Streaming

Runtimes can optionally emit real-time events during execution:

from uipath.runtime.events import (
    UiPathRuntimeStateEvent,
    UiPathRuntimeMessageEvent,
)
from uipath.runtime.result import UiPathRuntimeResult

async for event in runtime.stream({"query": "hello"}):
    if isinstance(event, UiPathRuntimeStateEvent):
        print(f"State update: {event.payload}")
    elif isinstance(event, UiPathRuntimeMessageEvent):
        print(f"Message received: {event.payload}")
    elif isinstance(event, UiPathRuntimeResult):
        print(f"Completed: {event.output}")

If a runtime doesn’t support streaming, it raises a UiPathStreamNotSupportedError.
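
A caller that wants to support both kinds of runtime can catch that error and fall back to a single execute call. The sketch below uses only the standard library: StreamNotSupportedError is a local stand-in for UiPathStreamNotSupportedError, and ExecuteOnlyRuntime and collect_events are hypothetical names. Whether the real error is raised when stream() is called or on the first iteration is an assumption here, so the try block covers both.

```python
import asyncio
from typing import Any, AsyncGenerator


class StreamNotSupportedError(Exception):
    """Local stand-in for the library's UiPathStreamNotSupportedError."""


class ExecuteOnlyRuntime:
    """Hypothetical runtime that can execute but cannot stream."""

    async def execute(self, input: dict[str, Any]) -> dict[str, Any]:
        return {"result": input.get("query", "").upper()}

    async def stream(self, input: dict[str, Any]) -> AsyncGenerator[dict[str, Any], None]:
        raise StreamNotSupportedError("streaming is not supported")
        yield {}  # unreachable; keeps this function an async generator


async def collect_events(runtime: Any, input: dict[str, Any]) -> list[dict[str, Any]]:
    events: list[dict[str, Any]] = []
    try:
        # Covers an error raised at call time or on the first iteration.
        async for event in runtime.stream(input):
            events.append(event)
    except StreamNotSupportedError:
        # Fall back to one non-streamed result.
        events.append(await runtime.execute(input))
    return events


events = asyncio.run(collect_events(ExecuteOnlyRuntime(), {"query": "hello"}))
```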

Structured Error Handling

Runtime errors use a consistent, structured model:

from uipath.runtime.errors import UiPathRuntimeError, UiPathErrorCode, UiPathErrorCategory

raise UiPathRuntimeError(
    UiPathErrorCode.EXECUTION_ERROR,
    "Agent failed",
    "Failed to call external service",
    UiPathErrorCategory.USER,
)

Resulting JSON contract:

{
  "code": "Python.EXECUTION_ERROR",
  "title": "Agent failed",
  "detail": "Failed to call external service",
  "category": "User"
}
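
To make the mapping from constructor arguments to the JSON contract concrete, here is a minimal standard-library sketch. It is not the implementation of UiPathRuntimeError: ErrorContract and ContractError are hypothetical names, and the "Python" code prefix is inferred from the contract shown above rather than from the library's source.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ErrorContract:
    """The four fields that appear in the JSON contract."""

    code: str
    title: str
    detail: str
    category: str


class ContractError(Exception):
    """Hypothetical exception that carries a structured contract."""

    def __init__(self, code: str, title: str, detail: str, category: str, prefix: str = "Python"):
        # Assumption: the serialized code is "<prefix>.<code>".
        self.contract = ErrorContract(f"{prefix}.{code}", title, detail, category)
        super().__init__(f"{title}: {detail}")

    def to_json(self) -> str:
        return json.dumps(asdict(self.contract), indent=2)


try:
    raise ContractError(
        "EXECUTION_ERROR",
        "Agent failed",
        "Failed to call external service",
        "User",
    )
except ContractError as exc:
    payload = json.loads(exc.to_json())
```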

Runtime Factory

UiPathRuntimeFactoryProtocol provides a consistent contract for discovering and creating runtime instances.

Factories decouple runtime construction (configuration, dependencies) from runtime execution, allowing orchestration, discovery, reuse, and tracing across multiple types of runtimes.

from typing import Any, AsyncGenerator, Optional

from uipath.runtime import (
    UiPathRuntimeResult,
    UiPathRuntimeStatus,
    UiPathRuntimeSchema,
    UiPathExecuteOptions,
    UiPathStreamOptions,
    UiPathRuntimeProtocol,
    UiPathRuntimeFactoryProtocol
)

class MyRuntimeFactory:
    async def new_runtime(self, entrypoint: str) -> UiPathRuntimeProtocol:
        return MyRuntime()

    def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
        return []

    def discover_entrypoints(self) -> list[str]:
        return []


factory = MyRuntimeFactory()
runtime = await factory.new_runtime("example")

result = await runtime.execute()
print(result.output)  # {'message': 'Hello from MyRuntime'}
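
The separation between construction and execution can also be sketched without the library. In the standard-library example below, all names (GreeterRuntime, RegistryFactory, demo) are hypothetical, and the registry-of-builders design is one possible approach, not the one the package prescribes. Configuration lives in the builders, while callers only see entrypoint names.

```python
import asyncio
from typing import Any, Callable, Optional


class GreeterRuntime:
    """Hypothetical runtime whose configuration is fixed at construction."""

    def __init__(self, greeting: str):
        self.greeting = greeting

    async def execute(self, input: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        name = (input or {}).get("name", "world")
        return {"message": f"{self.greeting}, {name}"}

    async def dispose(self) -> None:
        pass


class RegistryFactory:
    """Maps entrypoint names to builders; a stand-in, not the library's factory."""

    def __init__(self, builders: dict[str, Callable[[], GreeterRuntime]]):
        self._builders = builders

    def discover_entrypoints(self) -> list[str]:
        return sorted(self._builders)

    async def new_runtime(self, entrypoint: str) -> GreeterRuntime:
        try:
            return self._builders[entrypoint]()
        except KeyError:
            raise ValueError(f"unknown entrypoint: {entrypoint}") from None


async def demo() -> tuple[list[str], dict[str, Any]]:
    factory = RegistryFactory({
        "formal": lambda: GreeterRuntime("Good day"),
        "casual": lambda: GreeterRuntime("Hi"),
    })
    runtime = await factory.new_runtime("casual")
    try:
        result = await runtime.execute({"name": "Ada"})
    finally:
        await runtime.dispose()
    return factory.discover_entrypoints(), result


entrypoints, result = asyncio.run(demo())
```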

Execution Context

UiPathRuntimeContext manages configuration, file I/O, and logs across runtime execution. It can read JSON input files, capture all stdout/stderr logs, and automatically write output and result files when execution completes.

from uipath.runtime import UiPathRuntimeContext, UiPathRuntimeResult, UiPathRuntimeStatus

with UiPathRuntimeContext(input_file="input.json", result_file="result.json", logs_file="execution.log") as ctx:
    ctx.result = await runtime.execute(ctx.input)
# On exit: the result and logs are written automatically to the configured files

When execution fails, the context:

  • Writes a structured error contract to the result file.
  • Re-raises the original exception.
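
The write-then-re-raise behavior can be sketched with a plain context manager. This is a standard-library illustration, not UiPathRuntimeContext itself: ResultFileContext is a hypothetical name, and the "status", "output", and "error" keys are invented for the example and do not reflect the library's actual result file format.

```python
import json
import os
import tempfile
from typing import Any, Optional


class ResultFileContext:
    """Hypothetical stand-in that writes a result file on exit."""

    def __init__(self, result_file: str):
        self.result_file = result_file
        self.result: Optional[dict[str, Any]] = None

    def __enter__(self) -> "ResultFileContext":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc is None:
            payload = {"status": "successful", "output": self.result}
        else:
            # Illustrative error shape only.
            payload = {
                "status": "faulted",
                "error": {"title": exc_type.__name__, "detail": str(exc)},
            }
        with open(self.result_file, "w", encoding="utf-8") as fh:
            json.dump(payload, fh)
        return False  # do not swallow: the original exception propagates


path = os.path.join(tempfile.mkdtemp(), "result.json")
reraised = False
try:
    with ResultFileContext(path) as ctx:
        raise ValueError("bad input")
except ValueError:
    reraised = True

with open(path, encoding="utf-8") as fh:
    written = json.load(fh)
```

Returning False from __exit__ is what lets the caller still see the original exception after the file has been written.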

Execution Runtime

UiPathExecutionRuntime wraps any runtime with tracing, telemetry, and log collection capabilities. When running multiple runtimes in the same process, this wrapper ensures each execution's spans and logs are properly isolated and captured.

graph TB
    TM[TraceManager<br/>Shared across all runtimes]

    FACTORY[Factory]

    RT[Runtime]

    EXE[ExecutionRuntime<br/>exec-id: exec-id]

    %% Factory creates runtimes
    FACTORY -->|new_runtime| RT

    %% Runtimes wrapped by ExecutionRuntime
    RT -->|wrapped by| EXE

    %% TraceManager shared with all
    TM -.->|shared| EXE

    %% Execution captures spans to TraceManager
    EXE -->|captures spans| TM

    %% Styling
    style TM fill:#e1f5ff,stroke:#0277bd,stroke-width:3px
    style FACTORY fill:#f3e5f5
    style RT fill:#fff3e0
    style EXE fill:#e8f5e9

from uipath.core import UiPathTraceManager
from uipath.runtime import UiPathExecutionRuntime

trace_manager = UiPathTraceManager()
runtime = MyRuntime()
executor = UiPathExecutionRuntime(
    runtime,
    trace_manager,
    root_span="my-runtime",
    execution_id="exec-123",
)

result = await executor.execute({"message": "hello"})
spans = trace_manager.get_execution_spans("exec-123")  # captured spans
logs = executor.log_handler.buffer  # captured logs
print(result.output)  # {'message': 'Hello from MyRuntime'}
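
One way to picture per-execution log isolation when several executions share a process is a logging handler keyed on a context variable. The sketch below is an assumption about a possible mechanism, not a description of how UiPathExecutionRuntime works internally; ExecutionLogHandler, current_execution_id, and run_isolated are hypothetical names.

```python
import asyncio
import contextvars
import logging

# Identifies which execution the current asyncio task belongs to.
current_execution_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "current_execution_id", default=""
)


class ExecutionLogHandler(logging.Handler):
    """Buffers only records emitted while its own execution id is active."""

    def __init__(self, execution_id: str):
        super().__init__()
        self.execution_id = execution_id
        self.buffer: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        if current_execution_id.get() == self.execution_id:
            self.buffer.append(record.getMessage())


async def run_isolated(execution_id: str, logger: logging.Logger) -> list[str]:
    handler = ExecutionLogHandler(execution_id)
    logger.addHandler(handler)
    token = current_execution_id.set(execution_id)
    try:
        logger.info("start %s", execution_id)
        await asyncio.sleep(0)  # yield so the two executions interleave
        logger.info("end %s", execution_id)
    finally:
        current_execution_id.reset(token)
        logger.removeHandler(handler)
    return handler.buffer


async def main() -> list[list[str]]:
    logger = logging.getLogger("isolation-demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    # gather() runs each coroutine in its own task, each with its own context copy.
    return await asyncio.gather(
        run_isolated("exec-1", logger), run_isolated("exec-2", logger)
    )


buffers = asyncio.run(main())
```

Both handlers are attached to the same logger while the executions interleave, yet each buffer ends up holding only its own execution's messages.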

Example: Runtime Orchestration

This example demonstrates an orchestrator runtime that receives a UiPathRuntimeFactoryProtocol, creates child runtimes through it, and executes each one via UiPathExecutionRuntime, all within a single shared UiPathTraceManager.

Orchestrator Runtime

from typing import Any, Optional, AsyncGenerator

from uipath.core import UiPathTraceManager
from uipath.runtime import (
    UiPathExecutionRuntime,
    UiPathRuntimeContext,
    UiPathRuntimeResult,
    UiPathRuntimeStatus,
    UiPathExecuteOptions,
    UiPathStreamOptions,
    UiPathRuntimeProtocol,
    UiPathRuntimeFactoryProtocol,
)


class ChildRuntime:
    """A simple child runtime that echoes its name and input."""

    def __init__(self, name: str):
        self.name = name

    async def get_schema(self):
        return None

    async def execute(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathExecuteOptions] = None,
    ) -> UiPathRuntimeResult:
        payload = input or {}
        return UiPathRuntimeResult(
            output={
                "runtime": self.name,
                "input": payload,
            },
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def stream(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathStreamOptions] = None,
    ) -> AsyncGenerator[UiPathRuntimeResult, None]:
        yield await self.execute(input, options)

    async def dispose(self) -> None:
        pass


class ChildRuntimeFactory:
    """Factory that creates ChildRuntime instances."""

    async def new_runtime(self, entrypoint: str) -> UiPathRuntimeProtocol:
        return ChildRuntime(name=entrypoint)

    def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
        return []

    def discover_entrypoints(self) -> list[str]:
        return []


class OrchestratorRuntime:
    """A runtime that orchestrates multiple child runtimes via a factory."""

    def __init__(
        self,
        factory: UiPathRuntimeFactoryProtocol,
        trace_manager: UiPathTraceManager,
    ):
        self.factory = factory
        self.trace_manager = trace_manager

    async def get_schema(self):
        return None

    async def execute(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathExecuteOptions] = None,
    ) -> UiPathRuntimeResult:
        payload = input or {}
        child_inputs: list[dict[str, Any]] = payload.get("children", [])
        child_results: list[dict[str, Any]] = []

        for i, child_input in enumerate(child_inputs):
            # Use the factory to create a new child runtime
            child_runtime = await self.factory.new_runtime(entrypoint=f"child-{i}")

            # Wrap child runtime with tracing + logs
            execution_id = f"child-{i}"
            executor = UiPathExecutionRuntime(
                delegate=child_runtime,
                trace_manager=self.trace_manager,
                root_span=f"child-span-{i}",
                execution_id=execution_id,
            )

            # Execute child runtime
            result = await executor.execute(child_input, options=options)
            child_results.append(result.output or {})
            # Spans captured for this child, looked up on the shared trace manager
            child_spans = self.trace_manager.get_execution_spans(execution_id)
            # Dispose the child runtime when finished
            await child_runtime.dispose()

        return UiPathRuntimeResult(
            output={
                "main": True,
                "children": child_results,
            },
            status=UiPathRuntimeStatus.SUCCESSFUL,
        )

    async def stream(
        self,
        input: Optional[dict[str, Any]] = None,
        options: Optional[UiPathStreamOptions] = None,
    ) -> AsyncGenerator[UiPathRuntimeResult, None]:
        yield await self.execute(input, options)

    async def dispose(self) -> None:
        pass


# Example usage
async def main() -> None:
    trace_manager = UiPathTraceManager()
    factory = ChildRuntimeFactory()
    options = UiPathExecuteOptions()

    with UiPathRuntimeContext(job_id="main-job-001") as ctx:
        runtime = OrchestratorRuntime(factory=factory, trace_manager=trace_manager)

        input_data = {
            "children": [
                {"message": "hello from child 1"},
                {"message": "hello from child 2"},
            ]
        }

        ctx.result = await runtime.execute(input=input_data, options=options)
        print(ctx.result.output)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

# Output:
# {
#   "main": True,
#   "children": [
#       {"runtime": "child-0", "input": {"message": "hello from child 1"}},
#       {"runtime": "child-1", "input": {"message": "hello from child 2"}}
#   ]
# }
