Python SDK and CLI for authoring, packaging, and publishing Pegasus workflows.
Pegasus Workflows SDK
pegasus-workflows-sdk is the Python SDK and CLI for authoring, packaging, and
publishing Pegasus workflows — Temporal workflows that automate
cross-domain operations (move lifecycle, billing follow-ups, dispatch
decisions) against the Pegasus public API.
Phase 1 ships the developer flow: write a workflow locally, run it against a Dockerized Temporal, package it, and upload it. There is no server-side execution yet — the API stores the artifact and lists it.
Install
pip install pegasus-workflows-sdk
This installs the pegasus-workflows CLI. Python 3.11+ is required. Pin the
version in your project's requirements for reproducible builds, e.g.
pegasus-workflows-sdk==0.1.0.
Interim / unreleased install (git)
The repository is public, so you can install straight from a tagged commit without waiting for a PyPI release — useful for an unreleased fix, or before the first PyPI publish lands:
pip install "pegasus-workflows-sdk @ git+https://github.com/DolasDev/pegasus@sdk-python-v0.1.0#subdirectory=packages/workflows-sdk-python"
Swap the @sdk-python-v0.1.0 tag for @main to track the latest unreleased
SDK. This clones the whole monorepo to build one subdirectory, so prefer the
PyPI install for everyday use.
Quick start
pegasus-workflows init demo
cd demo
pegasus-workflows test demo
pegasus-workflows diagram # generate workflow.mmd (needs [diagram] extra + ANTHROPIC_API_KEY)
pegasus-workflows package
pegasus-workflows push --token=vnd_... --base-url=http://localhost:3000
A workflow diagram (<source_dir>/workflow.mmd) is required to publish. init ships a starter one; pegasus-workflows diagram regenerates it from your code via the Anthropic API. Business users view it in the Pegasus tenant UI to confirm the workflow matches their business rules. See Visualizing workflows.
Authoring
Import the Temporal authoring primitives from pegasus_workflows and mark your
workflow class with @pegasus_workflow:
from datetime import timedelta

from pegasus_workflows import activity, pegasus_workflow, workflow


@activity.defn
async def greet(name: str) -> str:
    return f"Hello, {name}!"


@pegasus_workflow(name="demo", version="0.1.0")
class HelloWorkflow:
    @workflow.run
    async def run(self, name: str = "world") -> str:
        return await workflow.execute_activity(
            greet, name, start_to_close_timeout=timedelta(seconds=10)
        )
@pegasus_workflow wraps temporalio.workflow.defn and records the
(name, version) used by the manifest.
Input contract: how run() receives its argument
Your run() method receives a single positional argument whose shape depends on how the workflow
was started:
1. Trigger-fired (domain-event trigger) — the dispatcher passes the full event envelope:
{
    "domainEventId": "<uuid>",
    "eventType": "quote.accepted",  # the event type that fired the trigger
    "occurredAt": "<ISO-8601>",
    "payload": {"quoteId": "<id>", "moveId": "<id>"}  # entity ids, camelCase
}
Read entity ids from arg["payload"]["quoteId"] etc. The payload is a pointer, not a full snapshot
— always re-fetch authoritative state from the Pegasus API using those ids rather than relying on the
payload alone.
2. Manual run — POST /api/v1/workflows/:id/run passes:
{"executionId": "<uuid>", "input": <user-supplied dict>}
Read your business data from arg["input"] (e.g. arg["input"]["quote_id"]).
3. CLI test — pegasus-workflows test <name> passes a raw string for local-dev parity.
Your run() should handle all three shapes. A module-level helper (not a method) is the recommended
pattern — it stays unit-testable without a Temporal worker context:
def _resolve_quote_id(payload: dict | str) -> str:
    if isinstance(payload, str):
        return payload
    event_payload = payload.get("payload") if isinstance(payload, dict) else None
    if isinstance(event_payload, dict) and event_payload.get("quoteId"):
        return str(event_payload["quoteId"])
    inner = payload.get("input") if isinstance(payload, dict) else None
    if isinstance(inner, dict) and inner.get("quote_id"):
        return str(inner["quote_id"])
    return "quote-unknown"
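To make the three shapes concrete, here is a minimal sketch that labels an incoming argument by the keys described above. The function name classify_run_input and the returned labels are hypothetical, chosen only for illustration; they are not part of the SDK.

```python
from typing import Any


def classify_run_input(arg: Any) -> str:
    """Label `arg` as one of the three documented run() input shapes.

    Hypothetical helper for illustration; not part of pegasus_workflows.
    """
    if isinstance(arg, str):
        # CLI test: `pegasus-workflows test <name>` passes a raw string.
        return "cli-test"
    if isinstance(arg, dict):
        # Trigger-fired: full event envelope with an inner payload of entity ids.
        if "eventType" in arg and isinstance(arg.get("payload"), dict):
            return "trigger"
        # Manual run: the run endpoint wraps user data under "input".
        if "executionId" in arg and "input" in arg:
            return "manual"
    return "unknown"
```

Because it is a plain module-level function, it can be exercised in a unit test without a Temporal worker, in the same way as the resolver above.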
Sending an SMS
Inside a workflow activity, call client.send_sms to send an outbound text message
via the tenant's configured SMS provider. The platform holds the provider credentials —
no credential needs to appear in the workflow source or manifest.
import os

from pegasus_workflows import activity
from pegasus_workflows.api import PegasusClient


@activity.defn
async def send_alert_sms(to: str, message: str) -> dict:
    client = PegasusClient(
        base_url=os.environ["PEGASUS_BASE_URL"],
        token=os.environ["PEGASUS_WORKFLOW_TOKEN"],
    )
    return client.send_sms(to=to, body=message)
Declare the capability in pegasus-workflows.toml:
[[workflow]]
name = "order-saved-notify"
version = "0.1.0"
entry_points = ["order_saved.workflow:OrderSavedWorkflow"]
required_actions = ["SendSms"]
send_sms raises PegasusApiError (403) if SendSms is absent from required_actions,
or (404) if the tenant has no SMS provider connected. The to number must be E.164
(e.g. "+16308868537").
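Since the to number must be E.164, it can help to reject malformed numbers before calling the API. The sketch below is a local shape check only, under the usual reading of E.164 (a leading +, a non-zero first digit, at most 15 digits in total). The helper name is_e164 is hypothetical, and the check says nothing about whether the number is actually reachable.

```python
import re

# "+" followed by 2 to 15 digits, the first of which is non-zero.
_E164_RE = re.compile(r"\+[1-9][0-9]{1,14}")


def is_e164(number: str) -> bool:
    """Return True if `number` has the E.164 shape (hypothetical helper)."""
    return _E164_RE.fullmatch(number) is not None
```

An activity could call is_e164(to) first and raise a ValueError with a clear message, rather than relying on the API to reject the request.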
Secrets & configuration
A workflow reads two kinds of per-tenant key/value data at runtime — secrets (write-once, encrypted at rest; e.g. a third-party API key) and config (plain, editable; e.g. a region or base URL). Both are scoped to the whole tenant, so every workflow the tenant owns reads the same namespace. Values live in the platform — they never appear in the workflow source or artifact.
1. Publish the values once with a vnd_ key holding the manage actions (the
workflow_developer or tenant_admin role), via the CLI:
pegasus-workflows secrets set STRIPE_API_KEY "sk_live_…" --token=vnd_… --base-url=…
pegasus-workflows config set DEFAULT_REGION us-east-1 --token=vnd_… --base-url=…
pegasus-workflows secrets list --token=vnd_… # metadata only — never values
or from Python (client.set_secret(...), client.set_config(...),
client.list_secrets(), client.delete_secret(...)). Secrets are write-once —
delete then set again to rotate; set_config is an idempotent upsert.
2. Declare the read actions your workflow needs in pegasus-workflows.toml:
[[workflow]]
name = "charge-on-quote-accepted"
version = "0.1.0"
entry_points = ["charge.workflow:ChargeWorkflow"]
required_actions = ["ReadWorkflowSecret", "ReadWorkflowConfig"]
3. Read the values inside an activity (never in workflow code):
import os

from pegasus_workflows import activity
from pegasus_workflows.api import PegasusClient


@activity.defn
async def charge_customer(amount_cents: int) -> str:
    client = PegasusClient(
        base_url=os.environ["PEGASUS_BASE_URL"],
        token=os.environ["PEGASUS_WORKFLOW_TOKEN"],
    )
    api_key = client.get_secret("STRIPE_API_KEY")  # needs ReadWorkflowSecret
    region = client.get_config("DEFAULT_REGION")  # needs ReadWorkflowConfig
    ...
get_secret / get_config raise PegasusApiError (404) if the key is unset and
(403) if the matching read action is absent from required_actions.
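The difference between write-once secrets and upserted config can be illustrated with a toy in-memory model. This is only a sketch of the semantics described above, not the Pegasus API: the class InMemoryTenantStore, its error type, and rotate_secret are hypothetical, and the real service reports failures through PegasusApiError rather than ValueError.

```python
class InMemoryTenantStore:
    """Toy model of per-tenant secrets (write-once) and config (upsert)."""

    def __init__(self) -> None:
        self._secrets: dict[str, str] = {}
        self._config: dict[str, str] = {}

    def set_secret(self, key: str, value: str) -> None:
        # Write-once: an existing secret cannot be overwritten in place.
        if key in self._secrets:
            raise ValueError(f"secret {key!r} already exists; delete it first")
        self._secrets[key] = value

    def delete_secret(self, key: str) -> None:
        self._secrets.pop(key, None)

    def rotate_secret(self, key: str, new_value: str) -> None:
        # Rotation is "delete then set again", as described above.
        self.delete_secret(key)
        self.set_secret(key, new_value)

    def get_secret(self, key: str) -> str:
        return self._secrets[key]

    def set_config(self, key: str, value: str) -> None:
        # Idempotent upsert: setting the same key again just replaces the value.
        self._config[key] = value

    def get_config(self, key: str) -> str:
        return self._config[key]
```

In this model, calling set_config twice with the same key succeeds both times, while a second set_secret fails until the key has been deleted.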
Visualizing workflows
A workflow is published as opaque Python, so the Pegasus tenant UI can't infer
what it does. Instead, each workflow ships a Mermaid diagram at
<source_dir>/workflow.mmd that business users view to confirm the workflow
matches their business rules. The UI pairs it with a verified envelope drawn
from data the platform actually stores and trusts — the workflow's triggers, its
declared required_actions, and the secret/config keys it touches — so the
diagram (author-declared) sits next to the permission boundary (platform-guaranteed).
A diagram is required to publish. Generate or refresh it from your code:
pip install 'pegasus-workflows-sdk[diagram]' # one-time: the Anthropic SDK
export ANTHROPIC_API_KEY=sk-ant-...
pegasus-workflows diagram # writes <source_dir>/workflow.mmd
pegasus-workflows diagram --model claude-sonnet-4-6 --force # cheaper model, regenerate
The command sends your workflow's Python source to the Anthropic API and writes a
flowchart TD. The file is the source of truth — edit it freely afterward;
diagram only regenerates with --force. It is packaged into the bundle, so it
is SHA-pinned to the exact published version.
Inspecting executions
Read execution status, results, and the Temporal event-history timeline from the terminal (the same tenant-scoped data the web UI shows):
pegasus-workflows executions list <workflow-id> --token=vnd_…
pegasus-workflows executions show <workflow-id> <execution-id> --token=vnd_…
show prints the run's input/result/error plus a flattened timeline
(WorkflowExecutionStarted → per-activity events → the terminal event). Cancel
and retry are available in the tenant web UI. The same data is available
programmatically via client.list_executions, client.get_execution, and
client.get_execution_history.
⚠️ Keep PII out of workflow inputs and results. Temporal stores execution payloads (input, result, and the full event history) and renders them in its UI, and platform engineers can read them cross-tenant in the Temporal Cloud console. Pass entity ids, not raw personal data — look the details up inside an activity via the API. (A payload codec would let us encrypt payloads; it's deferred until this convention can't hold.)
The manifest — pegasus-workflows.toml
Every project has a pegasus-workflows.toml at its root. Each [[workflow]]
table is packaged into its own artifact and uploaded as a distinct
(name, version) row:
[[workflow]]
name = "demo" # ^[a-z0-9][a-z0-9_-]{0,63}$
version = "0.1.0" # semver
entry_points = ["demo.workflow:HelloWorkflow"] # non-empty
source_dir = "demo" # optional, defaults to name
description = "..." # optional
These rules mirror the server's ManifestSchema exactly, so package/push
fail fast locally before any HTTP call.
CLI
| Command | What it does |
|---|---|
| pegasus-workflows init <name> | Scaffold a new workflow project. |
| pegasus-workflows diagram [-C <dir>] [--model …] [--force] | AI-generate workflow.mmd from source ([diagram] extra). |
| pegasus-workflows package | Zip each declared workflow into dist/<name>-<version>.zip. |
| pegasus-workflows push --token=<vnd_…> [--base-url=…] | Package, then upload-url → S3 PUT → finalize. |
| pegasus-workflows test <workflow> | Start local Temporal and run the workflow with a stub input. |
| pegasus-workflows executions list <wf-id> --token=<vnd_…> | List recent executions of a workflow (newest first). |
| pegasus-workflows executions show <wf-id> <exec-id> --token=<vnd_…> | Show one execution's input/result/error + history timeline. |
| pegasus-workflows integration-config validate <id> [-C <dir>] | Dry-run the publish gate for a config (no write). |
| pegasus-workflows integration-config publish <id> [-C <dir>] | Gate then publish a new config version. |
| pegasus-workflows integration-config pull <id> [-C <dir>] [--stdout] | Fetch the active config; write the editable surface to disk. |
| pegasus-workflows integration-config versions <id> | List the config version history (newest first). |
| pegasus-workflows integration-config rollback <id> <version> | Re-publish a prior version (re-runs the gate). |
| pegasus-workflows secrets set <key> <value> [-d <desc>] | Publish a secret (write-once, encrypted at rest). |
| pegasus-workflows secrets list / secrets delete <key> | List secret keys (no values) / delete a secret. |
| pegasus-workflows config set <key> <value> [-d <desc>] | Publish a config value (idempotent upsert). |
| pegasus-workflows config list / config delete <key> | List config key/values / delete a config entry. |
push reads the token from --token or the PEGASUS_WORKFLOW_TOKEN
environment variable. The token is a vnd_* Pegasus API key whose service
account holds the workflow_developer role.
Authoring an integration-validator config
The integration-config group manages an integration's declarative mapping +
rules (the DB-backed authoring surface; see
apps/api/src/handlers/integration-validation/config.ts). The editable surface
lives as three JSON files in a working directory (-C, default .):
mapping.json, rules.json, corpus.json. The round-trip is pull → edit →
validate → publish:
pegasus-workflows integration-config pull weichert -C ./weichert
# …edit mapping.json / rules.json…
pegasus-workflows integration-config validate weichert -C ./weichert
pegasus-workflows integration-config publish weichert -C ./weichert
publish and rollback require the token to carry the
PublishIntegrationConfig action, and writing a GLOBAL config additionally
requires the token's tenant to be the platform tenant (visibility is derived
server-side). Both commands are gated by the server's
INTEGRATION_CONFIG_PUBLISH_ENABLED switch. validate and pull are
read-level and never gated.
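Before running validate, it can be useful to confirm locally that the three files exist and parse as JSON. The following is a minimal sketch under that assumption; load_editable_surface is a hypothetical helper, not something the CLI exposes, and it checks only JSON syntax, not the mapping or rules semantics the server gate enforces.

```python
import json
from pathlib import Path

SURFACE_FILES = ("mapping.json", "rules.json", "corpus.json")


def load_editable_surface(workdir: str = ".") -> dict[str, object]:
    """Load mapping.json, rules.json and corpus.json from `workdir`."""
    root = Path(workdir)
    missing = [name for name in SURFACE_FILES if not (root / name).is_file()]
    if missing:
        raise FileNotFoundError(f"missing in {root}: {', '.join(missing)}")
    # Keys are the file stems: "mapping", "rules", "corpus".
    return {
        Path(name).stem: json.loads((root / name).read_text(encoding="utf-8"))
        for name in SURFACE_FILES
    }
```

A malformed file surfaces as json.JSONDecodeError, which points at the offending line and column before any HTTP call is made.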
Local Temporal
pegasus-workflows test needs a Temporal server. The repo root ships
docker-compose.temporal.yml (Temporal server + Temporal UI on 7233 / 8080)
purely as a local-dev aid — no production connection. test runs
docker compose -f docker-compose.temporal.yml up -d automatically if Temporal
is not already reachable on 127.0.0.1:7233. To start it by hand:
docker compose -f docker-compose.temporal.yml up -d
The Temporal Web UI is then at http://localhost:8080.
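The reachability check can be approximated with a plain TCP connection attempt. This sketch is not the CLI's implementation: temporal_reachable is a hypothetical name, and a successful connect only shows that something is listening on the port, not that it is a healthy Temporal server.

```python
import socket


def temporal_reachable(host: str = "127.0.0.1", port: int = 7233, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    print("Temporal reachable:", temporal_reachable())
```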
Using the SDK with an AI coding agent
The SDK ships a built-in MCP server that gives any MCP-compatible AI coding agent (Claude Code, Cursor, Windsurf, …) structured access to SDK rules and safe tooling — without the agent having to read or guess from source files.
Install the extra
pip install 'pegasus-workflows-sdk[mcp]'
Configure your agent
Claude Code (~/.claude/settings.json or project .claude/settings.json)
{
  "mcpServers": {
    "pegasus-workflows": {
      "command": "pegasus-workflows",
      "args": ["mcp"]
    }
  }
}
Cursor / Windsurf (.cursor/mcp.json or .windsurf/mcp.json)
{
  "mcpServers": {
    "pegasus-workflows": {
      "command": "pegasus-workflows",
      "args": ["mcp"]
    }
  }
}
Once configured, the agent can call the resources and tools below without any additional setup.
Resources (read-only context)
| URI | Description |
|---|---|
| pegasus://guide/authoring | Authoring guide: import surface, determinism rule, input contract pointer. |
| pegasus://guide/input-contract | The three run() input shapes (trigger-fired, manual run, CLI test) + a worked resolver example. |
| pegasus://guide/secrets-config | How to publish and use per-tenant workflow secrets & configuration (manifest actions, CLI/SDK publish, runtime read). |
| pegasus://reference/manifest | Manifest fields and constraints generated from manifest.py constants; stays in sync automatically. |
| pegasus://reference/api | PegasusClient method signatures and docstrings generated by introspection; stays in sync automatically. |
Tools (safe actions — no network writes)
| Tool | Description |
|---|---|
| scaffold_workflow(name, dest) | Scaffold a new workflow project at dest/name. Wraps pegasus-workflows init. |
| validate_manifest(path_or_toml) | Validate a manifest file path or raw TOML text. Returns structured errors or the parsed manifest. |
| package_project(project_dir) | Package declared workflows into dist/. Returns {name, version, zip_path, size_bytes} per workflow. |
| validate_integration_config(integration_id, mapping, rules, corpus, base_url, token) | Dry-run the integration config publish gate. No state change. |
Network-mutating operations (push, publish_integration_config, run) are
intentionally not exposed — keep those human-gated via the CLI.
Smoke test (verify the server starts)
pegasus-workflows mcp --help # should print the mcp command help
Without the extra installed, the command exits non-zero with an install hint.
Release
The SDK is published to PyPI by .github/workflows/release-sdk-python.yml on
sdk-python-v* tags via PyPI trusted publishing (OIDC — no API token).
To cut a release:
- Bump version in pyproject.toml and commit it on main.
- Tag the release commit and push the tag, e.g.
  git tag sdk-python-v0.1.0 && git push origin sdk-python-v0.1.0.
The workflow then lints, audits, tests, builds, and uploads the sdist + wheel.
One-time setup (before the first release): a PyPI project owner must add a
pending publisher at pegasus-workflows-sdk → Publishing → owner DolasDev,
repo pegasus, workflow release-sdk-python.yml, environment pypi. Until
that exists the publish job fails at the upload step, and tenants must use the
git install above.