Python SDK for building Orbflow workflow plugins
orbflow-sdk
Python SDK for building Orbflow workflow plugins.
Build custom action and trigger nodes that appear in the Orbflow visual workflow builder and execute within the distributed engine. No protobuf code generation required.
Table of Contents
- Installation
- Quick Start
- Deploying to Orbflow
- API Reference
- Handler Context
- Return Values
- Plugin Manifest
- Examples
- Architecture
- Development
- License
Installation
pip install orbflow-sdk
Requirements: Python >= 3.10
Dependencies (installed automatically):
- grpcio >= 1.60.0
- protobuf >= 4.25.0
Quick Start
Create a file called main.py:
from orbflow_sdk import plugin, action, Field, run

@plugin(name="my-plugin", version="1.0.0", author="You", category="utility")
class MyPlugin:
    @action(
        inputs=[Field.string("name", required=True)],
        outputs=[Field.string("greeting")],
    )
    async def greet(self, ctx):
        return {"greeting": f"Hello, {ctx.input['name']}!"}

if __name__ == "__main__":
    run(MyPlugin)
Run it locally:
python main.py
# [orbflow-sdk] Plugin 'my-plugin' v1.0.0 serving on 0.0.0.0:50051 (1 actions)
Your plugin is now running a gRPC server. The Orbflow engine will discover it, fetch its node schemas, and make the "Greet" action available in the workflow builder.
The ref and display name auto-derive from the method name (greet -> plugin:greet, "Greet"). Override with explicit ref= and name= if needed.
Deploying to Orbflow
There are two ways to connect your plugin to a running Orbflow instance.
Option 1: Managed Plugin (Recommended)
Place your plugin in the Orbflow plugins directory and let the engine manage its lifecycle.
1. Create the plugin directory:
plugins/
└── my-plugin/
    ├── main.py               # Entry point
    ├── pyproject.toml        # Dependencies
    └── orbflow-plugin.json   # Plugin manifest
2. Create the manifest (orbflow-plugin.json):
{
  "name": "my-plugin",
  "version": "1.0.0",
  "description": "My custom plugin",
  "author": "Your Name",
  "language": "python",
  "protocol": {
    "Grpc": { "default_port": 50051 }
  },
  "node_types": ["plugin:greet"]
}
3. Start Orbflow. The engine will:
- Detect your plugin in the plugins/ directory
- Create a virtual environment and install dependencies automatically
- Launch your plugin with python main.py
- Assign a port via the ORBFLOW_PLUGIN_PORT environment variable
- Poll the health check endpoint until the plugin is ready
- Call GetSchemas to register your nodes in the workflow builder
Reading the assigned port:
import os

if __name__ == "__main__":
    port = int(os.environ.get("ORBFLOW_PLUGIN_PORT", "50051"))
    run(MyPlugin, port=port)
Option 2: Standalone Plugin
Run your plugin independently and tell Orbflow where to find it.
1. Start your plugin:
python main.py
# Listening on 0.0.0.0:50051
2. Register in Orbflow config (configs/orbflow.yaml):
plugins:
  dir: "./plugins"
  grpc:
    - name: "my-plugin"
      address: "http://localhost:50051"
      timeout_secs: 30
3. Restart Orbflow. The engine connects to your plugin's gRPC endpoint, calls GetSchemas, and registers the nodes.
This approach is useful for:
- Plugins running on a different machine or in a separate container
- Development -- run the plugin with hot reload while Orbflow runs separately
- Plugins that need to manage their own lifecycle
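Before restarting Orbflow, it can help to confirm that the standalone plugin's port is actually reachable from the machine running the engine. The sketch below only checks TCP connectivity, not the gRPC protocol itself; `is_reachable` is a hypothetical helper, not part of orbflow-sdk.

```python
import socket
from urllib.parse import urlparse


def is_reachable(address: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the address's host:port succeeds.

    `address` uses the same form as the config above, e.g. "http://localhost:50051".
    """
    parsed = urlparse(address)
    host, port = parsed.hostname, parsed.port
    if host is None or port is None:
        raise ValueError(f"address must include a scheme, host and port: {address!r}")
    try:
        # create_connection resolves the host and attempts a TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, DNS failure, etc.
        return False
```

For example, `is_reachable("http://localhost:50051")` should return True while `python main.py` is running.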
Verifying Deployment
Once deployed, your plugin's nodes appear in the node picker of the Orbflow workflow builder. To verify:
- Open the workflow builder UI at http://localhost:3000
- Click the "+" button to open the node picker
- Search for your plugin's action name (e.g., "Greet")
- Confirm that the node appears with the icon and category you defined
Or via the API:
curl http://localhost:8080/api/node-types | jq '.data[] | select(.ref | startswith("plugin:"))'
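The same check can be done from Python. This is a minimal sketch that assumes the response shape implied by the jq filter above (a top-level `data` array whose items carry a `ref` string); the function names are illustrative.

```python
import json
import urllib.request


def plugin_node_types(payload: dict) -> list[dict]:
    """Keep only node types whose ref starts with "plugin:"."""
    return [
        node
        for node in payload.get("data", [])
        if str(node.get("ref", "")).startswith("plugin:")
    ]


def fetch_plugin_node_types(base_url: str = "http://localhost:8080") -> list[dict]:
    """Fetch /api/node-types and filter it; needs a running Orbflow instance."""
    with urllib.request.urlopen(f"{base_url}/api/node-types", timeout=10) as resp:
        return plugin_node_types(json.load(resp))
```

Keeping the filter separate from the HTTP call makes it easy to test against a saved response.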
API Reference
@plugin — Plugin Decorator
Marks a class as an Orbflow plugin.
@plugin(
    name="orbflow-weather",              # Unique package name
    version="1.0.0",                     # SemVer version
    author="Your Name",                  # Shown in marketplace
    category="utility",                  # Category tab in marketplace
    icon="cloud",                        # Lucide icon name
    description="Get weather forecasts",
)
class WeatherPlugin:
    ...
@action — Action Node
Turns a method into a workflow action node.
@action(
    inputs=[
        Field.string("city", required=True).label("City Name"),
    ],
    outputs=[
        Field.number("temperature").label("Temperature (F)"),
        Field.string("condition").label("Condition"),
    ],
    parameters=[
        Field.string("units").default("fahrenheit").enum("fahrenheit", "celsius"),
    ],
)
async def weather_forecast(self, ctx):
    city = ctx.input["city"]
    units = ctx.parameters.get("units", "fahrenheit")
    return {"temperature": 72, "condition": "sunny"}
Naming rules:
- Action refs auto-derive from the method name: weather_forecast becomes plugin:weather-forecast
- Display names auto-capitalize: weather_forecast becomes "Weather Forecast"
- Override with explicit ref= and name= parameters
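The naming rules can be sketched in plain Python. This illustrates the documented behavior only and is not the SDK's actual implementation; `derive_ref` and `derive_display_name` are hypothetical helper names.

```python
def derive_ref(method_name: str) -> str:
    """Hypothetical helper: snake_case method name -> "plugin:kebab-case" ref."""
    return "plugin:" + method_name.replace("_", "-")


def derive_display_name(method_name: str) -> str:
    """Hypothetical helper: snake_case method name -> capitalized display name."""
    return " ".join(word.capitalize() for word in method_name.split("_") if word)


print(derive_ref("weather_forecast"))           # plugin:weather-forecast
print(derive_display_name("weather_forecast"))  # Weather Forecast
```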
Inputs are data flowing from upstream nodes. Parameters are static configuration set in the builder. Outputs define data passed to downstream nodes.
@action / @trigger Parameters
| Parameter | Type | Required | Default |
|---|---|---|---|
| ref | str | No | Auto: plugin:<method-name> |
| name | str | No | Auto: capitalized method name |
| description | str | No | "" |
| inputs | list[Field] | No | () |
| outputs | list[Field] | No | () |
| parameters | list[Field] | No | () |
@trigger — Trigger Node
For nodes that start a workflow:
from orbflow_sdk import trigger

@trigger(
    outputs=[Field.string("filename"), Field.number("size")],
)
async def on_file_upload(self, ctx):
    return {"filename": "report.pdf", "size": 1024}
Field — Field Builder
Fluent field definitions with chainable methods.
Field types:
| Factory | Type | UI Control |
|---|---|---|
| Field.string(key) | "string" | Text input |
| Field.number(key) | "number" | Number input |
| Field.boolean(key) | "boolean" | Toggle switch |
| Field.object(key) | "object" | JSON editor |
| Field.array(key) | "array" | Array editor |
| Field.credential(key) | "credential" | Credential picker |
Chainable methods (all return self):
(
    Field.string("name", required=True)
    .label("Full Name")          # Display label (defaults to key)
    .description("Enter name")   # Help text shown on hover
    .default("World")            # Default value
    .enum("a", "b", "c")         # Dropdown options
)
Credential fields accept type filtering to restrict which stored credentials are selectable:
Field.credential("api_key").types("openai", "anthropic")
run() — gRPC Server
Starts the gRPC server. Blocks until SIGTERM/SIGINT.
run(MyPlugin)
# With options:
run(MyPlugin, host="0.0.0.0", port=50051, max_workers=10)
| Parameter | Type | Default | Description |
|---|---|---|---|
| plugin_class | type | Required | The @plugin-decorated class |
| host | str | "0.0.0.0" | Bind address |
| port | int | 50051 | gRPC port |
| max_workers | int | 10 | Max concurrent RPC handlers |
Server details:
- Uses grpcio for gRPC transport, wire-compatible with Orbflow's Rust tonic client
- Graceful shutdown on SIGTERM/SIGINT
- Error messages are sanitized: full errors are logged to the console, and a generic "internal error" is sent to the caller
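The error-sanitization behavior follows a common pattern: log the full exception locally and return only a generic message across the wire. The sketch below shows that pattern in isolation; it is not the SDK's code, and the logger name and `sanitize_error` helper are assumptions.

```python
import logging

# Hypothetical logger name, for illustration only
logger = logging.getLogger("orbflow-plugin-sketch")


def sanitize_error(exc: Exception) -> str:
    """Log the full exception locally and return a generic message for the caller."""
    # exc_info accepts an exception instance and records its traceback
    logger.error("handler failed: %s", exc, exc_info=exc)
    return "internal error"
```

The caller never sees details such as connection strings or file paths that may appear in the original exception text.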
When managed by Orbflow, read the port from the environment:
import os
run(MyPlugin, port=int(os.environ.get("ORBFLOW_PLUGIN_PORT", "50051")))
Handler Context (ExecuteContext)
Every action/trigger handler receives an ExecuteContext:
| Property | Type | Description |
|---|---|---|
| ctx.input | dict[str, Any] | Data from upstream nodes (mapped via CEL expressions or direct wiring) |
| ctx.parameters | dict[str, Any] | Static config values set in the workflow builder UI |
| ctx.config | dict[str, Any] | Resolved credentials and node configuration |
| ctx.capabilities | dict[str, Any] | Output from capability nodes (e.g., database connections) |
| ctx.instance_id | str | Workflow execution instance ID |
| ctx.node_id | str | This node's ID in the workflow DAG |
| ctx.plugin_ref | str | Action ref (e.g., "plugin:weather-forecast") |
| ctx.attempt | int | Retry attempt number (0-based) |
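As an illustration of how these properties might be combined, the sketch below computes a retry-aware delay from the 0-based `ctx.attempt` and builds a trace string from the IDs. It uses a `SimpleNamespace` stand-in rather than the real ExecuteContext, and the `max_delay` parameter is hypothetical.

```python
import asyncio
from types import SimpleNamespace


async def fetch_with_backoff(ctx):
    """Handler-style coroutine that reads several context properties."""
    # ctx.attempt is 0-based: no delay on the first try, then 1 s, 2 s, 4 s, ...
    delay = 0 if ctx.attempt == 0 else 2 ** (ctx.attempt - 1)
    # "max_delay" is a hypothetical parameter used only in this sketch
    delay = min(delay, ctx.parameters.get("max_delay", 30))
    await asyncio.sleep(0)  # placeholder; a real handler might sleep for `delay`
    return {
        "url": ctx.input["url"],
        "delay": delay,
        "trace": f"{ctx.instance_id}/{ctx.node_id}#{ctx.attempt}",
    }


# Stand-in for ExecuteContext, for illustration only
ctx = SimpleNamespace(
    input={"url": "https://example.com"},
    parameters={"max_delay": 10},
    instance_id="inst-1",
    node_id="n1",
    attempt=3,
)
result = asyncio.run(fetch_with_backoff(ctx))
print(result)  # {'url': 'https://example.com', 'delay': 4, 'trace': 'inst-1/n1#3'}
```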
Return Values
Handlers can return data in three ways:
# 1. Dict (most common) -- becomes the node's output data
async def my_action(self, ctx):
    return {"result": "hello", "count": 42}

# 2. ActionResult -- explicit error (for controlled error reporting)
from orbflow_sdk import ActionResult

async def my_action(self, ctx):
    return ActionResult(error="Something went wrong")

# 3. None -- no output data (for side-effect-only nodes like "send notification")
async def my_action(self, ctx):
    await send_notification(ctx.input["message"])
Error handling:
# Raising an exception marks the node as failed:
async def my_action(self, ctx):
    raise RuntimeError("API rate limit exceeded")

# Or return an explicit error with partial data:
async def my_action(self, ctx):
    return ActionResult(error="Warning: some items failed")
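One way to picture the three return styles is as a normalization step that maps each to an (output, error) pair. The sketch below is an assumption about how such a step could look, not the SDK's internals; `ActionResultSketch` is a stand-in that models only the `error` field shown in the examples above.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ActionResultSketch:
    """Stand-in for the SDK's ActionResult; models only the `error` field."""
    error: str = ""


def normalize(result: Any) -> tuple[dict, str]:
    """Map a handler's return value to (output_data, error_message)."""
    if result is None:
        return {}, ""             # side-effect-only node: no output data
    if isinstance(result, ActionResultSketch):
        return {}, result.error   # explicit, controlled error
    if isinstance(result, dict):
        return result, ""         # ordinary output data
    raise TypeError(f"unsupported return type: {type(result).__name__}")
```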
Plugin Manifest
When deploying as a managed plugin, create an orbflow-plugin.json in your plugin directory:
{
  "name": "my-plugin",
  "version": "1.0.0",
  "description": "Short description of what this plugin does",
  "author": "Your Name",
  "license": "MIT",
  "language": "python",
  "protocol": {
    "Grpc": { "default_port": 50051 }
  },
  "node_types": [
    "plugin:action-one",
    "plugin:action-two"
  ],
  "tags": ["utility", "transform"],
  "repository": "https://github.com/your-org/your-plugin",
  "icon": "zap",
  "category": "utility",
  "color": "#7C5CFC"
}
| Field | Required | Description |
|---|---|---|
| name | Yes | Plugin name (1-64 chars, alphanumeric + hyphens + underscores) |
| version | Yes | Semver version string |
| language | Yes | "python", "typescript", "javascript", or custom |
| protocol | Yes | { "Grpc": { "default_port": 50051 } } for gRPC plugins |
| node_types | Yes | Array of plugin:* refs this plugin provides |
| description | No | Short description for the marketplace |
| author | No | Author name |
| icon | No | Lucide icon name for the node in the builder |
| category | No | Marketplace category |
| color | No | Hex color for the node border |
| tags | No | Searchable tags |
| repository | No | Source code URL |
| license | No | SPDX license identifier |
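The required-field rules in the table lend themselves to a quick pre-flight check before dropping a manifest into the plugins directory. This is a minimal sketch, not the engine's validator: `validate_manifest` is a hypothetical helper, and it assumes "alphanumeric" means ASCII letters and digits.

```python
import re

REQUIRED_FIELDS = ("name", "version", "language", "protocol", "node_types")
# Assumption: "alphanumeric" is read as ASCII letters and digits
NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,64}")


def validate_manifest(manifest: dict) -> list[str]:
    """Return a list of problems found; an empty list means the checks passed."""
    errors = [f"missing required field: {key}" for key in REQUIRED_FIELDS if key not in manifest]

    name = manifest.get("name")
    if name is not None and not (isinstance(name, str) and NAME_PATTERN.fullmatch(name)):
        errors.append("name must be 1-64 chars: letters, digits, hyphens, underscores")

    node_types = manifest.get("node_types", [])
    if not isinstance(node_types, list):
        errors.append("node_types must be an array")
    else:
        for ref in node_types:
            if not (isinstance(ref, str) and ref.startswith("plugin:")):
                errors.append(f"node type ref must start with 'plugin:': {ref!r}")
    return errors
```

A typical use is `validate_manifest(json.load(f))` on the opened orbflow-plugin.json, printing any returned messages.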
Examples
UUID Generator
A simple utility plugin with parameters:
import uuid
from orbflow_sdk import plugin, action, Field, run

@plugin(name="orbflow-uuid-gen", version="1.0.0", author="Orbflow", category="utility", icon="hash")
class UuidGenPlugin:
    @action(
        ref="plugin:uuid-gen",
        name="UUID Generator",
        outputs=[Field.string("uuid"), Field.array("uuids")],
        parameters=[
            Field.number("count").default(1).description("How many UUIDs"),
            Field.string("format").default("hyphenated").enum("hyphenated", "simple", "upper"),
        ],
    )
    async def generate(self, ctx):
        # Clamp count to the range 1..1000
        count = min(max(int(ctx.parameters.get("count", 1)), 1), 1000)
        # Note: the "format" parameter is declared above but not applied here
        uuids = [str(uuid.uuid4()) for _ in range(count)]
        return {"uuid": uuids[0], "uuids": uuids}

if __name__ == "__main__":
    run(UuidGenPlugin)
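The example declares a `format` parameter but always returns hyphenated UUIDs. A helper along the following lines could apply it. The meanings of the three options are assumptions (they are not defined in this README): "simple" is taken as 32 hex digits without hyphens and "upper" as the hyphenated form in uppercase.

```python
import uuid


def format_uuid(value: uuid.UUID, fmt: str = "hyphenated") -> str:
    """Render a UUID in one of the three assumed formats."""
    if fmt == "hyphenated":
        return str(value)          # 8-4-4-4-12 lowercase form
    if fmt == "simple":
        return value.hex           # assumed: 32 hex digits, no hyphens
    if fmt == "upper":
        return str(value).upper()  # assumed: hyphenated, uppercase
    raise ValueError(f"unknown format: {fmt!r}")
```

Inside the handler this would replace `str(uuid.uuid4())` with `format_uuid(uuid.uuid4(), ctx.parameters.get("format", "hyphenated"))`.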
Credential-Authenticated API Plugin
Using the credential system to securely access external APIs:
import json
import urllib.request
from orbflow_sdk import plugin, action, Field, ActionResult, run

@plugin(name="orbflow-ai-codegen", version="1.0.0", author="Orbflow", category="ai", icon="terminal")
class AiCodegenPlugin:
    @action(
        ref="plugin:ai-codegen",
        name="AI Code Generator",
        inputs=[Field.string("prompt", required=True)],
        outputs=[Field.string("code"), Field.string("model")],
        parameters=[
            Field.string("model").default("gpt-4o-mini"),
            Field.credential("credential_id").types("openai", "anthropic"),
        ],
    )
    async def generate(self, ctx):
        # Resolved credential values arrive in ctx.config
        api_key = ctx.config.get("api_key", "")
        if not api_key:
            return ActionResult(error="Missing API key -- attach a credential")

        # Build the chat-completions request body
        body = json.dumps({
            "model": ctx.parameters.get("model", "gpt-4o-mini"),
            "messages": [{"role": "user", "content": ctx.input["prompt"]}],
        }).encode()
        base_url = ctx.config.get("base_url", "https://api.openai.com/v1").rstrip("/")
        req = urllib.request.Request(
            f"{base_url}/chat/completions",
            data=body,
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
        )
        # Note: urlopen is a blocking call
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
        return {"code": data["choices"][0]["message"]["content"], "model": data.get("model", "")}

if __name__ == "__main__":
    run(AiCodegenPlugin)
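The handler above calls the blocking `urllib.request.urlopen` from inside an `async def`. This README does not say how the SDK schedules handlers, so whether that matters in practice is an open question; if handlers do share an event loop, one standard-library option is to push the blocking call onto a worker thread with `asyncio.to_thread`. The sketch below shows the pattern with a stand-in for the HTTP call; all names are illustrative.

```python
import asyncio
import time


def slow_lookup(prompt: str) -> str:
    """Stand-in for a blocking call such as urllib.request.urlopen."""
    time.sleep(0.05)
    return prompt.upper()


async def generate(prompt: str) -> dict:
    # asyncio.to_thread runs the blocking function in a worker thread,
    # so the event loop stays free while it waits
    code = await asyncio.to_thread(slow_lookup, prompt)
    return {"code": code}


async def main() -> list[dict]:
    # The two calls overlap instead of running back to back
    return await asyncio.gather(generate("a"), generate("b"))
```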
Manifest for this plugin (orbflow-plugin.json):
{
  "name": "orbflow-ai-codegen",
  "version": "1.0.0",
  "language": "python",
  "protocol": { "Grpc": { "default_port": 50051 } },
  "node_types": ["plugin:ai-codegen"]
}
Architecture
graph TD
subgraph Engine["Orbflow Engine"]
Server["Server\n(HTTP / gRPC)"]
Worker["Worker\n(task execution)"]
Loader["Plugin Loader\n(discovery + lifecycle)"]
end
Loader -- "gRPC (HTTP/2)" --> YourPlugin["Your Plugin\n(orbflow-sdk Python)"]
Loader -- "gRPC (HTTP/2)" --> OtherPlugin["Other Plugins\n(TypeScript SDK, custom)"]
Worker --> Loader
Server --> Worker
style Engine fill:#1a1a2e,stroke:#7C5CFC,stroke-width:2px,color:#fff
style YourPlugin fill:#7C5CFC,stroke:#5a3fd4,stroke-width:2px,color:#fff
style OtherPlugin fill:#2d2d44,stroke:#555,stroke-width:1px,color:#aaa
style Server fill:#2d2d44,stroke:#7C5CFC,color:#fff
style Worker fill:#2d2d44,stroke:#7C5CFC,color:#fff
style Loader fill:#2d2d44,stroke:#7C5CFC,color:#fff
How it works:
- Startup: Orbflow scans the plugins/ directory, finds your orbflow-plugin.json manifest
- Install: Creates a venv and installs dependencies if pyproject.toml exists
- Launch: Starts your plugin with python main.py, passing ORBFLOW_PLUGIN_PORT
- Health check: Polls HealthCheck RPC every 500ms until the plugin reports healthy (30s timeout)
- Schema discovery: Calls GetSchemas RPC to learn about your plugin's nodes (inputs, outputs, parameters, icons, categories)
- Registration: Registers each plugin:* ref as a node type in the engine, available in the builder UI
- Execution: When a workflow runs your node, the engine calls Execute RPC with input data, your handler runs, and the result flows to downstream nodes
- Shutdown: On SIGTERM, the engine gracefully stops all managed plugin processes
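The health-check step is a poll-until-ready loop. The engine itself is not written in Python, so the sketch below only illustrates the timing described above (500 ms interval, 30 s timeout) with a generic probe callable; treating a raised exception as "not ready yet" is an assumption.

```python
import time
from typing import Callable


def wait_until_healthy(
    check: Callable[[], bool],
    interval: float = 0.5,   # poll every 500 ms
    timeout: float = 30.0,   # give up after 30 s
) -> bool:
    """Poll check() until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            # Assumption: a failed probe (e.g. connection refused) means "not ready yet"
            pass
        if time.monotonic() + interval > deadline:
            return False
        time.sleep(interval)
```

Using `time.monotonic()` keeps the timeout correct even if the system clock is adjusted while waiting.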
gRPC service (orbflow.plugin.v1.OrbflowPlugin):
| RPC | Description |
|---|---|
| Execute(ExecuteRequest) -> ExecuteResponse | Run an action with input data |
| GetSchemas(GetSchemasRequest) -> GetSchemasResponse | Return all node schemas |
| HealthCheck(HealthCheckRequest) -> HealthCheckResponse | Report health and version |
Security: The engine strips sensitive environment variables (DATABASE_URL, API tokens, encryption keys) before launching plugin processes. Plugins only receive ORBFLOW_PLUGIN_PORT and non-sensitive env vars.
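To make the idea concrete, environment filtering of this kind can be sketched as below. The marker list is an assumption chosen for illustration; the engine's actual rules are not documented here, and `plugin_environment` is a hypothetical name.

```python
# Assumed markers for "sensitive" variable names; the engine's real list is not documented here
SENSITIVE_MARKERS = ("DATABASE_URL", "TOKEN", "SECRET", "KEY", "PASSWORD")


def plugin_environment(base_env: dict[str, str], port: int) -> dict[str, str]:
    """Copy base_env without sensitive-looking variables and add the assigned port."""
    env = {
        name: value
        for name, value in base_env.items()
        if not any(marker in name.upper() for marker in SENSITIVE_MARKERS)
    }
    env["ORBFLOW_PLUGIN_PORT"] = str(port)
    return env
```

In practice this means a plugin should obtain secrets through attached credentials (ctx.config) rather than expecting them in its process environment.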
Development
Setup
git clone https://github.com/orbflow-dev/orbflow-python.git
cd orbflow-python
pip install -e ".[dev]"
Commands
pytest                      # Run tests
pytest --cov=orbflow_sdk    # Run tests with coverage
pytest -x                   # Stop on first failure
pytest -k test_fields       # Run only tests whose names match "test_fields"
Project Structure
src/orbflow_sdk/
├── __init__.py # Public API exports
├── decorators.py # @plugin, @action, @trigger decorators
├── fields.py # Field builder (Field.string, Field.number, etc.)
├── types.py # Core type definitions (FieldDef, ExecuteContext, etc.)
├── server.py # gRPC server (grpcio)
├── _proto_codec.py # Hand-rolled protobuf encoder/decoder
├── schema.py # Node schema building for GetSchemas RPC
├── convert.py # Proto <-> SDK type conversion helpers
├── subprocess_runner.py # Legacy subprocess mode runner
└── gen/ # Generated protobuf stubs
tests/ # pytest tests (8 test files)
docs/
├── getting-started.md # Getting started guide
├── cookbook.md # Common patterns and recipes
└── reference.md # Full API reference
Testing
import pytest
from orbflow_sdk.types import ExecuteContext
from main import MyPlugin

# @pytest.mark.asyncio requires the pytest-asyncio plugin
@pytest.mark.asyncio
async def test_greet():
    plugin = MyPlugin()
    ctx = ExecuteContext(
        instance_id="test", node_id="n1", plugin_ref="plugin:greet", attempt=0,
        input={"name": "World"},
    )
    result = await plugin.greet(ctx)
    assert result["greeting"] == "Hello, World!"
License