orbflow-sdk

Python SDK for building Orbflow workflow plugins.

Build custom action and trigger nodes that appear in the Orbflow visual workflow builder and execute within the distributed engine. No protobuf code generation required.

Installation

pip install orbflow-sdk

Requirements: Python >= 3.10

Dependencies (installed automatically):

  • grpcio >= 1.60.0
  • protobuf >= 4.25.0

Quick Start

Create a file called main.py:

from orbflow_sdk import plugin, action, Field, run

@plugin(name="my-plugin", version="1.0.0", author="You", category="utility")
class MyPlugin:

    @action(
        inputs=[Field.string("name", required=True)],
        outputs=[Field.string("greeting")],
    )
    async def greet(self, ctx):
        return {"greeting": f"Hello, {ctx.input['name']}!"}

if __name__ == "__main__":
    run(MyPlugin)

Run it locally:

python main.py
# [orbflow-sdk] Plugin 'my-plugin' v1.0.0 serving on 0.0.0.0:50051 (1 actions)

Your plugin is now running a gRPC server. The Orbflow engine will discover it, fetch its node schemas, and make the "Greet" action available in the workflow builder.

The ref and display name are derived automatically from the method name (greet -> plugin:greet, "Greet"). Override them with explicit ref= and name= arguments if needed.

Deploying to Orbflow

There are two ways to connect your plugin to a running Orbflow instance.

Option 1: Managed Plugin (Recommended)

Place your plugin in the Orbflow plugins directory and let the engine manage its lifecycle.

1. Create the plugin directory:

plugins/
└── my-plugin/
    ├── main.py              # Entry point
    ├── pyproject.toml       # Dependencies
    └── orbflow-plugin.json  # Plugin manifest

2. Create the manifest (orbflow-plugin.json):

{
  "name": "my-plugin",
  "version": "1.0.0",
  "description": "My custom plugin",
  "author": "Your Name",
  "language": "python",
  "protocol": {
    "Grpc": { "default_port": 50051 }
  },
  "node_types": ["plugin:greet"]
}

3. Start Orbflow. The engine will:

  • Detect your plugin in the plugins/ directory
  • Create a virtual environment and install dependencies automatically
  • Launch your plugin with python main.py
  • Assign a port via ORBFLOW_PLUGIN_PORT environment variable
  • Poll the health check endpoint until the plugin is ready
  • Call GetSchemas to register your nodes in the workflow builder

Reading the assigned port:

import os

if __name__ == "__main__":
    port = int(os.environ.get("ORBFLOW_PLUGIN_PORT", "50051"))
    run(MyPlugin, port=port)

Option 2: Standalone Plugin

Run your plugin independently and tell Orbflow where to find it.

1. Start your plugin:

python main.py
# Listening on 0.0.0.0:50051

2. Register in Orbflow config (configs/orbflow.yaml):

plugins:
  dir: "./plugins"
  grpc:
    - name: "my-plugin"
      address: "http://localhost:50051"
      timeout_secs: 30

3. Restart Orbflow. The engine connects to your plugin's gRPC endpoint, calls GetSchemas, and registers the nodes.

This approach is useful for:

  • Plugins running on a different machine or in a separate container
  • Development -- run the plugin with hot reload while Orbflow runs separately
  • Plugins that need to manage their own lifecycle
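
Before restarting Orbflow, it can help to confirm that the address in the config is actually reachable. The sketch below uses only the standard library; parse_plugin_address and is_reachable are hypothetical helper names, not part of orbflow-sdk, and the check only proves that a TCP connection succeeds, not that the gRPC service is healthy.

```python
import socket
from urllib.parse import urlsplit


def parse_plugin_address(address: str) -> tuple[str, int]:
    """Split an address such as "http://localhost:50051" into (host, port)."""
    parts = urlsplit(address)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"address must include a scheme, host, and port: {address!r}")
    return parts.hostname, parts.port


def is_reachable(address: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the plugin address succeeds."""
    host, port = parse_plugin_address(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, or DNS failure all count as unreachable.
        return False
```

For example, is_reachable("http://localhost:50051") should return True while python main.py is running on the default port.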

Verifying Deployment

Once deployed, your plugin's nodes appear in the node picker of the Orbflow workflow builder. To verify:

  1. Open the workflow builder UI at http://localhost:3000
  2. Click the "+" button to open the node picker
  3. Search for your plugin's action name (e.g., "Greet")
  4. Confirm that the node appears with the icon and category you defined

Or via the API:

curl http://localhost:8080/api/node-types | jq '.data[] | select(.ref | startswith("plugin:"))'
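
The same check can be scripted in Python. This is a minimal sketch that assumes the response shape implied by the jq filter above (a top-level "data" array whose entries carry a "ref" string); the function names are hypothetical.

```python
import json
import urllib.request


def plugin_node_types(payload: dict) -> list[dict]:
    """Keep only entries whose ref starts with "plugin:", like the jq filter."""
    return [
        node
        for node in payload.get("data", [])
        if str(node.get("ref", "")).startswith("plugin:")
    ]


def fetch_plugin_node_types(base_url: str = "http://localhost:8080") -> list[dict]:
    """Fetch /api/node-types and return only the plugin-provided entries."""
    with urllib.request.urlopen(f"{base_url}/api/node-types", timeout=10) as resp:
        return plugin_node_types(json.load(resp))
```

Keeping the filter separate from the HTTP call makes it easy to test against a saved response.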

API Reference

@plugin — Plugin Decorator

Marks a class as an Orbflow plugin.

@plugin(
    name="orbflow-weather",       # Unique package name
    version="1.0.0",              # SemVer version
    author="Your Name",           # Shown in marketplace
    category="utility",           # Category tab in marketplace
    icon="cloud",                 # Lucide icon name
    description="Get weather forecasts",
)
class WeatherPlugin:
    ...

@action — Action Node

Turns a method into a workflow action node.

@action(
    inputs=[
        Field.string("city", required=True).label("City Name"),
    ],
    outputs=[
        Field.number("temperature").label("Temperature (F)"),
        Field.string("condition").label("Condition"),
    ],
    parameters=[
        Field.string("units").default("fahrenheit").enum("fahrenheit", "celsius"),
    ],
)
async def weather_forecast(self, ctx):
    city = ctx.input["city"]
    units = ctx.parameters.get("units", "fahrenheit")
    return {"temperature": 72, "condition": "sunny"}

Naming rules:

  • Action refs auto-derive from the method name: weather_forecast becomes plugin:weather-forecast
  • Display names auto-capitalize: weather_forecast becomes "Weather Forecast"
  • Override with explicit ref= and name= parameters
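
The naming rules can be pictured with a small sketch. It reproduces the two documented examples only; how the SDK itself treats leading underscores, digits, or acronyms is not stated here, and derive_ref and derive_display_name are hypothetical names.

```python
def derive_ref(method_name: str) -> str:
    """weather_forecast -> plugin:weather-forecast (assumed rule: '_' becomes '-')."""
    return "plugin:" + method_name.strip("_").replace("_", "-")


def derive_display_name(method_name: str) -> str:
    """weather_forecast -> "Weather Forecast" (assumed rule: capitalize each word)."""
    return " ".join(word.capitalize() for word in method_name.split("_") if word)


print(derive_ref("weather_forecast"), "/", derive_display_name("weather_forecast"))
```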

Inputs are data flowing from upstream nodes. Parameters are static configuration set in the builder. Outputs define data passed to downstream nodes.

@action / @trigger Parameters

Parameter    Type         Required  Default
ref          str          No        Auto: plugin:<method-name>
name         str          No        Auto: capitalized method name
description  str          No        ""
inputs       list[Field]  No        ()
outputs      list[Field]  No        ()
parameters   list[Field]  No        ()

@trigger — Trigger Node

For nodes that start a workflow:

from orbflow_sdk import Field, trigger

@trigger(
    outputs=[Field.string("filename"), Field.number("size")],
)
async def on_file_upload(self, ctx):
    return {"filename": "report.pdf", "size": 1024}

Field — Field Builder

Fluent field definitions with chainable methods.

Field types:

Factory                Type          UI Control
Field.string(key)      "string"      Text input
Field.number(key)      "number"      Number input
Field.boolean(key)     "boolean"     Toggle switch
Field.object(key)      "object"      JSON editor
Field.array(key)       "array"       Array editor
Field.credential(key)  "credential"  Credential picker

Chainable methods (all return self):

# Parentheses let the chain span several lines without a syntax error.
field = (
    Field.string("name", required=True)
    .label("Full Name")          # Display label (defaults to key)
    .description("Enter name")   # Help text shown on hover
    .default("World")            # Default value
    .enum("a", "b", "c")         # Dropdown options
)

Credential fields accept type filtering to restrict which stored credentials are selectable:

Field.credential("api_key").types("openai", "anthropic")
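
To illustrate why every chainable method returns self, here is a minimal sketch of a fluent builder. FieldSketch is a hypothetical stand-in written for this example; it is not the SDK's Field implementation, and its to_dict layout is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FieldSketch:
    """Hypothetical stand-in for Field; shows the fluent pattern only."""

    key: str
    type: str
    required: bool = False
    _label: Optional[str] = None
    _description: str = ""
    _default: Any = None
    _enum: tuple = ()

    @classmethod
    def string(cls, key: str, required: bool = False) -> "FieldSketch":
        return cls(key=key, type="string", required=required)

    def label(self, text: str) -> "FieldSketch":
        self._label = text
        return self  # returning self is what makes chaining possible

    def description(self, text: str) -> "FieldSketch":
        self._description = text
        return self

    def default(self, value: Any) -> "FieldSketch":
        self._default = value
        return self

    def enum(self, *options: Any) -> "FieldSketch":
        self._enum = options
        return self

    def to_dict(self) -> dict:
        return {
            "key": self.key,
            "type": self.type,
            "required": self.required,
            "label": self._label or self.key,  # label falls back to the key
            "description": self._description,
            "default": self._default,
            "enum": list(self._enum),
        }


print(FieldSketch.string("units").default("fahrenheit").enum("fahrenheit", "celsius").to_dict())
```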

run() — gRPC Server

Starts the gRPC server. Blocks until SIGTERM/SIGINT.

run(MyPlugin)

# With options:
run(MyPlugin, host="0.0.0.0", port=50051, max_workers=10)

Parameter     Type  Default    Description
plugin_class  type  Required   The @plugin-decorated class
host          str   "0.0.0.0"  Bind address
port          int   50051      gRPC port
max_workers   int   10         Max concurrent RPC handlers

Server details:

  • Uses grpcio for gRPC transport, wire-compatible with Orbflow's Rust tonic client
  • Graceful shutdown on SIGTERM/SIGINT
  • Error messages are sanitized -- full errors logged to console, generic "internal error" sent to caller
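
The sanitization behaviour described above can be sketched as a wrapper around a handler call. This is an illustration of the idea, not the SDK's server code; call_sanitized and the shape of the returned dict are assumptions made for the example.

```python
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("plugin-sketch")


async def call_sanitized(handler: Callable[[Any], Awaitable[Any]], ctx: Any) -> dict:
    """Run a handler; log the full exception locally, return a generic error."""
    try:
        return {"data": await handler(ctx), "error": None}
    except Exception:
        # The traceback and original message stay in the plugin's own logs.
        logger.exception("handler failed")
        # The caller only ever sees this fixed string.
        return {"data": None, "error": "internal error"}
```

The point of the pattern is that exception messages, which may contain connection strings or tokens, never cross the process boundary.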

When managed by Orbflow, read the port from the environment:

import os

run(MyPlugin, port=int(os.environ.get("ORBFLOW_PLUGIN_PORT", "50051")))

Handler Context (ExecuteContext)

Every action/trigger handler receives an ExecuteContext:

Property          Type            Description
ctx.input         dict[str, Any]  Data from upstream nodes (mapped via CEL expressions or direct wiring)
ctx.parameters    dict[str, Any]  Static config values set in the workflow builder UI
ctx.config        dict[str, Any]  Resolved credentials and node configuration
ctx.capabilities  dict[str, Any]  Output from capability nodes (e.g., database connections)
ctx.instance_id   str             Workflow execution instance ID
ctx.node_id       str             This node's ID in the workflow DAG
ctx.plugin_ref    str             Action ref (e.g., "plugin:weather-forecast")
ctx.attempt       int             Retry attempt number (0-based)
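
As an example of using the identifying properties, a handler might build a log prefix from them and derive a wait time from the 0-based attempt number before calling a rate-limited service. The helpers below are hypothetical and SimpleNamespace stands in for ExecuteContext so the sketch runs without the SDK; whether a handler should delay on retries at all depends on how the engine schedules them, which is not covered here.

```python
from types import SimpleNamespace


def backoff_seconds(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Exponential backoff for a 0-based attempt number, capped at `cap` seconds."""
    return min(cap, base * (2 ** attempt))


def log_prefix(ctx) -> str:
    """Build a log prefix from the context's identifying properties."""
    return (
        f"[{ctx.plugin_ref} node={ctx.node_id} "
        f"instance={ctx.instance_id} attempt={ctx.attempt}]"
    )


# Stand-in for an ExecuteContext with only the attributes used above.
ctx = SimpleNamespace(
    instance_id="inst-1", node_id="n1", plugin_ref="plugin:weather-forecast", attempt=2,
)
print(log_prefix(ctx), backoff_seconds(ctx.attempt))
```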

Return Values

Handlers can return data in three ways:

# 1. Dict (most common) -- becomes the node's output data
async def my_action(self, ctx):
    return {"result": "hello", "count": 42}

# 2. ActionResult -- explicit error (for controlled error reporting)
from orbflow_sdk import ActionResult

async def my_action(self, ctx):
    return ActionResult(error="Something went wrong")

# 3. None -- no output data (for side-effect-only nodes like "send notification")
async def my_action(self, ctx):
    await send_notification(ctx.input["message"])  # send_notification: your own coroutine, not part of the SDK

Error handling:

# Raising an exception marks the node as failed:
async def my_action(self, ctx):
    raise RuntimeError("API rate limit exceeded")

# Or return an explicit error without raising:
async def my_action(self, ctx):
    return ActionResult(error="Warning: some items failed")

Plugin Manifest

When deploying as a managed plugin, create an orbflow-plugin.json in your plugin directory:

{
  "name": "my-plugin",
  "version": "1.0.0",
  "description": "Short description of what this plugin does",
  "author": "Your Name",
  "license": "MIT",
  "language": "python",
  "protocol": {
    "Grpc": { "default_port": 50051 }
  },
  "node_types": [
    "plugin:action-one",
    "plugin:action-two"
  ],
  "tags": ["utility", "transform"],
  "repository": "https://github.com/your-org/your-plugin",
  "icon": "zap",
  "category": "utility",
  "color": "#7C5CFC"
}

Field        Required  Description
name         Yes       Plugin name (1-64 chars, alphanumeric + hyphens + underscores)
version      Yes       Semver version string
language     Yes       "python", "typescript", "javascript", or custom
protocol     Yes       { "Grpc": { "default_port": 50051 } } for gRPC plugins
node_types   Yes       Array of plugin:* refs this plugin provides
description  No        Short description for the marketplace
author       No        Author name
icon         No        Lucide icon name for the node in the builder
category     No        Marketplace category
color        No        Hex color for the node border
tags         No        Searchable tags
repository   No        Source code URL
license      No        SPDX license identifier
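
A manifest can be checked against the table above before Orbflow ever loads it. The following is a sketch of such a pre-flight check, not the engine's validator: validate_manifest is a hypothetical helper, and treating "alphanumeric" as ASCII letters and digits is an assumption.

```python
import re

REQUIRED_KEYS = ("name", "version", "language", "protocol", "node_types")
# Assumption: "alphanumeric" means ASCII letters and digits.
NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,64}")


def validate_manifest(manifest: dict) -> list[str]:
    """Return a list of problems; an empty list means the basic checks passed."""
    problems = [
        f"missing required field: {key}" for key in REQUIRED_KEYS if key not in manifest
    ]

    name = manifest.get("name")
    if name is not None and not (isinstance(name, str) and NAME_PATTERN.fullmatch(name)):
        problems.append("name must be 1-64 characters: letters, digits, hyphens, underscores")

    node_types = manifest.get("node_types", [])
    if not isinstance(node_types, list):
        problems.append("node_types must be an array")
    else:
        for ref in node_types:
            if not str(ref).startswith("plugin:"):
                problems.append(f"node type does not start with 'plugin:': {ref}")

    return problems
```

To use it, load the file with json.load and print whatever validate_manifest returns.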

Examples

UUID Generator

A simple utility plugin with parameters:

import uuid
from orbflow_sdk import plugin, action, Field, run

@plugin(name="orbflow-uuid-gen", version="1.0.0", author="Orbflow", category="utility", icon="hash")
class UuidGenPlugin:

    @action(
        ref="plugin:uuid-gen",
        name="UUID Generator",
        outputs=[Field.string("uuid"), Field.array("uuids")],
        parameters=[
            Field.number("count").default(1).description("How many UUIDs"),
            Field.string("format").default("hyphenated").enum("hyphenated", "simple", "upper"),
        ],
    )
    async def generate(self, ctx):
        count = min(max(int(ctx.parameters.get("count", 1)), 1), 1000)
        uuids = [str(uuid.uuid4()) for _ in range(count)]
        return {"uuid": uuids[0], "uuids": uuids}

if __name__ == "__main__":
    run(UuidGenPlugin)
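
The example above declares a format parameter but always returns the hyphenated form. One way the parameter could be applied is sketched below. The meanings given to "simple" (no hyphens) and "upper" (uppercase, hyphenated) are assumptions, since the source does not define them, and format_uuid is a hypothetical helper.

```python
import uuid


def format_uuid(value: uuid.UUID, fmt: str = "hyphenated") -> str:
    """Render a UUID in one of the three formats named by the enum."""
    if fmt == "simple":
        return value.hex  # assumed: 32 hex digits, no hyphens
    if fmt == "upper":
        return str(value).upper()  # assumed: hyphenated, uppercase
    return str(value)  # "hyphenated" and any unknown value


print(format_uuid(uuid.uuid4(), "simple"))
```

Inside the handler this would replace str(uuid.uuid4()) with format_uuid(uuid.uuid4(), ctx.parameters.get("format", "hyphenated")).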

Credential-Authenticated API Plugin

Using the credential system to securely access external APIs:

import json
import urllib.request
from orbflow_sdk import plugin, action, Field, ActionResult, run

@plugin(name="orbflow-ai-codegen", version="1.0.0", author="Orbflow", category="ai", icon="terminal")
class AiCodegenPlugin:

    @action(
        ref="plugin:ai-codegen",
        name="AI Code Generator",
        inputs=[Field.string("prompt", required=True)],
        outputs=[Field.string("code"), Field.string("model")],
        parameters=[
            Field.string("model").default("gpt-4o-mini"),
            Field.credential("credential_id").types("openai", "anthropic"),
        ],
    )
    async def generate(self, ctx):
        api_key = ctx.config.get("api_key", "")
        if not api_key:
            return ActionResult(error="Missing API key -- attach a credential")

        body = json.dumps({
            "model": ctx.parameters.get("model", "gpt-4o-mini"),
            "messages": [{"role": "user", "content": ctx.input["prompt"]}],
        }).encode()

        base_url = ctx.config.get("base_url", "https://api.openai.com/v1").rstrip("/")
        req = urllib.request.Request(
            f"{base_url}/chat/completions",
            data=body,
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
        )
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())

        return {"code": data["choices"][0]["message"]["content"], "model": data.get("model", "")}

if __name__ == "__main__":
    run(AiCodegenPlugin)
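
The handler above calls urllib.request.urlopen, which blocks until the HTTP response arrives. If handlers share an asyncio event loop (an assumption; the source does not say how the SDK schedules them), a slow request would stall other work on that loop. A possible variation moves the blocking call to a worker thread with asyncio.to_thread. The helper names here are hypothetical.

```python
import asyncio
import json
import urllib.request


def post_json(url: str, payload: dict, headers: dict, timeout: float = 60) -> dict:
    """Blocking JSON POST using only the standard library."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json", **headers},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())


async def post_json_async(
    url: str, payload: dict, headers: dict, timeout: float = 60, _post=post_json
) -> dict:
    """Run the blocking POST in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(_post, url, payload, headers, timeout)
```

The _post argument exists only so the function can be exercised in tests without network access.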

Manifest for this plugin (orbflow-plugin.json):

{
  "name": "orbflow-ai-codegen",
  "version": "1.0.0",
  "language": "python",
  "protocol": { "Grpc": { "default_port": 50051 } },
  "node_types": ["plugin:ai-codegen"]
}

Architecture

graph TD
    subgraph Engine["Orbflow Engine"]
        Server["Server\n(HTTP / gRPC)"]
        Worker["Worker\n(task execution)"]
        Loader["Plugin Loader\n(discovery + lifecycle)"]
    end

    Loader -- "gRPC (HTTP/2)" --> YourPlugin["Your Plugin\n(orbflow-sdk Python)"]
    Loader -- "gRPC (HTTP/2)" --> OtherPlugin["Other Plugins\n(TypeScript SDK, custom)"]
    Worker --> Loader
    Server --> Worker

    style Engine fill:#1a1a2e,stroke:#7C5CFC,stroke-width:2px,color:#fff
    style YourPlugin fill:#7C5CFC,stroke:#5a3fd4,stroke-width:2px,color:#fff
    style OtherPlugin fill:#2d2d44,stroke:#555,stroke-width:1px,color:#aaa
    style Server fill:#2d2d44,stroke:#7C5CFC,color:#fff
    style Worker fill:#2d2d44,stroke:#7C5CFC,color:#fff
    style Loader fill:#2d2d44,stroke:#7C5CFC,color:#fff

How it works:

  1. Startup: Orbflow scans the plugins/ directory, finds your orbflow-plugin.json manifest
  2. Install: Creates a venv and installs dependencies if pyproject.toml exists
  3. Launch: Starts your plugin with python main.py, passing ORBFLOW_PLUGIN_PORT
  4. Health check: Polls HealthCheck RPC every 500ms until the plugin reports healthy (30s timeout)
  5. Schema discovery: Calls GetSchemas RPC to learn about your plugin's nodes (inputs, outputs, parameters, icons, categories)
  6. Registration: Registers each plugin:* ref as a node type in the engine, available in the builder UI
  7. Execution: When a workflow runs your node, the engine calls Execute RPC with input data, your handler runs, and the result flows to downstream nodes
  8. Shutdown: On SIGTERM, the engine gracefully stops all managed plugin processes
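
Step 4 amounts to a poll-with-deadline loop. The sketch below shows that pattern in Python with the documented numbers (500 ms interval, 30 s timeout) as defaults. It is not the engine's code: check stands for whatever performs the HealthCheck RPC, and the clock and sleep parameters are there so the loop can be tested without waiting.

```python
import time
from typing import Callable


def wait_until_healthy(
    check: Callable[[], bool],
    interval: float = 0.5,
    timeout: float = 30.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call check() every `interval` seconds until it returns True or time runs out."""
    deadline = clock() + timeout
    while True:
        if check():
            return True
        # Give up if the next poll would land after the deadline.
        if clock() + interval > deadline:
            return False
        sleep(interval)
```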

gRPC service (orbflow.plugin.v1.OrbflowPlugin):

RPC                                                      Description
Execute(ExecuteRequest) -> ExecuteResponse               Run an action with input data
GetSchemas(GetSchemasRequest) -> GetSchemasResponse      Return all node schemas
HealthCheck(HealthCheckRequest) -> HealthCheckResponse   Report health and version

Security: The engine strips sensitive environment variables (DATABASE_URL, API tokens, encryption keys) before launching plugin processes. Plugins only receive ORBFLOW_PLUGIN_PORT and non-sensitive env vars.
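
The idea of stripping sensitive variables before launching a child process can be sketched as follows. The engine's real filtering rules are not documented here, so the marker list is purely an assumption for illustration, and scrub_env is a hypothetical helper.

```python
# Assumed markers; the engine's actual deny-list is not documented in this README.
SENSITIVE_MARKERS = ("DATABASE_URL", "TOKEN", "SECRET", "PASSWORD", "KEY")


def scrub_env(env: dict[str, str], port: int) -> dict[str, str]:
    """Copy env without sensitive-looking variables and add the plugin port."""
    clean = {
        name: value
        for name, value in env.items()
        if not any(marker in name.upper() for marker in SENSITIVE_MARKERS)
    }
    clean["ORBFLOW_PLUGIN_PORT"] = str(port)
    return clean
```

A launcher would then pass the result as the env argument of subprocess.Popen, for example scrub_env(dict(os.environ), port). For plugin authors, the practical consequence is that secrets should arrive through credentials in ctx.config rather than through environment variables.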

Development

Setup

git clone https://github.com/orbflow-dev/orbflow-python.git
cd orbflow-python
pip install -e ".[dev]"

Commands

pytest                    # Run tests
pytest --cov=orbflow_sdk  # Run tests with coverage
pytest -x                 # Stop on first failure
pytest -k test_fields     # Run only tests whose names match "test_fields"

Project Structure

src/orbflow_sdk/
├── __init__.py            # Public API exports
├── decorators.py          # @plugin, @action, @trigger decorators
├── fields.py              # Field builder (Field.string, Field.number, etc.)
├── types.py               # Core type definitions (FieldDef, ExecuteContext, etc.)
├── server.py              # gRPC server (grpcio)
├── _proto_codec.py        # Hand-rolled protobuf encoder/decoder
├── schema.py              # Node schema building for GetSchemas RPC
├── convert.py             # Proto <-> SDK type conversion helpers
├── subprocess_runner.py   # Legacy subprocess mode runner
└── gen/                   # Generated protobuf stubs

tests/                     # pytest tests (8 test files)
docs/
├── getting-started.md     # Getting started guide
├── cookbook.md            # Common patterns and recipes
└── reference.md           # Full API reference

Testing

import pytest  # the asyncio marker below requires the pytest-asyncio plugin
from orbflow_sdk.types import ExecuteContext
from main import MyPlugin

@pytest.mark.asyncio
async def test_greet():
    plugin = MyPlugin()
    ctx = ExecuteContext(
        instance_id="test", node_id="n1", plugin_ref="plugin:greet", attempt=0,
        input={"name": "World"},
    )
    result = await plugin.greet(ctx)
    assert result["greeting"] == "Hello, World!"

License

Apache-2.0
