Skip to main content

Official Python client for StreamFlow Pulse — AI Agent Platform

Project description

streamflow-pulse-client — Python SDK for StreamFlow Pulse

Official Python client for the Pulse AI Agent Platform.

Distribution name on PyPI is streamflow-pulse-client; import statement stays the natural from pulse_client import ... (same convention as python-dateutilimport dateutil).

from pulse_client import PulseClient

with PulseClient("http://localhost:9090") as client:
    client.auth.login("alice", "secret")
    for pipeline in client.pipelines.list():
        print(pipeline["name"])

Install

pip install streamflow-pulse-client

Requires Python 3.10+. Pure Python — only depends on httpx.

Why pulse-client

  • Pythonic — context-manager friendly, typed exceptions, attribute-style resource access (client.pipelines.list()).
  • Lightweight — single dependency (httpx), <500 LoC, no generated bloat.
  • Spec-aligned — every method corresponds 1:1 to an endpoint in the Pulse OpenAPI 3.1 spec. Drift is caught at PR time by the in-tree spec invariant tests (B-103).
  • Async-ready — the sync client ships today; an AsyncPulseClient (same surface, await everywhere) will follow in v3.0.

Quick start

from pulse_client import PulseClient, PulseAuthError

client = PulseClient("http://localhost:9090")

# Authenticate — the returned JWT is cached on the client automatically
try:
    response = client.auth.login("alice", "secret")
    print(f"Logged in as {response['user']['username']}")
except PulseAuthError as e:
    print(f"Login failed: {e}")

# List + inspect resources
for pipeline in client.pipelines.list():
    print(pipeline["name"], pipeline["status"])

# Create a pipeline from a template
new_pipeline = client.pipelines.create({
    "name": "my-fraud-detector",
    "templateId": "fintech-fraud-detection-realtime",
    "nodes": [
        {"id": "source", "type": "source", "subType": "kafka-source"},
        {"id": "agent", "type": "agent", "subType": "streaming"},
        {"id": "sink", "type": "sink", "subType": "telegram"},
    ],
})

# Inspect deployed agents
for agent in client.agents.list():
    print(f"  {agent['name']}{agent['engineType']}{agent['status']}")

client.close()

Supported surfaces (v2.6.0)

Resource Methods Notes
client.auth login(), refresh(), organizations(), switch_org() Auto-caches JWT on the client after login / refresh / switch_org.
client.pipelines list(), get(id), create(definition), delete(id) definition follows the CreatePipelineRequest schema (see OpenAPI spec).
client.agents list(), get(id) Read-only — agents are owned by pipelines.
client.templates list() The 223+ first-party templates.
client.users list() Requires USERS_LIST permission (Owner / Platform Admin personas).
client.version() top-level Public — no JWT required.

The full ~112-endpoint surface (admin, audit, backups, chat, workspace, etc.) is documented in the OpenAPI spec at <pulse-server>/api-docs. SDK methods for those land opportunistically as user-facing demand surfaces.

Embedded ML inference & duplex

Score events with an uploaded ONNX model in-process (B-112), and open a bidirectional duplex channel for synchronous decisions (B-114). Full guide: ML inference & duplex.

# Upload + score with an ONNX model (no model-server hop)
client.models.upload(name="fraud", path="./fraud.onnx",
                     input_schema={"amount": "float", "country": "float"})
builder.from_topic("transactions").ml_predict(
    model="fraud", input_fields=["amount", "country"], output_field="prediction"
).filter("prediction.fraud_score > 0.8").to_topic("flagged")

# Duplex: send in, receive the correlated output on one connection
# (pip install streamflow-pulse-client[duplex])
async with client.duplex("fraud-detector") as ch:
    await ch.send({"amount": 5000}, correlation_id="tx-1")
    signal = await ch.recv()        # signal["correlation_id"] == "tx-1"

Authentication

Three patterns, pick what fits:

# 1. Username + password (interactive / CLI tools)
client = PulseClient("http://localhost:9090")
client.auth.login("alice", "secret")

# 2. Pre-minted JWT (CI / service accounts)
client = PulseClient("http://localhost:9090", token="ey...")

# 3. JWT from environment (12-factor apps)
import os
client = PulseClient(os.environ["PULSE_URL"], token=os.environ["PULSE_TOKEN"])

For long-running daemons, store the refreshToken from login() and call client.auth.refresh(refresh_token) when the JWT nears expiry (default 1 h TTL).

Error handling

Every server error becomes a typed exception you can catch precisely:

from pulse_client import (
    PulseClientError,   # base — catches every client-side error
    PulseAuthError,     # 401 — invalid / missing / expired JWT
    PulseNotFoundError, # 404
    PulseValidationError, # 400 — malformed request
    PulseRateLimitError,  # 429 — carries .retry_after_seconds
    PulseAPIError,      # everything else (5xx, etc.)
)

try:
    client.pipelines.get("nope")
except PulseNotFoundError:
    print("Doesn't exist — fine")
except PulseRateLimitError as e:
    print(f"Backing off {e.retry_after_seconds}s")
    time.sleep(e.retry_after_seconds or 60)
except PulseClientError as e:
    print(f"Something else went wrong: {e}")

Every exception carries .status_code, .path, and .body so log lines + bug reports are actionable.

Development

git clone https://github.com/olsisoft/pulse-py.git
cd pulse-py

# Install in editable mode with dev deps
pip install -e ".[dev]"

# Run tests
pytest

# Lint
ruff check src tests
mypy src

CI runs the same on every push touching pulse-py/ — see .github/workflows/pulse-py.yaml.

Roadmap

  • v2.5.x — current sync API, 5 core resources (auth, pipelines, agents, templates, users), version().
  • v2.6.x — expanded resource coverage: backups, schedules, credentials, settings, approvals, chat.
  • v3.0AsyncPulseClient with async def everywhere; same surface; one library, two clients.
  • B-098 satellite — once olsisoft/pulse-py exists as its own repo, this in-tree code lifts out wholesale. Pip-install will switch to the satellite; in-tree continues to mirror for one release cycle so the migration is non-breaking.

Track progress in docs/STREAMFLOW-BACKLOG.md under item B-098.

License

Apache 2.0 — same as the parent Pulse repository.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamflow_pulse_client-2.6.1.tar.gz (53.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

streamflow_pulse_client-2.6.1-py3-none-any.whl (35.0 kB view details)

Uploaded Python 3

File details

Details for the file streamflow_pulse_client-2.6.1.tar.gz.

File metadata

  • Download URL: streamflow_pulse_client-2.6.1.tar.gz
  • Upload date:
  • Size: 53.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for streamflow_pulse_client-2.6.1.tar.gz
Algorithm Hash digest
SHA256 6be0dd170b128655b22658db0bf9a76f311161daf045e07b709df80e0886b4cc
MD5 237fd95fcef0d25f544d63fd7a498e7a
BLAKE2b-256 360a9cbc34a11072931559cecf95f44c2f4b9bf4329db4027b37ebc921da7b6a

See more details on using hashes here.

Provenance

The following attestation bundles were made for streamflow_pulse_client-2.6.1.tar.gz:

Publisher: release.yaml on olsisoft/pulse-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file streamflow_pulse_client-2.6.1-py3-none-any.whl.

File metadata

File hashes

Hashes for streamflow_pulse_client-2.6.1-py3-none-any.whl
Algorithm Hash digest
SHA256 4c4c2851d6f1af009590678264baae0b094535db62942007616f2048c557994c
MD5 01df9f92d6239dccc3bd81d1264893c2
BLAKE2b-256 fdbe81332cfbcd6ec731b7a0042faaf5c4165991aa8c016608c50696948b81a9

See more details on using hashes here.

Provenance

The following attestation bundles were made for streamflow_pulse_client-2.6.1-py3-none-any.whl:

Publisher: release.yaml on olsisoft/pulse-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page