Official Python SDK for the Floaty API

Floaty SDK for Python

Official Python SDK for the Floaty API.

Installation

pip install floaty-sdk

Quick Start

from floaty import FloatyClient

# Initialize the client
client = FloatyClient(api_key="flt_live_your_key_here")

# Create a chat completion
response = client.chat.completions.create(
    model="auto",
    messages=[
        {"role": "user", "content": "Hello, Floaty!"}
    ]
)

print(response.choices[0].message.content)

Streaming

# Stream responses
stream = client.chat.completions.create(
    model="claude-sonnet",
    messages=[{"role": "user", "content": "Write a poem about coding"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

AI Patterns

Access 35+ research-backed reasoning patterns:

# List available patterns
patterns = client.patterns.list()

# Apply a pattern
result = client.patterns.apply(
    pattern_id="chain-of-thought",
    prompt="What is 17 * 24?",
    execute=True
)

print(result.result)  # "408"

Available Models

Model           Description
auto            Automatically selects the best model
gpt-4o          OpenAI GPT-4o
claude-sonnet   Anthropic Claude Sonnet 4.5
gemini-flash    Google Gemini 2.0 Flash
deepseek        DeepSeek Chat

Managed Agents (beta)

Managed Agents are server-hosted, stateful agents with a per-session workspace, tool execution (built-in + custom + MCP), vault-backed credentials with OAuth refresh, file and GitHub repo mounts, streaming token output, interrupts, and permission-gated tool calls. The namespace shape mirrors @floatyos/sdk and @anthropic-ai/sdk.

The mandatory flow is Agent (once) → Session (every run). Create and store the agent's id, then reference it on each sessions.create.

Minimal end-to-end

from floaty import FloatyClient

client = FloatyClient(api_key="flt_live_...")

# --- ONE-TIME SETUP ---
environment = client.beta.environments.create(
    name="dev",
    config={"type": "cloud", "networking": {"type": "unrestricted"}},
)

agent = client.beta.agents.create(
    name="Coding Assistant",
    model="claude-opus-4-7",
    system="You are a careful senior engineer.",
    tools=[{"type": "agent_toolset_20260401"}],  # read/write/edit/glob/grep/bash/web_fetch
)
# Save: agent.id, environment.id (reuse; don't create() per run)

# --- RUNTIME ---
session = client.beta.sessions.create(
    agent=agent.id,
    environment_id=environment.id,
)

# Stream-first: open the SSE stream before sending the kickoff message,
# otherwise fast early events arrive in a single buffered batch.
stream = client.beta.sessions.events.stream(session.id)

client.beta.sessions.events.send(session.id, events=[
    {
        "type": "user.message",
        "content": [{"type": "text", "text": "List the largest files in /workspace."}],
    }
])

for event in stream:
    if event.type == "agent.message_delta":
        print(event.payload.get("delta", ""), end="", flush=True)
    elif event.type == "session.status_terminated":
        break
    elif event.type == "session.status_idle":
        stop = event.payload.get("stop_reason") or {}
        # Don't break on requires_action — the session idles transiently
        # while waiting on tool confirmations or custom tool results.
        if stop.get("type") != "requires_action":
            break
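
The exit conditions in the loop above recur in every event loop, so it may help to factor them into one predicate. This is a sketch only: `should_stop` is a hypothetical helper name, and the demonstration events are stand-in objects that copy the `type` and `payload` attributes used above.

```python
from types import SimpleNamespace


def should_stop(event):
    """Return True when the event loop should exit.

    Mirrors the loop above: stop on termination, or on idle unless the
    session is only waiting on a tool confirmation or custom tool result.
    """
    if event.type == "session.status_terminated":
        return True
    if event.type == "session.status_idle":
        stop = event.payload.get("stop_reason") or {}
        return stop.get("type") != "requires_action"
    return False


# Stand-in events for demonstration; real events come from the SSE stream.
_waiting = SimpleNamespace(
    type="session.status_idle",
    payload={"stop_reason": {"type": "requires_action"}},
)
_interrupted = SimpleNamespace(
    type="session.status_idle",
    payload={"stop_reason": {"type": "interrupted"}},
)
_delta = SimpleNamespace(type="agent.message_delta", payload={"delta": "hi"})

demo_flags = [should_stop(e) for e in (_waiting, _interrupted, _delta)]
```

Here `demo_flags` is `[False, True, False]`: the loop keeps running while the session waits on an action, exits on any other idle state, and ignores message deltas.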

Custom tools — agent asks, your code answers

agent = client.beta.agents.create(
    name="Notifier",
    model="claude-opus-4-7",
    tools=[{
        "type": "custom",
        "name": "send_slack",
        "description": "Post a message to Slack",
        "input_schema": {
            "type": "object",
            "properties": {
                "channel": {"type": "string"},
                "text": {"type": "string"},
            },
            "required": ["channel", "text"],
        },
    }],
)

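# `session` and `stream` are created as in the minimal example above;
# `my_slack` stands for your own Slack client.
for event in stream: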
    if event.type == "agent.custom_tool_use":
        tool_use_id = event.payload["tool_use_id"]
        inp = event.payload["input"]
        result = my_slack.post(inp["channel"], inp["text"])
        client.beta.sessions.events.send(session.id, events=[{
            "type": "user.custom_tool_result",
            "tool_use_id": tool_use_id,
            "content": [{"type": "text", "text": f"posted: {result.ts}"}],
        }])
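
With more than one custom tool, the branch above can grow into a small dispatcher. The sketch below builds the `user.custom_tool_result` event from a payload and a table of handlers. `build_tool_result` is a hypothetical helper, and it assumes the payload carries the tool name under a `"name"` key, which the example above does not confirm.

```python
def build_tool_result(payload, handlers):
    """Run the handler for a custom tool call and build the reply event.

    `handlers` maps tool names to callables that take the tool input dict.
    Assumption: the payload carries the tool name under "name".
    """
    name = payload.get("name")
    handler = handlers.get(name)
    if handler is None:
        text = f"error: no handler registered for tool {name!r}"
    else:
        try:
            text = str(handler(payload["input"]))
        except Exception as exc:
            # Report the failure to the agent instead of crashing the loop.
            text = f"error: {exc}"
    return {
        "type": "user.custom_tool_result",
        "tool_use_id": payload["tool_use_id"],
        "content": [{"type": "text", "text": text}],
    }


# Demonstration with a stand-in handler instead of a real Slack client.
demo_handlers = {"send_slack": lambda inp: f"posted to {inp['channel']}"}
demo_reply = build_tool_result(
    {
        "tool_use_id": "toolu_demo",
        "name": "send_slack",
        "input": {"channel": "#ops", "text": "deploy finished"},
    },
    demo_handlers,
)
```

The returned dict can be passed as the single element of `events=[...]` in `client.beta.sessions.events.send(session.id, ...)`.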

Permission-gated tools

Set permission_policy to {"type": "always_ask"} on individual tools to hold their execution until the user confirms:

client.beta.agents.create(
    name="Careful writer",
    model="claude-opus-4-7",
    tools=[{
        "type": "agent_toolset_20260401",
        "default_config": {"enabled": True},
        "configs": [
            {"name": "write", "permission_policy": {"type": "always_ask"}},
            {"name": "edit",  "permission_policy": {"type": "always_ask"}},
            {"name": "bash",  "permission_policy": {"type": "always_ask"}},
        ],
    }],
)

# On agent.tool_use with evaluated_permission="ask", respond:
client.beta.sessions.events.send(session.id, events=[{
    "type": "user.tool_confirmation",
    "tool_use_id": "toolu_...",
    "result": "allow",  # or "deny" with optional "deny_message"
}])
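
A confirmation can also be decided by policy instead of by hand. The following sketch approves tools from an allowlist and denies everything else with a message. `confirmation_for` is a hypothetical helper, and it assumes the `agent.tool_use` payload exposes the tool name under `"name"` alongside `"tool_use_id"`.

```python
def confirmation_for(payload, allowed_tools):
    """Build a user.tool_confirmation event for a tool call awaiting approval.

    Assumption: the payload carries the tool name under "name".
    """
    name = payload.get("name")
    event = {
        "type": "user.tool_confirmation",
        "tool_use_id": payload["tool_use_id"],
    }
    if name in allowed_tools:
        event["result"] = "allow"
    else:
        event["result"] = "deny"
        event["deny_message"] = f"{name} is not pre-approved"
    return event


# Demonstration payloads; real ones arrive on agent.tool_use events.
demo_allow = confirmation_for({"tool_use_id": "toolu_1", "name": "edit"}, {"edit", "write"})
demo_deny = confirmation_for({"tool_use_id": "toolu_2", "name": "bash"}, {"edit", "write"})
```

Denying by default keeps an unexpected tool from running just because the allowlist was not updated.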

Interrupt

client.beta.sessions.events.send(session.id, events=[{"type": "user.interrupt"}])
# In-flight inference aborts immediately;
# session settles idle with stop_reason.type == "interrupted".

GitHub repo mount

import os

session = client.beta.sessions.create(
    agent=agent.id,
    environment_id=environment.id,
    resources=[{
        "type": "github_repository",
        "url": "https://github.com/owner/repo",
        "authorization_token": os.environ["GITHUB_TOKEN"],
        "checkout": {"type": "branch", "name": "main"},
    }],
)
# Repo clones into /workspace/repo/. Agent can git pull / commit / push
# via the bash tool — auth persists in .git/config for the session.

Vaults + MCP

vault = client.beta.vaults.create(name="integrations")

client.beta.vaults.credentials.create(
    vault.id,
    display_name="Linear (workspace-foo)",
    auth={
        "type": "mcp_oauth",
        "mcp_server_url": "https://mcp.linear.app/mcp",
        "access_token": "...",
        "expires_at": "2026-04-24T10:00:00Z",
        "refresh": {
            "refresh_token": "...",
            "client_id": "...",
            "token_endpoint": "https://api.linear.app/oauth/token",
            "token_endpoint_auth": {
                "type": "client_secret_post",
                "client_secret": "...",
            },
        },
    },
)

agent = client.beta.agents.create(
    name="Linear Agent",
    model="claude-opus-4-7",
    mcp_servers=[{"type": "url", "name": "linear", "url": "https://mcp.linear.app/mcp"}],
    tools=[
        {"type": "agent_toolset_20260401"},
        {"type": "mcp_toolset", "mcp_server_name": "linear"},
    ],
)

session = client.beta.sessions.create(
    agent=agent.id,
    environment_id=environment.id,
    vault_ids=[vault.id],  # Linear MCP calls auto-inject the vault's access token
)

Reconnecting after a dropped stream

The SSE stream does not replay missed events. On reconnect, open the live stream, fetch the event history since the last seq you processed, and deduplicate the overlap by event.id:

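# session_id and last_seq are values saved before the connection dropped.
seen = set()
# Open the live stream first so no event is missed while history loads.
stream = client.beta.sessions.events.stream(session_id)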

# 1. Pull history since the last seen seq
for event in client.beta.sessions.events.list(session_id, after_seq=last_seq):
    seen.add(event.id)
    handle(event)

# 2. Tail the live stream, skipping anything already processed
for event in stream:
    if event.id in seen:
        continue
    seen.add(event.id)
    handle(event)
    if event.type == "session.status_terminated":
        break
    if event.type == "session.status_idle":
        stop = event.payload.get("stop_reason") or {}
        if stop.get("type") != "requires_action":
            break
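
The deduplication step can be isolated in a small generator that works on any two iterables of events. This is a sketch under the assumption that every event has a unique `id` attribute, as the code above implies. `replay_then_tail` is a hypothetical name, and the live stream must still be opened before the history is fetched, as shown above.

```python
from types import SimpleNamespace


def replay_then_tail(history, live):
    """Yield events from history, then from live, skipping repeated ids."""
    seen = set()
    for source in (history, live):
        for event in source:
            if event.id in seen:
                continue  # already delivered from the other source
            seen.add(event.id)
            yield event


# Stand-in events: the live stream overlaps history on "e3".
_history = [SimpleNamespace(id=i) for i in ("e1", "e2", "e3")]
_live = [SimpleNamespace(id=i) for i in ("e3", "e4")]

demo_ids = [event.id for event in replay_then_tail(_history, _live)]
```

Here `demo_ids` is `["e1", "e2", "e3", "e4"]`: the overlapping event is delivered once and order is preserved.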

Usage Statistics

# Get usage stats
stats = client.usage.get()
print(f"Total requests: {stats.total_requests}")
print(f"Total tokens: {stats.total_tokens}")

# Check rate limits
limits = client.usage.limits()
print(f"Remaining: {limits.requests_remaining}/{limits.rate_limit_per_minute}")

Error Handling

from floaty.types import APIError

try:
    response = client.chat.completions.create(
        model="auto",
        messages=[{"role": "user", "content": "Hello"}]
    )
except APIError as e:
    print(f"Error {e.code}: {e.message}")

Configuration

client = FloatyClient(
    api_key="flt_live_...",
    base_url="https://floaty.pro/api/v1",  # Optional
    max_retries=3,  # Retry on transient errors
    timeout=30.0,   # Request timeout in seconds
)
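
To keep the API key out of source code, the constructor arguments can be read from the environment. The sketch below builds the keyword arguments shown above. The variable names `FLOATY_API_KEY`, `FLOATY_BASE_URL`, `FLOATY_MAX_RETRIES`, and `FLOATY_TIMEOUT` are assumptions chosen for illustration; the SDK is not known to read them itself.

```python
import os


def client_kwargs(env=None):
    """Build FloatyClient keyword arguments from environment variables.

    The FLOATY_* variable names are illustrative assumptions, not SDK features.
    """
    env = os.environ if env is None else env
    api_key = env.get("FLOATY_API_KEY")
    if not api_key:
        raise RuntimeError("FLOATY_API_KEY is not set")
    kwargs = {
        "api_key": api_key,
        "max_retries": int(env.get("FLOATY_MAX_RETRIES", "3")),
        "timeout": float(env.get("FLOATY_TIMEOUT", "30.0")),
    }
    base_url = env.get("FLOATY_BASE_URL")
    if base_url:
        kwargs["base_url"] = base_url  # only override when explicitly set
    return kwargs


# Demonstration with an explicit mapping instead of the real environment.
demo_kwargs = client_kwargs({"FLOATY_API_KEY": "flt_live_example", "FLOATY_TIMEOUT": "10"})
```

The result can then be unpacked into the constructor: `client = FloatyClient(**client_kwargs())`.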

Currents API (Automated Workflows)

Currents are automated AI workflows that run on a schedule, in response to events, or when conditions are met.

Create a Current

current = client.currents.create(
    name="Daily Competitor Monitor",
    trigger={
        "type": "schedule",
        "preset": "daily",
        "timezone": "America/New_York"
    },
    assignment={
        "type": "single",
        "floatRole": "researcher"
    },
    input={
        "prompt": "Research competitor {{competitor}} for pricing and feature changes.",
        "variables": {"competitor": "Acme Inc"}
    },
    output={
        "destinations": [
            {"type": "inbox"},
            {"type": "slack", "channel": "#competitive-intel"}
        ]
    }
)

print(f"Created Current: {current.id}")
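
The `{{competitor}}` placeholder in the prompt above is filled from `variables` when the Current runs. To preview the result locally before creating a Current, a substitution along these lines may help. It is a sketch only: `render_prompt` is a hypothetical helper, and the server-side rules (for example, tolerance for spaces inside the braces) are assumptions.

```python
import re

# Matches {{name}}, allowing optional spaces inside the braces (an assumption).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_prompt(template, variables):
    """Substitute {{name}} placeholders; raise KeyError if one is missing."""

    def replace(match):
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"missing variable: {name}")
        return str(variables[name])

    return _PLACEHOLDER.sub(replace, template)


demo_prompt = render_prompt(
    "Research competitor {{competitor}} for pricing and feature changes.",
    {"competitor": "Acme Inc"},
)
```

Raising on a missing variable surfaces a typo in the template locally instead of in a scheduled run.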

List and Manage Currents

# List all Currents
currents = client.currents.list()

# Filter by status
active_currents = client.currents.list(status="active")

# Pause a Current
client.currents.pause("current_id")

# Resume a Current
client.currents.resume("current_id")

# Trigger manual run
run = client.currents.run("current_id")
print(f"Run ID: {run.run_id}")

# Delete a Current
client.currents.delete("current_id")

Trigger Types

# Schedule trigger
{"type": "schedule", "preset": "daily"}  # daily, weekly, monthly, quarterly

# Event trigger (webhooks, Stripe, GitHub)
{"type": "event", "source": "stripe", "eventType": "payment_intent.succeeded"}

# Condition trigger (metric-based)
{
    "type": "condition",
    "checkInterval": 60,
    "conditions": [{"source": "stripe", "metric": "mrr", "operator": "gt", "value": 10000}]
}
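
Because triggers are plain dictionaries, a typo in a preset only surfaces when the request is sent. Small builder functions can catch that earlier. The sketch below covers the schedule and event shapes shown above. The function names are hypothetical, and the preset list is taken from the comment above; the API may accept other values.

```python
# Presets listed in the comment above; the API may accept more.
SCHEDULE_PRESETS = ("daily", "weekly", "monthly", "quarterly")


def schedule_trigger(preset, timezone=None):
    """Build a schedule trigger dict, rejecting unknown presets."""
    if preset not in SCHEDULE_PRESETS:
        raise ValueError(f"unknown preset {preset!r}; expected one of {SCHEDULE_PRESETS}")
    trigger = {"type": "schedule", "preset": preset}
    if timezone is not None:
        trigger["timezone"] = timezone
    return trigger


def event_trigger(source, event_type):
    """Build an event trigger dict using the camelCase key the API expects."""
    return {"type": "event", "source": source, "eventType": event_type}


demo_schedule = schedule_trigger("daily", timezone="America/New_York")
demo_event = event_trigger("stripe", "payment_intent.succeeded")
```

Either result can be passed as the `trigger` argument of `client.currents.create`.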

Assignment Types

# Single Float
{"type": "single", "floatRole": "analyst"}

# Chain (sequential pipeline)
{
    "type": "chain",
    "steps": [
        {"floatRole": "researcher", "instruction": "Gather data"},
        {"floatRole": "analyst", "instruction": "Analyze findings"}
    ]
}

# Parallel execution
{
    "type": "parallel",
    "branches": [
        {"floatRole": "researcher", "instruction": "Research market"},
        {"floatRole": "analyst", "instruction": "Analyze competitors"}
    ]
}

Output Destinations

"destinations": [
    {"type": "inbox"},
    {"type": "email", "to": "team@company.com"},
    {"type": "slack", "channel": "#updates"},
    {"type": "webhook", "url": "https://api.example.com/hook"},
    {"type": "twitter", "accountId": "account_123"},
    {"type": "linkedin", "accountId": "account_456"}
]

Webhooks

Create webhook endpoints to trigger Currents from external systems:

# Create a webhook
webhook = client.webhooks.create(
    name="CRM Webhook",
    description="Receives lead events from CRM"
)

print(f"Webhook URL: {webhook.url}")
print(f"Secret: {webhook.secret}")

# List webhooks
webhooks = client.webhooks.list()

# Delete a webhook
client.webhooks.delete("webhook_id")

Async Support

import asyncio
from floaty import AsyncFloatyClient

async def main():
    client = AsyncFloatyClient(api_key="flt_live_...")

    response = await client.chat.completions.create(
        model="auto",
        messages=[{"role": "user", "content": "Hello!"}]
    )

    print(response.choices[0].message.content)

asyncio.run(main())

License

Proprietary — see the repository LICENSE for terms.

