Official Python client for StreamFlow Pulse — AI Agent Platform

Project description

streamflow-pulse-client — Python SDK for StreamFlow Pulse

Official Python client for the Pulse AI Agent Platform.

The distribution name on PyPI is streamflow-pulse-client, while the import name is pulse_client (from pulse_client import ...). This follows the same convention as python-dateutil, which is imported as dateutil.

from pulse_client import PulseClient

with PulseClient("http://localhost:9090") as client:
    client.auth.login("alice", "secret")
    for pipeline in client.pipelines.list():
        print(pipeline["name"])

Install

pip install streamflow-pulse-client

Requires Python 3.10+. Pure Python — only depends on httpx.

Why pulse-client

  • Pythonic — context-manager friendly, typed exceptions, attribute-style resource access (client.pipelines.list()).
  • Lightweight — single dependency (httpx), <500 LoC, no generated bloat.
  • Spec-aligned — every method corresponds 1:1 to an endpoint in the Pulse OpenAPI 3.1 spec. Drift is caught at PR time by the in-tree spec invariant tests (B-103).
  • Async-ready — the sync client ships today; an AsyncPulseClient (same surface, await everywhere) will follow in v3.0.

Quick start

from pulse_client import PulseClient, PulseAuthError

client = PulseClient("http://localhost:9090")

# Authenticate — the returned JWT is cached on the client automatically
try:
    response = client.auth.login("alice", "secret")
    print(f"Logged in as {response['user']['username']}")
except PulseAuthError as e:
    print(f"Login failed: {e}")

# List + inspect resources
for pipeline in client.pipelines.list():
    print(pipeline["name"], pipeline["status"])

# Create a pipeline from a template
new_pipeline = client.pipelines.create({
    "name": "my-fraud-detector",
    "templateId": "fintech-fraud-detection-realtime",
    "nodes": [
        {"id": "source", "type": "source", "subType": "kafka-source"},
        {"id": "agent", "type": "agent", "subType": "streaming"},
        {"id": "sink", "type": "sink", "subType": "telegram"},
    ],
})

# Inspect deployed agents
for agent in client.agents.list():
    print(f"  {agent['name']} | {agent['engineType']} | {agent['status']}")

client.close()
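
The definition passed to client.pipelines.create() is a plain dict, so it can be assembled and sanity-checked locally before it is sent. The sketch below is one possible helper, not part of the SDK: build_pipeline_definition and REQUIRED_NODE_KEYS are hypothetical names, and the assumption that every node needs id, type, and subType is inferred only from the example above, not from the CreatePipelineRequest schema.

```python
from typing import Any

# Assumption: inferred from the Quick start example, not from the OpenAPI spec.
REQUIRED_NODE_KEYS = ("id", "type", "subType")


def build_pipeline_definition(
    name: str, template_id: str, nodes: list[dict[str, Any]]
) -> dict[str, Any]:
    """Assemble a pipeline definition dict after basic local checks.

    Raises ValueError if a node lacks an expected key or reuses an id.
    """
    seen: set[str] = set()
    for node in nodes:
        missing = [key for key in REQUIRED_NODE_KEYS if key not in node]
        if missing:
            raise ValueError(f"node {node.get('id', '?')!r} is missing {missing}")
        if node["id"] in seen:
            raise ValueError(f"duplicate node id {node['id']!r}")
        seen.add(node["id"])
    # Copy each node so later edits by the caller do not alter the payload.
    return {"name": name, "templateId": template_id, "nodes": [dict(n) for n in nodes]}
```

The returned dict has the same shape as the literal in the Quick start, so it could be handed to client.pipelines.create() unchanged. The server remains the authority on what is valid.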

Supported surfaces (v2.5.8)

Resource | Methods | Notes
client.auth | login(), refresh(), organizations(), switch_org() | Auto-caches JWT on the client after login / refresh / switch_org.
client.pipelines | list(), get(id), create(definition), delete(id) | definition follows the CreatePipelineRequest schema (see OpenAPI spec).
client.agents | list(), get(id) | Read-only: agents are owned by pipelines.
client.templates | list() | The 223+ first-party templates.
client.users | list() | Requires USERS_LIST permission (Owner / Platform Admin personas).
client.version() | top-level | Public: no JWT required.

The full ~112-endpoint surface (admin, audit, backups, chat, workspace, etc.) is documented in the OpenAPI spec at <pulse-server>/api-docs. SDK methods for those endpoints are added as user demand arises.

Authentication

Three patterns, pick what fits:

# 1. Username + password (interactive / CLI tools)
client = PulseClient("http://localhost:9090")
client.auth.login("alice", "secret")

# 2. Pre-minted JWT (CI / service accounts)
client = PulseClient("http://localhost:9090", token="ey...")

# 3. JWT from environment (12-factor apps)
import os
client = PulseClient(os.environ["PULSE_URL"], token=os.environ["PULSE_TOKEN"])

For long-running daemons, store the refreshToken from login() and call client.auth.refresh(refresh_token) when the JWT nears expiry (default 1 h TTL).

Error handling

Every server error becomes a typed exception you can catch precisely:

import time

from pulse_client import (
    PulseClientError,     # base: catches every client-side error
    PulseAuthError,       # 401: invalid / missing / expired JWT
    PulseNotFoundError,   # 404
    PulseValidationError, # 400: malformed request
    PulseRateLimitError,  # 429: carries .retry_after_seconds
    PulseAPIError,        # everything else (5xx, etc.)
)

try:
    client.pipelines.get("nope")
except PulseNotFoundError:
    print("Doesn't exist, which is fine")
except PulseRateLimitError as e:
    # Fall back to 60 s when the server sent no retry hint
    delay = e.retry_after_seconds or 60
    print(f"Backing off {delay}s")
    time.sleep(delay)
except PulseClientError as e:
    print(f"Something else went wrong: {e}")

Every exception carries .status_code, .path, and .body so log lines + bug reports are actionable.
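
The rate-limit branch above sleeps once and moves on. A retry loop that honors the server's hint could look like the sketch below. call_with_backoff is a hypothetical helper, not part of the SDK. It takes the exception class as a parameter and reads retry_after_seconds via getattr, so it makes no assumption about the exception beyond that attribute name from the list above.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    rate_limit_error: type[Exception],
    *,
    max_attempts: int = 5,
    default_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying when it raises `rate_limit_error`.

    Waits for the exception's `retry_after_seconds` when present, otherwise
    uses exponential backoff starting at `default_delay`. Re-raises the
    rate-limit error once `max_attempts` calls have failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except rate_limit_error as exc:
            if attempt == max_attempts:
                raise
            hinted = getattr(exc, "retry_after_seconds", None)
            delay = hinted if hinted else default_delay * 2 ** (attempt - 1)
            sleep(delay)
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

With the SDK installed, a call might read call_with_backoff(lambda: client.pipelines.list(), PulseRateLimitError). Other exception types propagate immediately, so the typed handlers shown above still apply around it.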

Development

git clone https://github.com/olsisoft/streamflow.git
cd streamflow/pulse-py

# Install in editable mode with dev deps
pip install -e ".[dev]"

# Run tests
pytest

# Lint
ruff check src tests
mypy src

CI runs the same checks on every push that touches pulse-py/; see .github/workflows/pulse-py.yaml.

Roadmap

  • v2.5.x: current sync API, 5 core resources (auth, pipelines, agents, templates, users), version().
  • v2.6.x: expanded resource coverage: backups, schedules, credentials, settings, approvals, chat.
  • v3.0: AsyncPulseClient with async def everywhere; same surface; one library, two clients.
  • B-098 satellite: once olsisoft/pulse-py exists as its own repo, this in-tree code moves there wholesale. pip install will switch to the satellite repo; the in-tree copy continues to mirror it for one release cycle so the migration is non-breaking.

Track progress in docs/STREAMFLOW-BACKLOG.md under item B-098.

License

Apache 2.0 — same as the parent Pulse repository.

Download files

Source Distribution

streamflow_pulse_client-2.5.8.tar.gz (17.0 kB)

Uploaded Source

Built Distribution

streamflow_pulse_client-2.5.8-py3-none-any.whl (14.5 kB)

Uploaded Python 3

File details

Details for the file streamflow_pulse_client-2.5.8.tar.gz.

File metadata

  • Download URL: streamflow_pulse_client-2.5.8.tar.gz
  • Size: 17.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for streamflow_pulse_client-2.5.8.tar.gz
Algorithm Hash digest
SHA256 f6b1b69928956df50a3fa026aab68aa42ba3c4773e1e4d07b8f7388448ea5260
MD5 d2cb34d177d1e2fa2f55c207686fa56d
BLAKE2b-256 40d77840bf831f235c226d590b08b226d5aa69145286a71fd321268d5cfc7045

Provenance

The following attestation bundles were made for streamflow_pulse_client-2.5.8.tar.gz:

Publisher: release.yaml on olsisoft/pulse-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file streamflow_pulse_client-2.5.8-py3-none-any.whl.

File hashes

Hashes for streamflow_pulse_client-2.5.8-py3-none-any.whl
Algorithm Hash digest
SHA256 de87e07fcc76458594cc80ccc2ae2575021debabc6ad2a4d5f26d100c88ad049
MD5 5752f9becaba7384ceece40465f7336a
BLAKE2b-256 07485084760ada1dbd9350635d7f2e7b713e31745ca0b16dfe4857a1f0c9e3d5

Provenance

The following attestation bundles were made for streamflow_pulse_client-2.5.8-py3-none-any.whl:

Publisher: release.yaml on olsisoft/pulse-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
