Declarative YAML-based AI workflow engine SDK

Beddel

Declarative YAML-based AI workflow engine for Python.

Define outcome-driven AI workflows in YAML — the engine handles adaptive execution with conditional branching, retry strategies, multi-provider LLM abstraction, and compositional primitives. YAML for the backbone, code escape hatches for complex logic.

steps:
  - id: greet
    primitive: llm
    config:
      model: gemini/gemini-2.0-flash
      prompt: "Say hello and share a fun fact about $input.topic"
      temperature: 0.7

Why Beddel

  • Write workflows in YAML, not hundreds of lines of Python
  • 7 built-in primitives cover most AI workflow patterns out of the box
  • Multi-provider LLM support via LiteLLM (100+ providers)
  • Adaptive execution: branching, retry with backoff, fallback, skip, delegate
  • OpenTelemetry tracing with token usage tracking per step
  • Lifecycle hooks for custom logging, metrics, and side effects
  • Expose workflows as HTTP/SSE endpoints with one function call
  • Hexagonal architecture — swap adapters without touching domain logic

Installation

# Core only (parser, resolver, executor; no external adapters)
pip install beddel

# With LLM adapters (LiteLLM, OpenTelemetry, httpx)
# Quotes keep shells such as zsh from treating the brackets as a glob pattern
pip install "beddel[adapters]"

# With FastAPI integration (HTTP endpoints + SSE streaming)
pip install "beddel[fastapi]"

# With CLI (validate, run, serve workflows)
pip install "beddel[cli]"

# Everything
pip install "beddel[all]"

Requires Python 3.11+.

Quickstart

Get a workflow running in under 5 minutes.

1. Install

pip install "beddel[adapters]"

2. Set your API key

Get a free key from Google AI Studio:

export GEMINI_API_KEY="your-key-here"

3. Create a workflow

Save as workflow.yaml:

id: hello-world
name: Hello World
description: A minimal workflow that greets the user with a fun fact.

input_schema:
  type: object
  properties:
    topic:
      type: string
  required:
    - topic

steps:
  - id: greet
    primitive: llm
    config:
      model: gemini/gemini-2.0-flash
      prompt: "Say hello and share one fun fact about $input.topic"
      temperature: 0.7

4. Run it

import asyncio
from pathlib import Path

from beddel.adapters.litellm_adapter import LiteLLMAdapter
from beddel.domain.executor import WorkflowExecutor
from beddel.domain.parser import WorkflowParser
from beddel.domain.registry import PrimitiveRegistry
from beddel.primitives import register_builtins

async def main():
    workflow = WorkflowParser.parse(Path("workflow.yaml").read_text())

    registry = PrimitiveRegistry()
    register_builtins(registry)

    executor = WorkflowExecutor(registry, provider=LiteLLMAdapter())
    result = await executor.execute(workflow, inputs={"topic": "astronomy"})

    print(result["step_results"]["greet"]["content"])

asyncio.run(main())

Save the script as run_workflow.py, then run it:

python run_workflow.py

Model names use the LiteLLM format (provider/model). Avoid experimental (-exp) suffixes — they get retired without notice.

Examples

The examples/ directory contains ready-to-run workflows:

Example                  | Primitives                         | What it demonstrates
research-pipeline.yaml   | llm, output-generator              | Sequential multi-step, $stepResult cross-references, retry
email-classifier.yaml    | llm, output-generator              | if/then/else branching, retry + skip strategies
chat-with-guardrail.yaml | chat, guardrail, output-generator  | Multi-turn conversation, output validation

Run any example with the CLI:

pip install "beddel[all]"
export GEMINI_API_KEY="your-key-here"

# Research pipeline
beddel run examples/research-pipeline.yaml -i topic="AI agents" -i depth="brief"

# Email classifier with branching
beddel run examples/email-classifier.yaml -i email_body="How do I configure nginx SSL?"

# Chat with guardrail validation
beddel run examples/chat-with-guardrail.yaml \
  -i question="What are the benefits of microservices?" \
  -i context="enterprise software architecture"

Features

Adaptive Core Engine (Epic 1)

The foundation. Parses YAML workflows, resolves variables, and executes steps with adaptive control flow.

YAML Parser — Secure loading via yaml.safe_load() with Pydantic 2.x validation. Supports workflow metadata, step definitions, variable references, conditional expressions, and execution strategy declarations.

Variable Resolver — Extensible namespace system with three built-in namespaces and a registration mechanism for custom ones:

prompt: "Tell me about $input.topic"           # Runtime inputs
prompt: "Expand on $stepResult.step1.content"   # Previous step outputs
prompt: "Using key $env.API_KEY"                # Environment variables

# Register custom namespaces
resolver.register_namespace("memory", my_memory_handler)
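
The namespace lookup can be illustrated with a small standalone sketch. This is not Beddel's resolver: the class name SketchResolver, the regular expression, and the handler signature (a dotted path in, a value out) are assumptions made only for illustration.

```python
import os
import re
from typing import Any, Callable

# Assumed reference syntax: "$namespace.dotted.path" (a trailing "." is not consumed).
_REF = re.compile(r"\$(\w+)\.(\w+(?:\.\w+)*)")


def _dig(data: Any, path: str) -> Any:
    """Follow a dotted path through nested dicts."""
    for key in path.split("."):
        data = data[key]
    return data


class SketchResolver:
    """Hypothetical resolver with three built-in namespaces plus custom ones."""

    def __init__(self, inputs: dict, step_results: dict) -> None:
        self._handlers: dict[str, Callable[[str], Any]] = {
            "input": lambda path: _dig(inputs, path),
            "stepResult": lambda path: _dig(step_results, path),
            "env": lambda path: os.environ[path],
        }

    def register_namespace(self, name: str, handler: Callable[[str], Any]) -> None:
        self._handlers[name] = handler

    def resolve(self, template: str) -> str:
        def substitute(match: re.Match) -> str:
            namespace, path = match.group(1), match.group(2)
            if namespace not in self._handlers:
                return match.group(0)  # unknown namespace: leave the text as-is
            return str(self._handlers[namespace](path))

        return _REF.sub(substitute, template)
```

A custom namespace is then just another handler, for example `resolver.register_namespace("memory", lambda path: store[path])`, where `store` is any mapping you control.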

Adaptive Workflow Executor — Sequential execution with step-level conditional branching (if/then/else), configurable execution strategies per step, and step-level timeout support. The executor evaluates conditions and adapts flow — not a pure sequential dispatcher.
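
As a rough picture of what "sequential with branching" means, here is a minimal standalone sketch. The Step dataclass, its field names, and the execute function are hypothetical and do not mirror Beddel's models or YAML schema; conditions are plain Python callables rather than parsed expressions.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional

Context = dict[str, Any]


@dataclass
class Step:
    """Hypothetical step: either runs a coroutine or branches on a condition."""

    id: str
    run: Optional[Callable[[Context], Awaitable[Any]]] = None
    condition: Optional[Callable[[Context], bool]] = None
    then_steps: list["Step"] = field(default_factory=list)
    else_steps: list["Step"] = field(default_factory=list)
    timeout: Optional[float] = None  # seconds; None means no limit


async def execute(steps: list[Step], context: Context) -> Context:
    for step in steps:
        if step.condition is not None:
            # Branching step: pick one sub-sequence and run it in order.
            branch = step.then_steps if step.condition(context) else step.else_steps
            await execute(branch, context)
            continue
        # Plain step: run it under the step-level timeout and store the result.
        result = await asyncio.wait_for(step.run(context), timeout=step.timeout)
        context["step_results"][step.id] = result
    return context
```

Only the chosen branch contributes results to the context, which is the behavior that distinguishes this from a pure sequential dispatcher.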

Execution Strategies — Five strategies per step, with exponential backoff and jitter for retries:

steps:
  - id: risky-call
    primitive: llm
    config:
      model: gemini/gemini-2.0-flash
      prompt: "Generate content about $input.topic"
    execution_strategy:
      type: retry
      retry:
        max_attempts: 3
        backoff_base: 2.0

Strategy | Behavior
fail     | Stop workflow on error (default)
skip     | Log error, continue to next step
retry    | Retry with exponential backoff and jitter
fallback | Execute an alternative step on failure
delegate | Delegate error recovery to agent judgment
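
The retry strategy can be sketched generically as follows. The delay formula (backoff_base raised to the attempt number, plus uniform jitter) and the names backoff_delay, run_with_retry, jitter, and sleep are assumptions for illustration; Beddel's exact formula may differ.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, backoff_base: float = 2.0, jitter: float = 0.5) -> float:
    """Delay after failed attempt `attempt` (1-based): base**attempt plus jitter."""
    return backoff_base ** attempt + random.uniform(0.0, jitter)


async def run_with_retry(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    backoff_base: float = 2.0,
    jitter: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the last error
            await sleep(backoff_delay(attempt, backoff_base, jitter))
    raise RuntimeError("unreachable")
```

The `sleep` parameter exists only so the delays can be observed or stubbed out in tests without actually waiting.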

Primitive Registry — Instance-based registration with contract validation:

from beddel.domain.ports import IPrimitive
from beddel.domain.registry import PrimitiveRegistry

registry = PrimitiveRegistry()

class MyPrimitive(IPrimitive):
    async def execute(self, config, context):
        return {"result": "custom logic here"}

registry.register("my-custom-primitive", MyPrimitive())

Or use the @primitive decorator for module-level registration:

from beddel.domain.ports import IPrimitive
from beddel.domain.registry import primitive

@primitive("my-custom-primitive")
class MyPrimitive(IPrimitive):
    async def execute(self, config, context):
        return {"result": "custom logic here"}

LiteLLM Adapter — Multi-provider LLM abstraction supporting OpenRouter, Google Gemini, AWS Bedrock, Anthropic, and all LiteLLM-supported providers. Explicit API key resolution from environment variables for resilience against upstream library changes.

Compositional Primitives (Epic 2)

Seven built-in primitives that compose into complex agent behaviors.

Primitive        | Description
llm              | Single-turn LLM invocation with streaming support
chat             | Multi-turn conversation with message history and context windowing
output-generator | Template-based output rendering (JSON, Markdown, text)
guardrail        | Input/output validation with 4 failure strategies
call-agent       | Nested workflow invocation with depth tracking
tool             | External function invocation (sync and async)
agent-exec       | Unified agent adapter for external agent delegation

chat — Multi-turn conversations with automatic context windowing:

steps:
  - id: conversation
    primitive: chat
    config:
      model: gemini/gemini-2.0-flash
      system: "You are a helpful coding assistant."
      messages:
        - role: user
          content: "What is Python?"
        - role: assistant
          content: "$stepResult.prev.content"
        - role: user
          content: "Tell me more about async/await"
      max_messages: 50
      max_context_tokens: 4000
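
Context windowing of this kind can be sketched as "keep the newest messages that fit both limits". The function below is an illustration only: window_messages, the count_tokens parameter, and the four-characters-per-token estimate are assumptions, and it ignores the system prompt, which a real implementation would likely budget separately.

```python
from typing import Callable, Optional


def window_messages(
    messages: list[dict],
    max_messages: int = 50,
    max_context_tokens: int = 4000,
    count_tokens: Optional[Callable[[str], int]] = None,
) -> list[dict]:
    """Return the most recent messages that fit both limits, oldest first."""
    if count_tokens is None:
        # Crude estimate (assumption): roughly four characters per token.
        count_tokens = lambda text: max(1, len(text) // 4)

    recent = messages[-max_messages:] if max_messages > 0 else []
    kept: list[dict] = []
    budget = max_context_tokens
    for message in reversed(recent):  # walk from newest to oldest
        cost = count_tokens(message["content"])
        if cost > budget:
            break  # older messages no longer fit
        kept.append(message)
        budget -= cost
    kept.reverse()
    return kept
```

Walking from the newest message backwards means the latest user turn is always the first to be kept when the budget is tight.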

guardrail — Validate LLM outputs with four failure strategies:

steps:
  - id: validate
    primitive: guardrail
    config:
      data: "$stepResult.generate.content"
      schema:
        fields:
          name: { type: str }
          age: { type: int }
      strategy: correct  # raise | return_errors | correct | delegate

Strategy      | Behavior                                              | LLM Required
raise         | Hard fail with validation errors                      | No
return_errors | Soft fail: returns errors alongside data              | No
correct       | JSON repair (parse → strip markdown fences → retry)   | No
delegate      | Ask LLM to fix validation errors, retry up to N times | Yes
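
The "parse, strip markdown fences, retry" sequence of the correct strategy can be approximated in a few lines. This is a sketch of the general technique rather than Beddel's code; repair_json and the fence pattern are hypothetical.

```python
import json
import re

_TICKS = "`" * 3  # a markdown code fence marker
# Matches an optional language tag after the opening fence and captures the body.
_FENCED = re.compile(
    rf"^\s*{_TICKS}[\w-]*[ \t]*\n(.*?)\n?[ \t]*{_TICKS}\s*$",
    re.DOTALL,
)


def repair_json(text: str):
    """Parse JSON; on failure, strip one surrounding markdown fence and retry."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    match = _FENCED.match(text)
    if match is None:
        raise ValueError("not valid JSON and no markdown fence to strip")
    return json.loads(match.group(1))
```

No LLM call is involved: the repair is purely textual, which matches the "LLM Required: No" entry for this strategy.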

call-agent — Compose workflows by nesting them:

steps:
  - id: delegate
    primitive: call-agent
    config:
      workflow: summarizer-workflow
      inputs:
        text: "$stepResult.extract.content"
      max_depth: 5
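
Depth tracking for nested workflows can be pictured with the sketch below. Everything in it is hypothetical (run_nested, MaxDepthExceeded, and workflows modeled as async callables); in particular, whether the top-level call counts as depth 0 or 1 in Beddel is not stated in this README, so the sketch assumes 0.

```python
import asyncio
from typing import Any, Awaitable, Callable


class MaxDepthExceeded(RuntimeError):
    """Raised when nested workflow calls go deeper than allowed."""


# Hypothetical workflow shape: an async callable that receives its inputs and
# a `call_agent` function for invoking child workflows by name.
CallAgent = Callable[[str, dict], Awaitable[Any]]
Workflow = Callable[[dict, CallAgent], Awaitable[Any]]


async def run_nested(
    name: str,
    inputs: dict,
    workflows: dict[str, Workflow],
    *,
    depth: int = 0,
    max_depth: int = 5,
) -> Any:
    if depth > max_depth:
        raise MaxDepthExceeded(f"{name!r} exceeded max_depth={max_depth}")

    async def call_agent(child: str, child_inputs: dict) -> Any:
        # Each nested call runs one level deeper than its parent.
        return await run_nested(
            child, child_inputs, workflows, depth=depth + 1, max_depth=max_depth
        )

    return await workflows[name](inputs, call_agent)
```

The limit turns accidental self-recursion (a workflow that calls itself) into a clear error instead of an unbounded chain of LLM calls.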

tool — Invoke registered functions (sync or async):

steps:
  - id: search
    primitive: tool
    config:
      tool: web_search
      arguments:
        query: "$input.question"

# Register tools before execution
tool_registry = {
    "web_search": my_search_function,
    "calculate": my_calc_function,
}
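
Supporting both sync and async tools usually comes down to awaiting the result only when it is awaitable. The helper below sketches that idea; invoke_tool and the sample tools are hypothetical and not part of Beddel's API.

```python
import asyncio
import inspect
from typing import Any, Callable


async def invoke_tool(
    tool_registry: dict[str, Callable[..., Any]],
    name: str,
    arguments: dict[str, Any],
) -> Any:
    """Call a registered tool by name, awaiting it if it returns an awaitable."""
    if name not in tool_registry:
        raise KeyError(f"unknown tool: {name}")
    result = tool_registry[name](**arguments)
    if inspect.isawaitable(result):
        result = await result
    return result


def calculate(a: int, b: int) -> int:  # sample sync tool
    return a + b


async def web_search(query: str) -> list[str]:  # sample async tool
    await asyncio.sleep(0)  # stands in for real network I/O
    return [f"result for {query}"]
```

Note that a slow synchronous tool called this way blocks the event loop; offloading it with `asyncio.to_thread` is one common remedy.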

Observability & Integration (Epic 3)

Production-grade observability and framework integration.

OpenTelemetry Tracing — Opt-in tracing with three nesting levels and token usage tracking:

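from beddel.adapters.litellm_adapter import LiteLLMAdapter
from beddel.adapters.otel_adapter import OpenTelemetryAdapter
from beddel.domain.executor import WorkflowExecutor

# `registry` is the PrimitiveRegistry built in the Quickstart
tracer = OpenTelemetryAdapter(service_name="my-app")
executor = WorkflowExecutor(registry, provider=LiteLLMAdapter(), tracer=tracer)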

Spans generated:

  • beddel.workflow — workflow-level span with beddel.workflow_id
  • beddel.step.{step_id} — step-level span with token usage (gen_ai.usage.*)
  • beddel.primitive.{name} — primitive-level span with model and provider attributes

Zero overhead when tracing is disabled (all calls gated behind if tracer is not None).
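
The gating pattern mentioned above can be sketched like this. RecordingTracer and its span method are a made-up stand-in, not the OpenTelemetryAdapter interface; the point is only that a missing tracer leads to a no-op context.

```python
from contextlib import contextmanager, nullcontext


class RecordingTracer:
    """Stand-in tracer that records span names (hypothetical interface)."""

    def __init__(self) -> None:
        self.spans: list[str] = []

    @contextmanager
    def span(self, name: str):
        self.spans.append(name)
        yield


def step_span(tracer, step_id: str):
    """Return a real span when tracing is on, otherwise a no-op context."""
    if tracer is not None:
        return tracer.span(f"beddel.step.{step_id}")
    return nullcontext()


def run_step(step_id: str, tracer=None) -> str:
    with step_span(tracer, step_id):
        return f"ran {step_id}"
```

With `tracer=None` no span object is ever created, so the only cost on the untraced path is the `is not None` check.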

Lifecycle Hooks — Granular event system for custom logging, metrics, or side effects:

from beddel.domain.ports import ILifecycleHook

class MyHook(ILifecycleHook):
    async def on_workflow_start(self, workflow_id, inputs):
        print(f"Starting {workflow_id}")

    async def on_step_end(self, step_id, primitive, result):
        print(f"Step {step_id} completed")

    async def on_error(self, step_id, error):
        print(f"Error in {step_id}: {error}")

# `registry` and LiteLLMAdapter are set up as in the Quickstart
executor = WorkflowExecutor(registry, provider=LiteLLMAdapter(), hooks=[MyHook()])

Events: on_workflow_start, on_workflow_end, on_step_start, on_step_end, on_error, on_retry. Hook failures are silently caught — a misbehaving hook never breaks workflow execution.
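
Isolating hook failures from the workflow can be done with a dispatcher along these lines. The dispatch function is a hypothetical sketch; it logs swallowed errors at debug level, which is an assumption (the README only says failures are caught silently).

```python
import asyncio
import logging

logger = logging.getLogger("hooks-sketch")


async def dispatch(hooks: list, event: str, *args) -> None:
    """Call `event` on every hook that implements it; never propagate errors."""
    for hook in hooks:
        handler = getattr(hook, event, None)
        if handler is None:
            continue  # this hook does not subscribe to the event
        try:
            await handler(*args)
        except Exception:
            # Swallow the failure so one bad hook cannot break the workflow.
            logger.debug("hook %r failed on %s", hook, event, exc_info=True)


class BrokenHook:
    async def on_step_end(self, step_id, primitive, result):
        raise RuntimeError("hook bug")


class CountingHook:
    def __init__(self) -> None:
        self.seen: list[str] = []

    async def on_step_end(self, step_id, primitive, result):
        self.seen.append(step_id)
```

Hooks listed after a failing one still run, because each handler call has its own try/except.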

FastAPI Integration — Expose workflows as HTTP/SSE endpoints with one function call:

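from pathlib import Path

from fastapi import FastAPI
from beddel.domain.parser import WorkflowParser
from beddel.integrations.fastapi import create_beddel_handler

# Parse the workflow the same way as in the Quickstart
workflow = WorkflowParser.parse(Path("workflow.yaml").read_text())

app = FastAPI()
router = create_beddel_handler(workflow)  # auto-creates provider + registry
app.include_router(router)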

The handler streams workflow execution via Server-Sent Events (W3C-compliant). Clients receive real-time events: WORKFLOW_START, STEP_START, STEP_END, WORKFLOW_END.

pip install "beddel[fastapi]"
beddel serve -w workflow.yaml --port 8000

Endpoints:

  • POST /workflows/{id} — Execute workflow (SSE response)
  • GET /health — Health check
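
For clients consuming the stream, it may help to see what Server-Sent Events frames look like on the wire. The sketch below formats and parses frames using the event names listed above; the JSON payload fields (workflow_id, step_id) are assumptions, and the parser handles only single-line data fields.

```python
import json


def format_sse(event: str, data: dict) -> str:
    """Serialize one event as an SSE frame (terminated by a blank line)."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def parse_sse(stream: str) -> list[tuple[str, dict]]:
    """Parse a buffered SSE stream into (event name, payload) pairs."""
    events = []
    for frame in stream.split("\n\n"):
        fields = {}
        for line in frame.splitlines():
            key, sep, value = line.partition(":")
            if sep:
                # Drop the single optional space after the colon.
                fields[key] = value[1:] if value.startswith(" ") else value
        if "data" in fields:
            events.append((fields.get("event", "message"), json.loads(fields["data"])))
    return events


stream = (
    format_sse("WORKFLOW_START", {"workflow_id": "hello-world"})
    + format_sse("STEP_START", {"step_id": "greet"})
    + format_sse("STEP_END", {"step_id": "greet"})
    + format_sse("WORKFLOW_END", {"workflow_id": "hello-world"})
)
```

A real client would read the HTTP response incrementally and feed complete frames to a parser like this as they arrive.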

CLI

Beddel includes a command-line interface for validating, running, and serving workflows.

pip install "beddel[cli]"

Validate a workflow

beddel validate workflow.yaml

Output:

OK: hello-world
  name: Hello World
  steps: 1
  primitives: llm

Run a workflow

beddel run workflow.yaml --input topic=astronomy

Machine-readable output:

beddel run workflow.yaml --input topic=astronomy --json-output

List primitives

beddel list-primitives

Start the server

beddel serve -w workflow.yaml --port 8000
beddel serve -w flow1.yaml -w flow2.yaml --port 8000

Version

beddel version

OpenClaw Integration

Beddel works as an OpenClaw agent skill. After installing with pip install beddel[cli], the beddel command is available for any OpenClaw agent to create, validate, and execute AI workflows.

Practical examples

Agent that validates workflows before execution:

An OpenClaw agent can validate YAML files authored by users or other agents, catching schema errors before runtime:

openclaw agent --message "Validate my workflow at ./flows/pipeline.yaml" \
  --agent main

The agent calls beddel validate ./flows/pipeline.yaml and reports any issues.

Agent-driven workflow execution with dynamic inputs:

An OpenClaw agent can run Beddel workflows as part of a larger task, passing context-dependent inputs:

openclaw agent --message "Run the summarizer workflow for the topic 'quantum computing'" \
  --agent main

The agent calls beddel run summarizer.yaml --input topic="quantum computing" and processes the result.

Serving workflows for dashboard integration:

An OpenClaw agent can start the Beddel server to expose workflows as HTTP endpoints, enabling integration with dashboards or other services:

openclaw agent --message "Start the beddel server with all workflows in ./flows/" \
  --agent main

The agent discovers YAML files and runs beddel serve -w flow1.yaml -w flow2.yaml --port 8000.

Multi-agent pipeline with Beddel as the execution engine:

In a multi-agent setup, one agent (e.g., an architect) designs the workflow YAML, another (e.g., a QA agent) validates it, and a third executes it:

Architect agent → writes workflow.yaml
QA agent        → beddel validate workflow.yaml
Executor agent  → beddel run workflow.yaml --input topic=security --json-output

See SKILL.md for the full skill manifest and OpenClaw metadata.

Architecture

Beddel follows Hexagonal Architecture (Ports & Adapters). The domain core never imports from adapters or integrations — all external dependencies flow through port interfaces.

┌─────────────────────────────────────────────┐
│              Integrations                    │
│         FastAPI  ·  SSE Streaming            │
├─────────────────────────────────────────────┤
│               Adapters                       │
│    LiteLLM  ·  OpenTelemetry  ·  Hooks       │
├─────────────────────────────────────────────┤
│            Compositional Primitives          │
│  llm · chat · output · guardrail · tool · …  │
├─────────────────────────────────────────────┤
│              Domain Core                     │
│  Parser · Resolver · Executor · Registry     │
│  Models · Ports (interfaces)                 │
└─────────────────────────────────────────────┘
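
The ports-and-adapters idea behind the diagram can be shown in miniature. None of the names below (LLMProvider, EchoProvider, GreetingService) come from Beddel; they are hypothetical stand-ins showing domain code that depends only on an interface.

```python
import asyncio
from typing import Protocol


class LLMProvider(Protocol):
    """Port: the only thing the domain code knows about an LLM backend."""

    async def complete(self, model: str, prompt: str) -> str: ...


class EchoProvider:
    """Adapter: a fake backend, handy for tests. A real one would call an API."""

    async def complete(self, model: str, prompt: str) -> str:
        return f"[{model}] {prompt}"


class GreetingService:
    """Domain logic: depends on the port, never on a concrete adapter."""

    def __init__(self, provider: LLMProvider) -> None:
        self._provider = provider

    async def greet(self, topic: str) -> str:
        return await self._provider.complete("stub/model", f"Say hello about {topic}")
```

Swapping EchoProvider for a network-backed adapter requires no change to GreetingService, which is the property the layering is meant to guarantee.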

Development Setup

git clone https://github.com/botanarede/beddel-py.git
cd beddel-py
pip install -e ".[dev]"

Run all quality gates:

# Tests
python -Wd -m pytest

# Lint + format
ruff check .
ruff format .

# Type check
mypy src/

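The -Wd flag (short for -W default) makes Python display warnings such as DeprecationWarning instead of suppressing them, so deprecated API usage surfaces early. To turn those warnings into errors, use -W error::DeprecationWarning instead.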
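
The difference between displaying a warning and escalating it to an error can be checked with the standard warnings module. In this sketch, old_api and call_with_filter are illustrative names; the "default" and "error" actions are the same ones accepted by Python's -W option.

```python
import warnings


def old_api() -> int:
    warnings.warn("old_api() is deprecated", DeprecationWarning, stacklevel=2)
    return 42


def call_with_filter(action: str):
    """Call old_api() under the given filter action and report what happened."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter(action, DeprecationWarning)
        try:
            value = old_api()
        except DeprecationWarning:
            return "raised", len(caught)
        return value, len(caught)


print(call_with_filter("default"))  # warning is recorded, call still succeeds
print(call_with_filter("error"))    # warning is raised as an exception
```

Under "default" the call returns normally and the warning is only reported; under "error" the same call raises, which is what makes a test run fail.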

Roadmap

Epics 1–3 (Adaptive Core, Compositional Primitives, Observability & Integration) are complete. Upcoming:

  • Epic 4 — Adaptive Execution Patterns: reflection loops, parallel execution, circuit breaker, goal-oriented execution, MCP-native tool integration
  • Epic 5 — Agent Autonomy & Safety: human-in-the-loop, model tier selection, PII tokenization, state persistence, cost controls

Contributing

Contributions are welcome. Open an issue to discuss before submitting a PR. Guidelines will be documented as the project matures.

Newsletter

Subscribe on Substack

License

MIT
