Skip to main content

Official Python client for StreamFlow Pulse — AI Agent Platform

Project description

streamflow-pulse-client — Python SDK for StreamFlow Pulse

Official Python client for the Pulse AI Agent Platform.

Distribution name on PyPI is streamflow-pulse-client; import statement stays the natural from pulse_client import ... (same convention as python-dateutilimport dateutil).

from pulse_client import PulseClient

with PulseClient("http://localhost:9090") as client:
    client.auth.login("alice", "secret")
    for pipeline in client.pipelines.list():
        print(pipeline["name"])

Install

pip install streamflow-pulse-client

Requires Python 3.10+. Pure Python — only depends on httpx.

Why pulse-client

  • Pythonic — context-manager friendly, typed exceptions, attribute-style resource access (client.pipelines.list()).
  • Lightweight — single dependency (httpx), <500 LoC, no generated bloat.
  • Spec-aligned — every method corresponds 1:1 to an endpoint in the Pulse OpenAPI 3.1 spec. Drift is caught at PR time by the in-tree spec invariant tests (B-103).
  • Async-ready — the sync client ships today; an AsyncPulseClient (same surface, await everywhere) will follow in v3.0.

Quick start

from pulse_client import PulseClient, PulseAuthError

client = PulseClient("http://localhost:9090")

# Authenticate — the returned JWT is cached on the client automatically
try:
    response = client.auth.login("alice", "secret")
    print(f"Logged in as {response['user']['username']}")
except PulseAuthError as e:
    print(f"Login failed: {e}")

# List + inspect resources
for pipeline in client.pipelines.list():
    print(pipeline["name"], pipeline["status"])

# Create a pipeline from a template
new_pipeline = client.pipelines.create({
    "name": "my-fraud-detector",
    "templateId": "fintech-fraud-detection-realtime",
    "nodes": [
        {"id": "source", "type": "source", "subType": "kafka-source"},
        {"id": "agent", "type": "agent", "subType": "streaming"},
        {"id": "sink", "type": "sink", "subType": "telegram"},
    ],
})

# Inspect deployed agents
for agent in client.agents.list():
    print(f"  {agent['name']}{agent['engineType']}{agent['status']}")

client.close()

Supported surfaces (v2.6.0)

Resource Methods Notes
client.auth login(), refresh(), organizations(), switch_org() Auto-caches JWT on the client after login / refresh / switch_org.
client.pipelines list(), get(id), create(definition), delete(id) definition follows the CreatePipelineRequest schema (see OpenAPI spec).
client.agents list(), get(id) Read-only — agents are owned by pipelines.
client.templates list() The 223+ first-party templates.
client.users list() Requires USERS_LIST permission (Owner / Platform Admin personas).
client.version() top-level Public — no JWT required.

The full ~112-endpoint surface (admin, audit, backups, chat, workspace, etc.) is documented in the OpenAPI spec at <pulse-server>/api-docs. SDK methods for those land opportunistically as user-facing demand surfaces.

Authentication

Three patterns, pick what fits:

# 1. Username + password (interactive / CLI tools)
client = PulseClient("http://localhost:9090")
client.auth.login("alice", "secret")

# 2. Pre-minted JWT (CI / service accounts)
client = PulseClient("http://localhost:9090", token="ey...")

# 3. JWT from environment (12-factor apps)
import os
client = PulseClient(os.environ["PULSE_URL"], token=os.environ["PULSE_TOKEN"])

For long-running daemons, store the refreshToken from login() and call client.auth.refresh(refresh_token) when the JWT nears expiry (default 1 h TTL).

Error handling

Every server error becomes a typed exception you can catch precisely:

from pulse_client import (
    PulseClientError,   # base — catches every client-side error
    PulseAuthError,     # 401 — invalid / missing / expired JWT
    PulseNotFoundError, # 404
    PulseValidationError, # 400 — malformed request
    PulseRateLimitError,  # 429 — carries .retry_after_seconds
    PulseAPIError,      # everything else (5xx, etc.)
)

try:
    client.pipelines.get("nope")
except PulseNotFoundError:
    print("Doesn't exist — fine")
except PulseRateLimitError as e:
    print(f"Backing off {e.retry_after_seconds}s")
    time.sleep(e.retry_after_seconds or 60)
except PulseClientError as e:
    print(f"Something else went wrong: {e}")

Every exception carries .status_code, .path, and .body so log lines + bug reports are actionable.

Development

git clone https://github.com/olsisoft/streamflow.git
cd streamflow/pulse-py

# Install in editable mode with dev deps
pip install -e ".[dev]"

# Run tests
pytest

# Lint
ruff check src tests
mypy src

CI runs the same on every push touching pulse-py/ — see .github/workflows/pulse-py.yaml.

Roadmap

  • v2.5.x — current sync API, 5 core resources (auth, pipelines, agents, templates, users), version().
  • v2.6.x — expanded resource coverage: backups, schedules, credentials, settings, approvals, chat.
  • v3.0AsyncPulseClient with async def everywhere; same surface; one library, two clients.
  • B-098 satellite — once olsisoft/pulse-py exists as its own repo, this in-tree code lifts out wholesale. Pip-install will switch to the satellite; in-tree continues to mirror for one release cycle so the migration is non-breaking.

Track progress in docs/STREAMFLOW-BACKLOG.md under item B-098.

License

Apache 2.0 — same as the parent Pulse repository.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamflow_pulse_client-2.6.0.tar.gz (40.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

streamflow_pulse_client-2.6.0-py3-none-any.whl (27.2 kB view details)

Uploaded Python 3

File details

Details for the file streamflow_pulse_client-2.6.0.tar.gz.

File metadata

  • Download URL: streamflow_pulse_client-2.6.0.tar.gz
  • Upload date:
  • Size: 40.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for streamflow_pulse_client-2.6.0.tar.gz
Algorithm Hash digest
SHA256 a584fbac1902ab24645b9c07a3c24c9a63631b99c8b152a8ecace468b7e4ac13
MD5 09c128182e0cc0f383e9d381dae8ddaa
BLAKE2b-256 58fdba7fedb86dfc7582c6efd8cdd7f3f2af3ea015d80f62172659a10d4877ba

See more details on using hashes here.

Provenance

The following attestation bundles were made for streamflow_pulse_client-2.6.0.tar.gz:

Publisher: release.yaml on olsisoft/pulse-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file streamflow_pulse_client-2.6.0-py3-none-any.whl.

File metadata

File hashes

Hashes for streamflow_pulse_client-2.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cdc51e4628792b6bf5fb2e0bbe0e9d50d88de908280b1cb77ab989387e9e0b7a
MD5 b6af524c16ebd625a1d376289cabc935
BLAKE2b-256 9f13604f2d6f773bdc9ba99d7b91b51c654eacda789f8550657006531dbb3542

See more details on using hashes here.

Provenance

The following attestation bundles were made for streamflow_pulse_client-2.6.0-py3-none-any.whl:

Publisher: release.yaml on olsisoft/pulse-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page