
Python SDK and CLI for authoring, packaging, and publishing Pegasus workflows.


Pegasus Workflows SDK

pegasus-workflows-sdk is the Python SDK and CLI for authoring, packaging, and publishing Pegasus workflows — Temporal workflows that automate cross-domain operations (move lifecycle, billing follow-ups, dispatch decisions) against the Pegasus public API.

Phase 1 ships the developer flow: write a workflow locally, run it against a Dockerized Temporal, package it, and upload it. There is no server-side execution yet — the API stores the artifact and lists it.

Install

pip install pegasus-workflows-sdk

This installs the pegasus-workflows CLI. Python 3.11+ is required. Pin the version in your project's requirements for reproducible builds, e.g. pegasus-workflows-sdk==0.1.0.

Interim / unreleased install (git)

The repository is public, so you can install straight from a tagged commit without waiting for a PyPI release — useful for an unreleased fix, or before the first PyPI publish lands:

pip install "pegasus-workflows-sdk @ git+https://github.com/DolasDev/pegasus@sdk-python-v0.1.0#subdirectory=packages/workflows-sdk-python"

Swap the @sdk-python-v0.1.0 tag for @main to track the latest unreleased SDK. This clones the whole monorepo to build one subdirectory, so prefer the PyPI install for everyday use.

Quick start

pegasus-workflows init demo
cd demo
pegasus-workflows test demo
pegasus-workflows package
pegasus-workflows push --token=vnd_... --base-url=http://localhost:3000

Authoring

Import the Temporal authoring primitives from pegasus_workflows and mark your workflow class with @pegasus_workflow:

from datetime import timedelta
from pegasus_workflows import activity, pegasus_workflow, workflow

@activity.defn
async def greet(name: str) -> str:
    return f"Hello, {name}!"

@pegasus_workflow(name="demo", version="0.1.0")
class HelloWorkflow:
    @workflow.run
    async def run(self, name: str = "world") -> str:
        return await workflow.execute_activity(
            greet, name, start_to_close_timeout=timedelta(seconds=10)
        )

@pegasus_workflow wraps temporalio.workflow.defn and records the (name, version) used by the manifest.

Input contract: how run() receives its argument

Your run() method receives a single positional argument whose shape depends on how the workflow was started:

1. Trigger-fired (domain-event trigger) — the dispatcher passes the full event envelope:

{
    "domainEventId": "<uuid>",
    "eventType": "quote.accepted",      # the event type that fired the trigger
    "occurredAt": "<ISO-8601>",
    "payload": {"quoteId": "<id>", "moveId": "<id>"}   # entity ids, camelCase
}

Read entity ids from arg["payload"]["quoteId"] etc. The payload is a pointer, not a full snapshot — always re-fetch authoritative state from the Pegasus API using those ids rather than relying on the payload alone.

2. Manual run (POST /api/v1/workflows/:id/run) passes:

{"executionId": "<uuid>", "input": <user-supplied dict>}

Read your business data from arg["input"] (e.g. arg["input"]["quote_id"]).

3. CLI test (pegasus-workflows test <name>) passes a raw string for local-dev parity.

Your run() should handle all three shapes. A module-level helper (not a method) is the recommended pattern — it stays unit-testable without a Temporal worker context:

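def _resolve_quote_id(payload: dict | str) -> str:
    """Extract the quote id from any of the three run() input shapes."""
    # CLI test: the raw string is used as the quote id.
    if isinstance(payload, str):
        return payload
    # Trigger-fired: ids live under the envelope's "payload" key (camelCase).
    event_payload = payload.get("payload") if isinstance(payload, dict) else None
    if isinstance(event_payload, dict) and event_payload.get("quoteId"):
        return str(event_payload["quoteId"])
    # Manual run: user-supplied data lives under "input".
    inner = payload.get("input") if isinstance(payload, dict) else None
    if isinstance(inner, dict) and inner.get("quote_id"):
        return str(inner["quote_id"])
    # Nothing matched: fall back to a sentinel value.
    return "quote-unknown"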
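
To make the three shapes concrete, here is a minimal sketch that classifies an incoming argument and can be run without a Temporal worker. The helper name classify_run_input and its return labels are hypothetical (they are not part of the SDK), and the sample ids are placeholders; only the field names come from the shapes described above.

```python
from typing import Any


def classify_run_input(arg: Any) -> str:
    """Return which of the three documented input shapes `arg` looks like.

    Hypothetical helper for illustration; the labels are not SDK constants.
    """
    if isinstance(arg, str):
        return "cli-test"  # raw string from `pegasus-workflows test <name>`
    if isinstance(arg, dict):
        if "eventType" in arg and isinstance(arg.get("payload"), dict):
            return "trigger"  # full domain-event envelope
        if "executionId" in arg and "input" in arg:
            return "manual"  # manual run via the API
    return "unknown"


# Placeholder values shaped like the documented envelopes.
trigger_arg = {
    "domainEventId": "00000000-0000-0000-0000-000000000000",
    "eventType": "quote.accepted",
    "occurredAt": "2024-01-01T00:00:00Z",
    "payload": {"quoteId": "q-1", "moveId": "m-1"},
}
manual_arg = {"executionId": "e-1", "input": {"quote_id": "q-2"}}
```

A classifier like this is mostly useful in unit tests, where it documents which branch of a resolver each fixture is meant to hit.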

Sending an SMS

Inside a workflow activity, call client.send_sms to send an outbound text message via the tenant's configured SMS provider. The platform holds the provider credentials — no credential needs to appear in the workflow source or manifest.

from pegasus_workflows import activity
from pegasus_workflows.api import PegasusClient
import os

@activity.defn
async def send_alert_sms(to: str, message: str) -> dict:
    client = PegasusClient(
        base_url=os.environ["PEGASUS_BASE_URL"],
        token=os.environ["PEGASUS_WORKFLOW_TOKEN"],
    )
    return client.send_sms(to=to, body=message)

Declare the capability in pegasus-workflows.toml:

[[workflow]]
name = "order-saved-notify"
version = "0.1.0"
entry_points = ["order_saved.workflow:OrderSavedWorkflow"]
required_actions = ["SendSms"]

send_sms raises PegasusApiError (403) if SendSms is absent from required_actions, or (404) if the tenant has no SMS provider connected. The to number must be E.164 (e.g. "+16308868537").
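
Since the to number must be E.164, it can help to check the format before calling send_sms. The sketch below is a format check only, under the usual reading of E.164 (a leading "+", a non-zero first digit, at most 15 digits in total). The helper name is_e164 is hypothetical and not part of the SDK, and the server remains the authority on what it accepts.

```python
import re

# E.164: a leading "+", a non-zero first digit, and at most 15 digits in total.
_E164_RE = re.compile(r"\+[1-9]\d{1,14}")


def is_e164(number: str) -> bool:
    """Return True if `number` looks like an E.164 phone number."""
    return _E164_RE.fullmatch(number) is not None
```

For example, is_e164("+16308868537") is True, while a number with spaces or without the leading "+" is rejected.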

The manifest — pegasus-workflows.toml

Every project has a pegasus-workflows.toml at its root. Each [[workflow]] table is packaged into its own artifact and uploaded as a distinct (name, version) row:

[[workflow]]
name = "demo"                                   # ^[a-z0-9][a-z0-9_-]{0,63}$
version = "0.1.0"                               # semver
entry_points = ["demo.workflow:HelloWorkflow"]  # non-empty
source_dir = "demo"                             # optional, defaults to name
description = "..."                             # optional

These rules mirror the server's ManifestSchema exactly, so package/push fail fast locally before any HTTP call.

CLI

| Command | What it does |
| --- | --- |
| pegasus-workflows init <name> | Scaffold a new workflow project. |
| pegasus-workflows package | Zip each declared workflow into dist/<name>-<version>.zip. |
| pegasus-workflows push --token=<vnd_…> [--base-url=…] | Package, then upload-url → S3 PUT → finalize. |
| pegasus-workflows test <workflow> | Start local Temporal and run the workflow with a stub input. |
| pegasus-workflows integration-config validate <id> [-C <dir>] | Dry-run the publish gate for a config (no write). |
| pegasus-workflows integration-config publish <id> [-C <dir>] | Gate then publish a new config version. |
| pegasus-workflows integration-config pull <id> [-C <dir>] [--stdout] | Fetch the active config; write the editable surface to disk. |
| pegasus-workflows integration-config versions <id> | List the config version history (newest first). |
| pegasus-workflows integration-config rollback <id> <version> | Re-publish a prior version (re-runs the gate). |

push reads the token from --token or the PEGASUS_WORKFLOW_TOKEN environment variable. The token is a vnd_* Pegasus API key whose service account holds the workflow_developer role.

Authoring an integration-validator config

The integration-config group manages an integration's declarative mapping + rules (the DB-backed authoring surface; see apps/api/src/handlers/integration-validation/config.ts). The editable surface lives as three JSON files in a working directory (-C, default .): mapping.json, rules.json, corpus.json. The round-trip is pull → edit → validate → publish:

pegasus-workflows integration-config pull weichert -C ./weichert
# …edit mapping.json / rules.json…
pegasus-workflows integration-config validate weichert -C ./weichert
pegasus-workflows integration-config publish weichert -C ./weichert

publish/rollback require the token's tenant to be the platform tenant to write GLOBAL (visibility is derived server-side) and to carry the PublishIntegrationConfig action; they are gated by the server's INTEGRATION_CONFIG_PUBLISH_ENABLED switch. validate and pull are read-level and never gated.

Local Temporal

pegasus-workflows test needs a Temporal server. The repo root ships docker-compose.temporal.yml (Temporal server + Temporal UI on 7233 / 8080) purely as a local-dev aid — no production connection. test runs docker compose -f docker-compose.temporal.yml up -d automatically if Temporal is not already reachable on 127.0.0.1:7233. To start it by hand:

docker compose -f docker-compose.temporal.yml up -d

The Temporal Web UI is then at http://localhost:8080.
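
A reachability check of the kind described above can be sketched with a plain TCP connect. This is an assumption about the general technique, not the SDK's actual check: it only confirms that something accepts connections on the port, not that the Temporal server behind it is healthy. The function name temporal_reachable is hypothetical.

```python
import socket


def temporal_reachable(host: str = "127.0.0.1", port: int = 7233, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False
```

Calling temporal_reachable() with no arguments probes the default local address used by the test command.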

Using the SDK with an AI coding agent

The SDK ships a built-in MCP server that gives any MCP-compatible AI coding agent (Claude Code, Cursor, Windsurf, …) structured access to SDK rules and safe tooling — without the agent having to read or guess from source files.

Install the extra

pip install 'pegasus-workflows-sdk[mcp]'

Configure your agent

Claude Code (~/.claude/settings.json or project .claude/settings.json)

{
  "mcpServers": {
    "pegasus-workflows": {
      "command": "pegasus-workflows",
      "args": ["mcp"]
    }
  }
}

Cursor / Windsurf (.cursor/mcp.json or .windsurf/mcp.json)

{
  "mcpServers": {
    "pegasus-workflows": {
      "command": "pegasus-workflows",
      "args": ["mcp"]
    }
  }
}

Once configured, the agent can call the resources and tools below without any additional setup.
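
If a settings file already lists other MCP servers, the entry needs to be merged rather than pasted over the file. A minimal sketch of that merge follows, assuming the mcpServers layout shown in the configuration above; add_pegasus_server and the "other-server" entry are hypothetical and for illustration only.

```python
import json

PEGASUS_MCP_ENTRY = {"command": "pegasus-workflows", "args": ["mcp"]}


def add_pegasus_server(settings: dict) -> dict:
    """Return a copy of `settings` with the pegasus-workflows MCP entry added."""
    merged = dict(settings)
    # Copy the server map so the caller's dict is left untouched.
    servers = dict(merged.get("mcpServers", {}))
    servers["pegasus-workflows"] = dict(PEGASUS_MCP_ENTRY)
    merged["mcpServers"] = servers
    return merged


# Hypothetical existing settings with an unrelated server configured.
existing = {"mcpServers": {"other": {"command": "other-server", "args": []}}}
updated = add_pegasus_server(existing)
print(json.dumps(updated, indent=2))
```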

Resources (read-only context)

| URI | Description |
| --- | --- |
| pegasus://guide/authoring | Authoring guide: import surface, determinism rule, input contract pointer. |
| pegasus://guide/input-contract | The three run() input shapes (trigger-fired, manual run, CLI test) + a worked resolver example. |
| pegasus://reference/manifest | Manifest fields and constraints generated from manifest.py constants; stays in sync automatically. |
| pegasus://reference/api | PegasusClient method signatures and docstrings generated by introspection; stays in sync automatically. |
Tools (safe actions — no network writes)

| Tool | Description |
| --- | --- |
| scaffold_workflow(name, dest) | Scaffold a new workflow project at dest/name. Wraps pegasus-workflows init. |
| validate_manifest(path_or_toml) | Validate a manifest file path or raw TOML text. Returns structured errors or the parsed manifest. |
| package_project(project_dir) | Package declared workflows into dist/. Returns {name, version, zip_path, size_bytes} per workflow. |
| validate_integration_config(integration_id, mapping, rules, corpus, base_url, token) | Dry-run the integration config publish gate. No state change. |

Network-mutating operations (push, publish_integration_config, run) are intentionally not exposed — keep those human-gated via the CLI.

Smoke test (verify the server starts)

pegasus-workflows mcp --help   # should print the mcp command help

Without the extra installed, the command exits non-zero with an install hint.

Release

The SDK is published to PyPI by .github/workflows/release-sdk-python.yml on sdk-python-v* tags via PyPI trusted publishing (OIDC — no API token).

To cut a release:

  1. Bump version in pyproject.toml and commit it on main.
  2. Tag the release commit and push the tag, e.g. git tag sdk-python-v0.1.0 && git push origin sdk-python-v0.1.0.

The workflow then lints, audits, tests, builds, and uploads the sdist + wheel.

One-time setup (before the first release): a PyPI project owner must add a pending publisher at pegasus-workflows-sdk → Publishing → owner DolasDev, repo pegasus, workflow release-sdk-python.yml, environment pypi. Until that exists the publish job fails at the upload step, and tenants must use the git install above.

