Python SDK and CLI for the Pipelines platform, with optional agent-hosting support

Pipelines SDK

Python client, CLI, and agent-hosting toolkit for the Pipelines platform. Anything you can do in the Pipelines UI you can script from Python or the pipelines CLI — and you can host your own agent behind a Pipelines dispatch endpoint with a few lines of code.

Highlights:

  • Sync PipelinesClient + async AsyncPipelinesClient (mirrored surface) covering the full Pipelines REST API.
  • Typed HTTP errors per status code (AuthenticationError, ForbiddenError, NotFoundError, ConflictError, ValidationError, RateLimitError, ServerError).
  • Auto-pagination via iter_* helpers and a generic paginate().
  • Streaming primitives and polling waiters (wait_for_experiment, wait_for_create_tasks, wait_for_seed_from_dataset, wait_for_export_tasks).
  • Agent hosting under the [odyssey] extra (the deprecated [agents] alias still resolves): a one-decorator FastAPI dispatch route, framework adapters (Anthropic, OpenAI Agents, LangChain, Strands), and the pipelines odyssey CLI group.
  • py.typed marker — mypy / pyright respect the SDK's annotations.

Install

pip install pipelines-sdk

# Agent hosting (FastAPI dispatch route, registration helpers, CLI group)
pip install 'pipelines-sdk[odyssey]'

# Agent hosting plus a framework adapter (the adapter extra implies [odyssey])
pip install 'pipelines-sdk[odyssey,anthropic]'

Adapter extras: [anthropic], [openai-agents], [langchain], [langchain-mcp], [strands], [mcp]. Each one pulls in [odyssey] automatically.

Projects scaffolded with pipelines odyssey init include a register.py that loads .env before reading registration variables, so local .env files work the same way as the CLI.

The LangChain adapter drops framework-injected config, callbacks, and run_manager parameters from proxied tool payloads. LangGraph topology extraction folds common helper nodes such as enter_*, *_safe_tools, and *_sensitive_tools into the owning assistant node.

Quickstart — REST client

Authenticate with an API key created in the Pipelines web app. The SDK reads PIPELINES_API_KEY from the environment, or you can pass api_key= directly.

from pipelines import PipelinesClient

client = PipelinesClient(api_key="pk_live_...")  # or rely on PIPELINES_API_KEY

# Auto-paginate every project the key can see
for project in client.iter_projects(page_size=50):
    print(project["id"], project["name"])

# Create tasks on a workflow
client.create_tasks(project_id=22, workflow_id=10, payload={"count": 1})

# Wait for an experiment to finish
status = client.wait_for_experiment(
    project_id=22, workflow_id=10, experiment_id="exp-1",
    poll_interval=2.0, timeout=600.0,
)
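
The wait_for_* helpers take a poll_interval and a timeout. As a rough illustration of that polling pattern (not the SDK's actual implementation), the sketch below calls a status function until it reports a terminal state or the deadline passes. The function name wait_until_terminal and the state names in TERMINAL_STATES are assumptions made for this example.

```python
import time

# Assumed state names for illustration; the platform's real values may differ.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def wait_until_terminal(fetch_status, poll_interval=2.0, timeout=600.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Call fetch_status() until it reports a terminal state or time runs out."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        # Give up if the next sleep would carry us past the deadline.
        if clock() + poll_interval > deadline:
            raise TimeoutError(f"still {status!r} after {timeout} seconds")
        sleep(poll_interval)


# Usage with a fake status source that finishes on the third poll.
_states = iter(["queued", "running", "completed"])
final = wait_until_terminal(lambda: next(_states), poll_interval=0.01, timeout=1.0)
print(final)
```

Using a monotonic clock keeps the deadline stable if the system clock is adjusted while the loop is waiting.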

Async usage mirrors the sync surface:

import asyncio
from pipelines import AsyncPipelinesClient

async def main():
    async with AsyncPipelinesClient(api_key="pk_live_...") as client:
        me = await client.whoami()
        async for project in client.paginate("/api/projects", page_size=50):
            print(project["id"])

asyncio.run(main())
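
The iter_* helpers and paginate() hide the page loop from the caller. A minimal sketch of that idea follows, assuming page-number pagination with a page_size parameter; the real API may use cursors or a different stop condition. Both paginate as written here and fake_fetch are stand-ins for this example, not SDK code.

```python
def paginate(fetch_page, page_size=50):
    """Yield items one by one, requesting pages until a short or empty page arrives."""
    page = 1
    while True:
        items = fetch_page(page=page, page_size=page_size)
        yield from items
        if len(items) < page_size:
            return  # a short page means there is nothing left to fetch
        page += 1


# Fake backend: five records served in slices (stands in for an HTTP call).
RECORDS = [{"id": i} for i in range(1, 6)]


def fake_fetch(page, page_size):
    start = (page - 1) * page_size
    return RECORDS[start:start + page_size]


ids = [item["id"] for item in paginate(fake_fetch, page_size=2)]
print(ids)
```

Because the helper is a generator, a caller that breaks out of the loop early never requests the remaining pages.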

Typed errors let you branch on the HTTP status without parsing bodies:

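import time

from pipelines import (
    AuthenticationError, ForbiddenError, RateLimitError,
)

try:
    client.update_prompt(7, {"name": "..."})
except RateLimitError as exc:
    # Back off for the server-suggested delay, or 1 second if none was given.
    time.sleep(exc.retry_after or 1.0)
except (AuthenticationError, ForbiddenError):
    refresh_credentials()  # placeholder for your own re-authentication logic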
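
A single sleep handles one rate-limit response; a bounded retry loop is a common next step. The sketch below is one way to structure it, assuming only that the exception exposes retry_after as in the snippet above. The RateLimitError class here is a local stand-in so the example runs without the SDK, and call_with_retry is a hypothetical helper, not part of the package.

```python
import time


class RateLimitError(Exception):
    """Local stand-in for the SDK exception; only retry_after is modelled."""

    def __init__(self, retry_after=None):
        super().__init__("rate limited")
        self.retry_after = retry_after


def call_with_retry(func, max_attempts=3, default_delay=1.0, sleep=time.sleep):
    """Run func(), sleeping between rate-limited attempts, up to max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except RateLimitError as exc:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            sleep(exc.retry_after or default_delay)


# Usage: a fake call that is rate limited twice, then succeeds.
attempts = []


def flaky_update():
    attempts.append(1)
    if len(attempts) < 3:
        raise RateLimitError(retry_after=0.01)
    return {"ok": True}


result = call_with_retry(flaky_update)
print(result, len(attempts))
```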

The SDK defaults to https://api.pipelines.tech. Point elsewhere with PIPELINES_ENV=staging, the CLI's --env flag, or base_url=.

See API_REFERENCE.md for the full method inventory.

Hosting an agent

With pipelines-sdk[odyssey] installed you can host your own agent behind a Pipelines-compatible POST /dispatch endpoint. register_dispatch_route handles inbound auth, the reachability probe, envelope parsing, and response shaping; proxy_call is the HTTP shim your tool bodies use to reach the platform mid-run.

from fastapi import FastAPI
from pipelines.odyssey import register_dispatch_route, proxy_call

app = FastAPI()

@register_dispatch_route(app, agent_token_env="AGENT_TOKEN")
async def run(envelope):
    # envelope.user_instruction is the inbound prompt; proxy_call reaches
    # the platform (tools, context) for the duration of this run.
    # build_and_run_agent stands in for your own agent entry point.
    result = await build_and_run_agent(envelope.user_instruction)
    return result  # a str → {"final_response": ...}, or return a dict/envelope

The decorated handler can return a str (treated as final_response), a dict with at least {"final_response": "..."}, or any object exposing a final_output / final_response / output attribute. Framework adapters under pipelines.odyssey.adapters wrap Anthropic, OpenAI Agents, LangChain, and Strands runners into this shape. The LangChain adapter's pipelines_proxy handles Tool / StructuredTool and plain callables, and also BaseTool subclasses without a func slot — the shape of most langchain_community tools (DuckDuckGoSearchRun, WikipediaQueryRun, ...):

from langchain_community.tools import DuckDuckGoSearchRun
from pipelines.odyssey.adapters.langchain import pipelines_proxy

search = pipelines_proxy()(DuckDuckGoSearchRun())  # proxied in-run, real locally
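
The three accepted return shapes described above (a str, a dict with final_response, or an object exposing one of the listed attributes) can be pictured as a small normalization step. This is a sketch of that rule under stated assumptions, not the SDK's code: the function name to_dispatch_response is hypothetical, the attribute lookup order simply follows the order in which the README lists them, and converting the attribute value with str() is a choice made for this example.

```python
from types import SimpleNamespace


def to_dispatch_response(value):
    """Coerce a handler return value into a dict carrying final_response."""
    if isinstance(value, str):
        return {"final_response": value}
    if isinstance(value, dict):
        if "final_response" not in value:
            raise ValueError("dict return values need a 'final_response' key")
        return value
    # Lookup order is an assumption taken from the order used in the prose.
    for attr in ("final_output", "final_response", "output"):
        if hasattr(value, attr):
            return {"final_response": str(getattr(value, attr))}
    raise TypeError(f"unsupported handler return type: {type(value).__name__}")


print(to_dispatch_response("done"))
print(to_dispatch_response({"final_response": "done", "steps": 3}))
print(to_dispatch_response(SimpleNamespace(final_output="done")))
```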

Tool returns are model-facing payloads

In sandbox mode, proxy_call (and every adapter-wrapped tool) returns the platform simulator's response verbatim. That response is generated by an LLM for the model's benefit — by default nothing guarantees it matches the shape your real tool implementation returns. Agent code that consumes a proxied tool result structurally (a LangGraph seed/bootstrap node indexing into the payload, a handler doing result["orders"][0]["id"]) must either parse defensively or declare an output_schema for the tool.

A declared output_schema is a contract, not a hint: the simulator validates its tool_response against it and regenerates on violation, so conforming shapes are safe to consume from code. If the simulator cannot conform within its retry budget the tool call fails with a structured error payload instead of returning a malformed result.

from pipelines.odyssey.adapters.langchain import dump_tools_schema, proxy_tool

ORDER_SCHEMA = {
    "type": "object",
    "properties": {"id": {"type": "string"}, "status": {"type": "string"}},
    "required": ["id", "status"],
}

# Tools you build yourself: declare it at construction.
# GetOrderArgs is your own args model for the tool (not shown here).
get_order = proxy_tool(
    "get_order",
    description="Look up an order by id.",
    args_schema=GetOrderArgs,
    output_schema=ORDER_SCHEMA,
)

# Tools you don't construct (community tools): pass a mapping at export time.
# `tools` is the list of tool objects your agent already uses.
tools_schema = dump_tools_schema(tools, output_schemas={"get_order": ORDER_SCHEMA})
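
For tools without a declared output_schema, the other option named above is to parse defensively. The helper below is a minimal sketch of what that could look like for the result["orders"][0]["id"] access mentioned earlier. The function name extract_first_order_id is hypothetical, and returning None on any mismatch is a design choice for this example; your agent may prefer to raise or retry instead.

```python
def extract_first_order_id(result):
    """Return result["orders"][0]["id"] if that path exists and is a string, else None."""
    if not isinstance(result, dict):
        return None
    orders = result.get("orders")
    if not isinstance(orders, list) or not orders:
        return None
    first = orders[0]
    if not isinstance(first, dict):
        return None
    order_id = first.get("id")
    return order_id if isinstance(order_id, str) else None


# A well-formed payload and two shapes a simulator could plausibly produce instead.
print(extract_first_order_id({"orders": [{"id": "4521", "status": "shipped"}]}))
print(extract_first_order_id({"order": {"id": "4521"}}))
print(extract_first_order_id("Order 4521 has shipped."))
```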

Declaring tools

Tool builds one validated entry of the platform's tools_schema from Python, so you don't hand-author the JSON. Each tool picks one of three execution modes:

  • simulated() — the default. The tool runs as an Odyssey-mocked (PWSA-simulated) call: the platform synthesises the response from the scenario, your tool body never runs live.
  • passthrough(...) — the call is forwarded to a live ToolEndpoint. Pass tool_name plus one of endpoint_id / endpoint_name.
  • native — your agent's own un-proxied code. Nothing to declare; just don't route it through Odyssey.

from pipelines.odyssey import Tool, simulated, passthrough, adapter

tools = [
    # 1. Simulated (default) — Odyssey mocks the response.
    Tool(
        name="get_consensus_rating",
        description="Analyst consensus for a ticker",
        input_schema={
            "type": "object",
            "properties": {"ticker": {"type": "string"}},
            "required": ["ticker"],
        },
        mode=simulated(),
    ),
    # 2. Passthrough — forwarded to a live ToolEndpoint, with a declarative
    #    ledger adapter that records each call into the simulated world-state.
    Tool(
        name="save_analysis",
        input_schema={
            "type": "object",
            "properties": {
                "ticker": {"type": "string"},
                "recommendation": {"type": "string"},
            },
        },
        mode=passthrough(endpoint_name="mcp:analysis", tool_name="save"),
        ledger=adapter(
            op="add",
            entity_type="analysis",
            id_from="$.ticker",
            field_map={"recommendation": "$.recommendation"},
        ),
    ),
]

Wire the list straight into any registration / payload builder that takes tools_schema — they accept list[Tool] (each is serialized via Tool.to_dict()) as well as raw dicts:

from pipelines.odyssey import registration

payload = registration.build_http_agent_payload(
    name="analyst-agent",
    endpoint_url="https://my-agent.example.com/dispatch",
    tools_schema=tools,
)
# …or build_code_agent_payload(...) / create_http_agent(...) /
# create_code_agent(...) / generate_ledger_schema(...) — same tools_schema arg.

A ledger=adapter(...) with an entity op (add / update / remove) requires a matching ledger_schema whose entities include that entity_type. A mismatch is not caught locally; it surfaces as a create-time 422 from the platform's validate_adapter_ontology_consistency check (set_flag adapters are exempt). Draft the ontology with registration.generate_ledger_schema(...), edit it, and pass it as ledger_schema= in the same registration call as tools_schema.
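
Since the mismatch only shows up as a 422 at create time, a small pre-flight check in your own code can catch it earlier. The sketch below is illustrative only and works on plain dicts: it assumes each adapter can be viewed as a dict with op and entity_type keys, and that the ledger schema keeps its entity types under an "entities" key. Neither shape is confirmed by this README, so adjust the lookups to whatever generate_ledger_schema actually returns.

```python
ENTITY_OPS = {"add", "update", "remove"}


def missing_entity_types(adapters, ledger_schema):
    """Return entity types used by entity-op adapters but absent from the schema."""
    # Works whether "entities" is a dict keyed by type or a list of type names.
    declared = set(ledger_schema.get("entities", {}))
    missing = []
    for adapter in adapters:
        if adapter.get("op") not in ENTITY_OPS:
            continue  # non-entity ops such as set_flag are exempt
        entity_type = adapter.get("entity_type")
        if entity_type not in declared and entity_type not in missing:
            missing.append(entity_type)
    return missing


# Hypothetical adapter and schema shapes, for illustration only.
adapters = [
    {"op": "add", "entity_type": "analysis"},
    {"op": "set_flag", "flag": "reviewed"},
    {"op": "update", "entity_type": "portfolio"},
]
schema = {"entities": {"analysis": {"fields": ["recommendation"]}}}
print(missing_entity_types(adapters, schema))
```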

In-sandbox tool calls

For a BYO in-sandbox / code-mode agent (a script the platform runs directly in the run's sandbox, not a hosted /dispatch endpoint), there's no inbound envelope to parse — the platform injects the proxy URL and run token as PIPELINES_* env vars instead. Envelope.from_env() reads them, and proxy_call_with reaches the proxy:

from pipelines.odyssey import Envelope, proxy_call_with

env = Envelope.from_env()  # reads PIPELINES_ODYSSEY_PROXY_URL + PIPELINES_RUN_TOKEN
order = proxy_call_with(env, "get_order", {"order_id": "4521"})
print(order)

Envelope.from_env() raises KeyError if either load-bearing var (PIPELINES_ODYSSEY_PROXY_URL, PIPELINES_RUN_TOKEN) is missing; the correlation ids (PIPELINES_RUN_TOKEN_JTI, PIPELINES_RUN_ID, PIPELINES_TASK_ID) are optional. A hosted handler reads the same fields off the parsed envelope, so the plain proxy_call(name, args) (which pulls the active envelope from the request ContextVar) is the equivalent there.
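
The required-versus-optional split described above maps naturally onto indexing versus .get() on the environment. The following is a sketch of that behaviour, not the SDK's Envelope class: RunEnv and its field names are invented for this example, while the variable names are the ones listed in the paragraph above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class RunEnv:
    """Illustrative stand-in for the SDK's Envelope; field names are assumptions."""
    proxy_url: str
    run_token: str
    run_token_jti: Optional[str] = None
    run_id: Optional[str] = None
    task_id: Optional[str] = None


def run_env_from(environ: Mapping[str, str] = os.environ) -> RunEnv:
    """Build a RunEnv, raising KeyError when a required variable is absent."""
    return RunEnv(
        proxy_url=environ["PIPELINES_ODYSSEY_PROXY_URL"],  # required
        run_token=environ["PIPELINES_RUN_TOKEN"],          # required
        run_token_jti=environ.get("PIPELINES_RUN_TOKEN_JTI"),
        run_id=environ.get("PIPELINES_RUN_ID"),
        task_id=environ.get("PIPELINES_TASK_ID"),
    )


# Usage with an explicit mapping so the example does not depend on the real environment.
env = run_env_from({
    "PIPELINES_ODYSSEY_PROXY_URL": "https://proxy.example.invalid",
    "PIPELINES_RUN_TOKEN": "token-for-illustration",
})
print(env.proxy_url, env.run_id)
```

Passing the mapping as a parameter keeps the function easy to test without touching the process environment.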

CLI

# Core: auth and resource management
pipelines auth login --api-key "pk_live_..."   # or export PIPELINES_API_KEY
pipelines whoami
pipelines projects list
pipelines workflow list --project-id 22
pipelines datasets export --dataset-id 123 --format csv --output dataset.csv

# Agents: scaffold, run locally, and register
pipelines odyssey init --framework anthropic --dir my-agent
pipelines odyssey dev --app my_agent.app:app
pipelines odyssey push

The core CLI autoloads a .env file if present (--no-dotenv opts out). Config is saved at ~/.config/pipelines/config.json with owner-only permissions; in CI prefer PIPELINES_API_KEY over a checked-in config file.
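
"Owner-only permissions" on a POSIX system usually means file mode 0o600. The sketch below shows one way to write a JSON config with that mode using only the standard library; it is not the CLI's implementation, save_config is a hypothetical helper, and the example writes to a temporary directory rather than ~/.config/pipelines. On Windows the mode bits do not carry the same meaning.

```python
import json
import os
import stat
import tempfile
from pathlib import Path


def save_config(path: Path, data: dict) -> None:
    """Write JSON so that only the owning user can read or write the file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Creating with mode 0o600 avoids a window where the file is readable by others.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(data, handle, indent=2)
    os.chmod(path, 0o600)  # also tightens a file that already existed


with tempfile.TemporaryDirectory() as tmp:
    config_path = Path(tmp) / "pipelines" / "config.json"
    save_config(config_path, {"env": "staging"})
    mode = stat.S_IMODE(config_path.stat().st_mode)
    loaded = json.loads(config_path.read_text(encoding="utf-8"))

print(oct(mode), loaded)
```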

Documentation

Full guides live at https://platform.pipelines.tech/docs.

License

Proprietary — see LICENSE. © Pipelines.

Source Distributions

No source distribution files available for this release.

Built Distribution

pipelines_sdk-0.1.7-py3-none-any.whl (221.1 kB view details)

Uploaded Python 3

File details

Details for the file pipelines_sdk-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: pipelines_sdk-0.1.7-py3-none-any.whl
  • Upload date:
  • Size: 221.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pipelines_sdk-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 3f73a19e460568980d667792907c7fb41f9b1ee2f9bb31bce9f960fff0116a2c
MD5 6a28aedbe7c503e370fd38f4af099eb9
BLAKE2b-256 ed2ca8dacff71b10e9004cc3c21e47ffede42d3dd8ff397fea263f28adb02262

Provenance

The following attestation bundles were made for pipelines_sdk-0.1.7-py3-none-any.whl:

Publisher: sdk-publish.yml on BuildPipelines/pipelines_monorepo

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
