
Cadence

A declarative Python framework for building service and AI orchestration logic with explicit control flow.

Cadence lets you build complex orchestration — from checkout flows to LLM pipelines — with a clean, readable API. Define your business logic as composable notes, handle errors gracefully, and scale with confidence.

Why Cadence?

| Challenge | How Cadence helps |
| --- | --- |
| Service orchestration is tangled in imperative code | Declarative .then() / .sync() / .split() chains make the flow visible |
| LLM APIs are unreliable | Built-in @retry, @timeout, @fallback, @circuit_breaker |
| Parallel tool calling is error-prone | .sync() with automatic score isolation and merging |
| Intent routing needs ugly if/else trees | .split(condition, if_true, if_false) keeps it clean |
| LLMs hallucinate framework code | A constrained 4-method DSL is easy for models to generate correctly |
| Workflows crash mid-execution | .with_checkpoint() resumes from the last successful step |
| Frameworks lock you into their ecosystem | Zero dependencies — bring any LLM client, any HTTP library |

Features

  • Declarative Cadence Definition - Build complex workflows with a fluent, chainable API
  • Parallel Execution - Run tasks concurrently with automatic context isolation and merging
  • Branching Logic - Conditional execution paths with clean syntax
  • Resilience Patterns - Built-in retry, timeout, fallback, and circuit breaker
  • Framework Integration - First-class support for FastAPI and Flask
  • Observability - Hooks for logging, metrics, and tracing
  • Event-Driven Architecture - Structured domain events with listener registration and async support
  • Workflow Checkpointing - Crash recovery with pluggable checkpoint stores
  • Type Safety - Full type hints and generics support
  • Zero Dependencies - Core library has no required dependencies
  • LLM Pipeline Ready - Natural fit for RAG, tool calling, multi-agent, and model fallback chains
  • LLM-Friendly DSL - Constrained grammar that LLMs can generate correctly with minimal hallucination

Installation

pip install cadence-orchestration

With optional integrations:

# FastAPI integration
pip install cadence-orchestration[fastapi]

# Flask integration
pip install cadence-orchestration[flask]

# OpenTelemetry tracing
pip install cadence-orchestration[opentelemetry]

# Prometheus metrics
pip install cadence-orchestration[prometheus]

# All integrations
pip install cadence-orchestration[all]

Quick Start

from dataclasses import dataclass, field
from cadence import Cadence, Score, note

@dataclass
class OrderScore(Score):
    order_id: str
    items: list = field(default_factory=list)
    total: float = 0.0
    status: str = "pending"

@note
async def fetch_items(score: OrderScore):
    # Fetch order items from database
    score.items = await db.get_items(score.order_id)

@note
async def calculate_total(score: OrderScore):
    score.total = sum(item.price for item in score.items)

@note
async def process_payment(score: OrderScore):
    await payment_service.charge(score.order_id, score.total)
    score.status = "paid"

# Build and run the cadence
cadence = (
    Cadence("checkout", OrderScore(order_id="ORD-123"))
    .then("fetch_items", fetch_items)
    .then("calculate_total", calculate_total)
    .then("process_payment", process_payment)
)

result = await cadence.run()
print(f"Order {result.order_id}: {result.status}")

LLM Pipeline Example

The same primitives work for AI pipelines — parallel retrieval, intent routing, resilience on model calls:

from dataclasses import dataclass, field
from cadence import Cadence, Score, note, retry, timeout, fallback

@dataclass
class RAGScore(Score):
    query: str
    context: list = field(default_factory=list)
    answer: str = ""

@note(timeout=5.0)
async def retrieve(score: RAGScore):
    score.context = await vector_db.search(score.query)

@note(fallback={"default": [], "field": "context"}, timeout=10.0)
async def web_search(score: RAGScore):
    score.context += await search_api.query(score.query)

@note(retry={"max_attempts": 3, "backoff": "exponential"}, timeout=30.0)
async def generate(score: RAGScore):
    score.answer = await llm.generate(score.query, context=score.context)

rag = (
    Cadence("rag", RAGScore(query="What is Cadence?"))
    .sync("retrieve", [retrieve, web_search])   # parallel retrieval
    .then("generate", generate)                  # LLM call with retry + timeout
)

This is identical in shape to the service orchestration example above — same API, same resilience patterns, different domain.

Core Concepts

Sequential Notes

Execute notes one after another:

cadence = (
    Cadence("process", MyScore())
    .then("note1", do_first)
    .then("note2", do_second)
    .then("note3", do_third)
)

Parallel Execution

Run independent tasks concurrently with automatic score isolation:

cadence = (
    Cadence("enrich", UserScore(user_id="123"))
    .sync("fetch_data", [
        fetch_profile,
        fetch_preferences,
        fetch_history,
    ])
    .then("merge_results", combine_data)
)

Conditional Branching

Route execution based on runtime conditions:

cadence = (
    Cadence("order", OrderScore())
    .then("validate", validate_order)
    .split("route",
        condition=is_premium_customer,
        if_true=[priority_processing, express_shipping],
        if_false=[standard_processing, regular_shipping]
    )
    .then("confirm", send_confirmation)
)

Child Cadences

Compose cadences for complex orchestration:

payment_cadence = Cadence("payment", PaymentScore())...
shipping_cadence = Cadence("shipping", ShippingScore())...

checkout_cadence = (
    Cadence("checkout", CheckoutScore())
    .then("prepare", prepare_order)
    .child("process_payment", payment_cadence, merge_payment)
    .child("arrange_shipping", shipping_cadence, merge_shipping)
    .then("complete", finalize_order)
)

Resilience Patterns

Retry with Backoff

from cadence import retry

@note(retry={"max_attempts": 3, "delay": 1.0, "backoff": "exponential"})
async def call_external_api(score):
    response = await http_client.get(score.api_url)
    score.data = response.json()
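
Under an "exponential" policy the wait between attempts typically doubles. As a stdlib-only illustration of that schedule (independent of Cadence's internals, which may differ):

```python
def backoff_delays(max_attempts: int, base_delay: float = 1.0, factor: float = 2.0):
    """Waits between attempts under exponential backoff: base * factor**i.
    max_attempts tries means at most max_attempts - 1 waits in between."""
    return [base_delay * factor**i for i in range(max_attempts - 1)]

print(backoff_delays(3))  # [1.0, 2.0]
```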

Timeout

from cadence import timeout

@note(timeout=5.0)
async def slow_operation(score):
    score.result = await long_running_task()

Note: On Windows, the @timeout decorator only works with async functions. Sync-function timeouts rely on Unix signals (SIGALRM), which are not available on Windows.
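
For async functions the deadline can be enforced portably through cooperative cancellation rather than signals; the standard library's asyncio.wait_for shows the behavior:

```python
import asyncio

async def run_with_timeout(coro, seconds: float):
    # wait_for cancels the awaited coroutine and raises TimeoutError when
    # the deadline passes; this works on every platform, unlike SIGALRM.
    return await asyncio.wait_for(coro, timeout=seconds)

async def slow():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        return await run_with_timeout(slow(), 0.01)
    except asyncio.TimeoutError:
        return "timed out"

outcome = asyncio.run(main())
print(outcome)  # timed out
```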

Fallback

from cadence import fallback

@note(fallback={"default": "unknown", "field": "status"})
async def get_status(score):
    score.status = await status_service.get(score.id)

Circuit Breaker

from cadence import circuit_breaker

@circuit_breaker(failure_threshold=5, recovery_timeout=30.0)
@note
async def call_fragile_service(score):
    score.data = await fragile_service.fetch()
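
For intuition: a circuit breaker counts consecutive failures, rejects calls once failure_threshold is reached, and allows a trial call through after recovery_timeout. A minimal stdlib sketch of that state machine (not Cadence's implementation):

```python
import time

class CircuitBreaker:
    """Toy circuit breaker: opens after N consecutive failures, then
    half-opens after recovery_timeout so one trial call can close it."""
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        # After the recovery window, permit a single trial call (half-open)
        return time.monotonic() - self.opened_at >= self.recovery_timeout

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

cb = CircuitBreaker(failure_threshold=2, recovery_timeout=60.0)
cb.record_failure()
cb.record_failure()
print(cb.allow())  # False: the circuit is now open
```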

Combined Resilience

Combine retry, timeout, and fallback in a single decorator:

from cadence import note

# Shorthand — covers the common case
@note(retry=3, timeout=15.0)
async def call_service(score):
    score.result = await external_api.call()

# Full control via dicts
@note(
    retry={"max_attempts": 3, "backoff": "exponential", "delay": 0.5},
    timeout=30.0,
    fallback={"default": None, "field": "result"},
)
async def call_service(score):
    score.result = await external_api.call()

Standalone decorators (@retry, @timeout, @fallback) still work for stacking when you need maximum flexibility.

Framework Integration

FastAPI

from fastapi import FastAPI
from cadence.integrations.fastapi import CadenceRouter

app = FastAPI()
router = CadenceRouter()

@router.cadence("/orders/{order_id}", checkout_cadence)
async def create_order(order_id: str):
    return OrderScore(order_id=order_id)

app.include_router(router)

Flask

from flask import Flask
from cadence.integrations.flask import CadenceBlueprint

app = Flask(__name__)
bp = CadenceBlueprint("orders", __name__)

@bp.cadence_route("/orders/<order_id>", checkout_cadence)
def create_order(order_id):
    return OrderScore(order_id=order_id)

app.register_blueprint(bp)

Observability

Hooks System

from cadence import Cadence, LoggingHooks, TimingHooks

cadence = (
    Cadence("monitored", MyScore())
    .with_hooks([LoggingHooks(), TimingHooks()])
    .then("note1", do_work)
)

Custom Hooks

from cadence import CadenceHooks

class MyHooks(CadenceHooks):
    async def before_note(self, note_name, score):
        print(f"Starting: {note_name}")

    async def after_note(self, note_name, score, duration, error=None):
        print(f"Completed: {note_name} in {duration:.2f}s")

    async def on_error(self, note_name, score, error):
        alert_team(f"Error in {note_name}: {error}")

Prometheus Metrics

from cadence.reporters import PrometheusReporter

reporter = PrometheusReporter(prefix="myapp")

cadence = (
    Cadence("tracked", MyScore())
    .with_reporter(reporter.report)
    .then("note1", do_work)
)

OpenTelemetry Tracing

from cadence.reporters import OpenTelemetryReporter

reporter = OpenTelemetryReporter(service_name="my-service")

cadence = (
    Cadence("traced", MyScore())
    .with_reporter(reporter.report)
    .then("note1", do_work)
)

Event System

React to cadence and note lifecycle events with structured listeners:

from cadence.events import EventEmitter, NOTE_COMPLETED, CADENCE_FAILED

emitter = EventEmitter()
emitter.on(NOTE_COMPLETED, lambda e: print(f"{e.note_name} done in {e.duration:.2f}s"))
emitter.on(CADENCE_FAILED, lambda e: alert(f"{e.cadence_name} failed: {e.error}"))
emitter.on("*", lambda e: audit_log.write(e))  # wildcard for all events

cadence = (
    Cadence("checkout", OrderScore(order_id="ORD-123"))
    .with_hooks(emitter)
    .then("validate", validate_order)
    .then("charge", process_payment)
)
await cadence.run()

Event types: NOTE_STARTED, NOTE_COMPLETED, NOTE_FAILED, CADENCE_STARTED, CADENCE_COMPLETED, CADENCE_FAILED.

Checkpointing

Resume workflows from the last successful step after a crash:

from cadence import Cadence
from cadence.checkpoint import InMemoryCheckpointStore

store = InMemoryCheckpointStore()

cadence = (
    Cadence("disburse", PaymentScore(order_id="ORD-123"))
    .with_checkpoint(store, run_id="order-123")
    .then("validate", validate)
    .then("charge", charge)         # if it crashes here...
    .then("disburse", disburse)
)
await cadence.run()  # ...re-run skips validate, resumes from charge

Implement the CheckpointStore protocol with Redis, a database, or any durable backend for production use.
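
For a feel of the store's job, here is a toy in-memory version. The save/load/clear method names below are invented for illustration; a real backend must implement whatever methods the actual CheckpointStore protocol specifies:

```python
import json

class DictCheckpointStore:
    """Toy checkpoint store keyed by run_id, serializing score state as
    JSON the way a Redis- or database-backed store might."""
    def __init__(self):
        self._data = {}

    def save(self, run_id: str, step: str, score_state: dict):
        self._data[run_id] = {"step": step, "score": json.dumps(score_state)}

    def load(self, run_id: str):
        entry = self._data.get(run_id)
        if entry is None:
            return None
        return entry["step"], json.loads(entry["score"])

    def clear(self, run_id: str):
        self._data.pop(run_id, None)

store = DictCheckpointStore()
store.save("order-123", "charge", {"order_id": "ORD-123", "status": "charged"})
print(store.load("order-123")[0])  # charge
```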

Cadence Diagrams

Generate visual diagrams of your cadences:

from cadence import to_mermaid, to_dot

# Generate Mermaid diagram
print(to_mermaid(my_cadence))

# Generate DOT/Graphviz diagram
print(to_dot(my_cadence))

CLI

Cadence includes a CLI for scaffolding and utilities:

# Initialize a new project
cadence init my-project

# Generate a new cadence
cadence new cadence checkout

# Generate a new note with resilience decorators
cadence new note process-payment --retry 3 --timeout 30

# Generate cadence diagram
cadence diagram myapp.cadences:checkout_cadence --format mermaid

# Validate cadence definitions
cadence validate myapp.cadences

Score Management

Immutable Score

For functional-style cadences:

from cadence import ImmutableScore

@dataclass(frozen=True)
class Config(ImmutableScore):
    api_key: str
    timeout: int = 30

# Create new score with changes
new_config = config.with_field("timeout", 60)
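
with_field follows the same copy-on-write idea as the standard library's dataclasses.replace, which never mutates the original (FrozenConfig here is a stand-in class for the example):

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class FrozenConfig:
    api_key: str
    timeout: int = 30

old = FrozenConfig(api_key="k")
new = replace(old, timeout=60)   # old is untouched; new is a modified copy
print(old.timeout, new.timeout)  # 30 60
```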

Atomic Operations

Thread-safe score updates for parallel execution:

from cadence import Score, AtomicList, AtomicDict

@dataclass
class AggregatorScore(Score):
    results: AtomicList = None
    cache: AtomicDict = None

    def __post_init__(self):
        super().__post_init__()
        self.results = AtomicList()
        self.cache = AtomicDict()

# Safe concurrent updates
score.results.append(new_result)
score.cache["key"] = value
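
Conceptually, an atomic container is just a lock held around every mutation, so concurrent updates never interleave mid-operation. A small stdlib sketch of the idea (not Cadence's AtomicList itself):

```python
import threading

class LockedList:
    """Toy atomic list: each mutation and read holds a lock, so appends
    from many threads remain safe and visible."""
    def __init__(self):
        self._lock = threading.Lock()
        self._items = []

    def append(self, item):
        with self._lock:
            self._items.append(item)

    def snapshot(self):
        with self._lock:
            return list(self._items)

lst = LockedList()
threads = [threading.Thread(target=lst.append, args=(i,)) for i in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(lst.snapshot()))  # 100
```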

Error Handling

from cadence import CadenceError, NoteError

async def handle_error(score, error):
    if isinstance(error, NoteError):
        logger.error(f"Note {error.note_name} failed: {error}")
        score.errors.append(str(error))

cadence = (
    Cadence("handled", MyScore())
    .then("risky", risky_operation)
    .on_error(handle_error, stop=False)  # Continue on error
    .then("cleanup", cleanup)
)

Parallel Error Context

Identify which task failed in a .sync() group:

from cadence import ParallelNoteError

try:
    await cadence.run()
except ParallelNoteError as e:
    print(f"Task '{e.note_name}' (index {e.task_index}) "
          f"in group '{e.group_name}' failed: {e.original_error}")

Contributing

We welcome contributions! Please see our Contributing Guide for details.

License

Cadence is released under the MIT License.
