Floaty SDK for Python
Official Python SDK for the Floaty API.
Installation
pip install floaty-sdk
Quick Start
from floaty import FloatyClient
# Initialize the client
client = FloatyClient(api_key="flt_live_your_key_here")
# Create a chat completion
response = client.chat.completions.create(
    model="auto",
    messages=[
        {"role": "user", "content": "Hello, Floaty!"}
    ]
)
print(response.choices[0].message.content)
Streaming
# Stream responses
stream = client.chat.completions.create(
    model="claude-sonnet",
    messages=[{"role": "user", "content": "Write a poem about coding"}],
    stream=True
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
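If the full text is also needed once streaming finishes, the deltas can be collected as they arrive. The following is a minimal sketch, assuming chunks expose the `choices[0].delta.content` path used above. `collect_stream` and `fake_chunk` are hypothetical helpers, not part of the SDK, and the guard against an empty `choices` list is a defensive assumption. The fake chunks let the example run without network access.

```python
from types import SimpleNamespace


def collect_stream(stream, on_delta=None):
    """Join the text deltas of a streamed completion into one string."""
    parts = []
    for chunk in stream:
        if not chunk.choices:  # defensive: skip chunks that carry no choices
            continue
        delta = chunk.choices[0].delta.content
        if delta:  # skip None or empty deltas
            parts.append(delta)
            if on_delta is not None:
                on_delta(delta)
    return "".join(parts)


def fake_chunk(text):
    """Build a stand-in chunk with the same attribute path as a streamed chunk."""
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


fake_stream = [fake_chunk("Hello"), fake_chunk(None), fake_chunk(", world")]
full_text = collect_stream(fake_stream)
print(full_text)
```

With a real stream, passing `on_delta=lambda d: print(d, end="", flush=True)` would keep the live output while still returning the joined text.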
AI Patterns
Access 35+ research-backed reasoning patterns:
# List available patterns
patterns = client.patterns.list()
# Apply a pattern
result = client.patterns.apply(
    pattern_id="chain-of-thought",
    prompt="What is 17 * 24?",
    execute=True
)
print(result.result) # "408"
Available Models
| Model | Description |
|---|---|
| auto | Automatically selects the best model |
| gpt-4o | OpenAI GPT-4o |
| claude-sonnet | Anthropic Claude Sonnet 4.5 |
| gemini-flash | Google Gemini 2.0 Flash |
| deepseek | DeepSeek Chat |
Managed Agents (beta)
Managed Agents are server-hosted, stateful agents with a per-session workspace,
tool execution (built-in + custom + MCP), vault-backed credentials with OAuth
refresh, file and GitHub repo mounts, streaming token output, interrupts, and
permission-gated tool calls. The namespace shape mirrors @floatyos/sdk and
@anthropic-ai/sdk.
The mandatory flow is Agent (once) → Session (every run). Create and store
the agent's id, then reference it on each sessions.create.
Minimal end-to-end
from floaty import FloatyClient
client = FloatyClient(api_key="flt_live_...")
# --- ONE-TIME SETUP ---
environment = client.beta.environments.create(
    name="dev",
    config={"type": "cloud", "networking": {"type": "unrestricted"}},
)
agent = client.beta.agents.create(
    name="Coding Assistant",
    model="claude-opus-4-7",
    system="You are a careful senior engineer.",
    tools=[{"type": "agent_toolset_20260401"}],  # read/write/edit/glob/grep/bash/web_fetch
)
# Save: agent.id, environment.id (reuse; don't create() per run)
# --- RUNTIME ---
session = client.beta.sessions.create(
    agent=agent.id,
    environment_id=environment.id,
)
# Stream-first: open the SSE stream before sending the kickoff message,
# otherwise fast early events arrive in a single buffered batch.
stream = client.beta.sessions.events.stream(session.id)
client.beta.sessions.events.send(session.id, events=[
    {
        "type": "user.message",
        "content": [{"type": "text", "text": "List the largest files in /workspace."}],
    }
])
for event in stream:
    if event.type == "agent.message_delta":
        print(event.payload.get("delta", ""), end="", flush=True)
    elif event.type == "session.status_terminated":
        break
    elif event.type == "session.status_idle":
        stop = event.payload.get("stop_reason") or {}
        # Don't break on requires_action: the session idles transiently
        # while waiting on tool confirmations or custom tool results.
        if stop.get("type") != "requires_action":
            break
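The exit conditions in the loop above can be pulled into a small predicate so they are testable on their own and reusable across loops. This is a sketch, not SDK code: `should_stop` is a hypothetical helper name, and it only encodes the three rules shown above.

```python
def should_stop(event_type, payload):
    """Return True when a session event loop should exit.

    - session.status_terminated always ends the loop.
    - session.status_idle ends the loop unless the stop reason is
      requires_action (the session is only waiting on a tool
      confirmation or a custom tool result).
    - Every other event keeps the loop running.
    """
    if event_type == "session.status_terminated":
        return True
    if event_type == "session.status_idle":
        stop = (payload or {}).get("stop_reason") or {}
        return stop.get("type") != "requires_action"
    return False


print(should_stop("session.status_idle", {"stop_reason": {"type": "requires_action"}}))
print(should_stop("session.status_idle", {"stop_reason": {"type": "interrupted"}}))
```

Inside the loop this would be used as `if should_stop(event.type, event.payload): break`.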
Custom tools — agent asks, your code answers
agent = client.beta.agents.create(
    name="Notifier",
    model="claude-opus-4-7",
    tools=[{
        "type": "custom",
        "name": "send_slack",
        "description": "Post a message to Slack",
        "input_schema": {
            "type": "object",
            "properties": {
                "channel": {"type": "string"},
                "text": {"type": "string"},
            },
            "required": ["channel", "text"],
        },
    }],
)
# `session` and `stream` are opened as in the minimal example above;
# `my_slack` stands for your own Slack client (not part of the SDK).
for event in stream:
    if event.type == "agent.custom_tool_use":
        tool_use_id = event.payload["tool_use_id"]
        inp = event.payload["input"]
        result = my_slack.post(inp["channel"], inp["text"])
        client.beta.sessions.events.send(session.id, events=[{
            "type": "user.custom_tool_result",
            "tool_use_id": tool_use_id,
            "content": [{"type": "text", "text": f"posted: {result.ts}"}],
        }])
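With more than one custom tool, a small dispatcher keeps the event loop short. The sketch below builds the same `user.custom_tool_result` event shown above. It rests on two assumptions that the example above does not confirm: that the `agent.custom_tool_use` payload carries the tool's `name`, and that reporting a failure as plain text in the result is acceptable. `build_tool_result`, `dispatch_custom_tool`, and the handler table are hypothetical names.

```python
def build_tool_result(tool_use_id, text):
    """Build a user.custom_tool_result event in the shape shown above."""
    return {
        "type": "user.custom_tool_result",
        "tool_use_id": tool_use_id,
        "content": [{"type": "text", "text": text}],
    }


def dispatch_custom_tool(payload, handlers):
    """Run the handler registered for the requested tool and wrap its output.

    ASSUMPTION: payload["name"] identifies the tool being called.
    """
    name = payload.get("name")
    handler = handlers.get(name)
    if handler is None:
        return build_tool_result(payload["tool_use_id"], f"error: unknown tool {name!r}")
    try:
        text = handler(**payload["input"])
    except Exception as exc:  # report the failure back instead of crashing the loop
        text = f"error: {exc}"
    return build_tool_result(payload["tool_use_id"], text)


# Stand-in handler; a real one would call your Slack client.
handlers = {"send_slack": lambda channel, text: f"posted to {channel}"}

example_payload = {
    "tool_use_id": "toolu_1",
    "name": "send_slack",
    "input": {"channel": "#ops", "text": "deploy finished"},
}
tool_result_event = dispatch_custom_tool(example_payload, handlers)
print(tool_result_event["content"][0]["text"])
```

The returned dict can be passed in the `events` list of `client.beta.sessions.events.send`.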
Permission-gated tools
Set permission_policy: {"type": "always_ask"} on specific tools to gate
execution pending user confirmation:
client.beta.agents.create(
    name="Careful writer",
    model="claude-opus-4-7",
    tools=[{
        "type": "agent_toolset_20260401",
        "default_config": {"enabled": True},
        "configs": [
            {"name": "write", "permission_policy": {"type": "always_ask"}},
            {"name": "edit", "permission_policy": {"type": "always_ask"}},
            {"name": "bash", "permission_policy": {"type": "always_ask"}},
        ],
    }],
)
# On agent.tool_use with evaluated_permission="ask", respond:
client.beta.sessions.events.send(session.id, events=[{
    "type": "user.tool_confirmation",
    "tool_use_id": "toolu_...",
    "result": "allow",  # or "deny" with optional "deny_message"
}])
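Confirmations do not have to be manual. A simple local policy can decide them, as in the sketch below, which builds the `user.tool_confirmation` event shown above. It assumes that the `agent.tool_use` payload exposes the tool arguments under `input` and that the bash tool passes its command line as `input["command"]`. Neither is confirmed above. `confirm_tool_use` and the blocked fragments are hypothetical.

```python
def confirm_tool_use(payload, blocked_fragments=("rm -rf", "sudo ")):
    """Build a user.tool_confirmation event for a pending tool call.

    ASSUMPTION: the command line, if any, lives at payload["input"]["command"].
    Calls without a command are allowed by this sketch.
    """
    command = str((payload.get("input") or {}).get("command", ""))
    event = {"type": "user.tool_confirmation", "tool_use_id": payload["tool_use_id"]}
    for fragment in blocked_fragments:
        if fragment in command:
            event["result"] = "deny"
            event["deny_message"] = f"Blocked: command contains {fragment!r}"
            return event
    event["result"] = "allow"
    return event


allowed = confirm_tool_use({"tool_use_id": "toolu_1", "input": {"command": "ls -la"}})
denied = confirm_tool_use({"tool_use_id": "toolu_2", "input": {"command": "rm -rf /tmp/x"}})
print(allowed["result"], denied["result"])
```

Substring matching is a deliberately crude filter. A production policy would more likely use an allowlist or ask a human for anything it does not recognize.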
Interrupt
client.beta.sessions.events.send(session.id, events=[{"type": "user.interrupt"}])
# In-flight inference aborts immediately;
# session settles idle with stop_reason.type == "interrupted".
GitHub repo mount
import os
session = client.beta.sessions.create(
    agent=agent.id,
    environment_id=environment.id,
    resources=[{
        "type": "github_repository",
        "url": "https://github.com/owner/repo",
        "authorization_token": os.environ["GITHUB_TOKEN"],
        "checkout": {"type": "branch", "name": "main"},
    }],
)
# Repo clones into /workspace/repo/. Agent can git pull / commit / push
# via the bash tool — auth persists in .git/config for the session.
Vaults + MCP
vault = client.beta.vaults.create(name="integrations")
client.beta.vaults.credentials.create(
    vault.id,
    display_name="Linear (workspace-foo)",
    auth={
        "type": "mcp_oauth",
        "mcp_server_url": "https://mcp.linear.app/mcp",
        "access_token": "...",
        "expires_at": "2026-04-24T10:00:00Z",
        "refresh": {
            "refresh_token": "...",
            "client_id": "...",
            "token_endpoint": "https://api.linear.app/oauth/token",
            "token_endpoint_auth": {
                "type": "client_secret_post",
                "client_secret": "...",
            },
        },
    },
)
agent = client.beta.agents.create(
    name="Linear Agent",
    model="claude-opus-4-7",
    mcp_servers=[{"type": "url", "name": "linear", "url": "https://mcp.linear.app/mcp"}],
    tools=[
        {"type": "agent_toolset_20260401"},
        {"type": "mcp_toolset", "mcp_server_name": "linear"},
    ],
)
session = client.beta.sessions.create(
    agent=agent.id,
    environment_id=environment.id,
    vault_ids=[vault.id],  # Linear MCP calls auto-inject the vault's access token
)
Reconnecting after a dropped stream
The SSE stream does not replay missed events. On reconnect, open the live
stream first, then fetch history since the last seen seq, and dedupe the
overlap by event.id / seq:
seen = set()
# Open the live stream first so nothing is missed while history loads.
stream = client.beta.sessions.events.stream(session_id)
# 1. Pull history since the last seen seq
#    (last_seq: the seq of the last event you handled before the drop)
for event in client.beta.sessions.events.list(session_id, after_seq=last_seq):
    seen.add(event.id)
    handle(event)
# 2. Tail the live stream, skipping anything already processed
for event in stream:
    if event.id in seen:
        continue
    seen.add(event.id)
    handle(event)
    if event.type == "session.status_terminated":
        break
    if event.type == "session.status_idle":
        stop = event.payload.get("stop_reason") or {}
        if stop.get("type") != "requires_action":
            break
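The overlap-and-dedupe step can be isolated into a generator that works on any two event sources. This is a sketch that only assumes each event has an `id` attribute, as above. `replay_then_tail` is a hypothetical helper, and the `SimpleNamespace` events stand in for real history and stream results.

```python
from types import SimpleNamespace


def replay_then_tail(history, live):
    """Yield events from history, then from live, skipping ids already seen."""
    seen = set()
    for source in (history, live):
        for event in source:
            if event.id in seen:
                continue
            seen.add(event.id)
            yield event


def fake_event(event_id):
    """Stand-in event exposing only the id attribute."""
    return SimpleNamespace(id=event_id)


history = [fake_event("e1"), fake_event("e2"), fake_event("e3")]
live = [fake_event("e3"), fake_event("e4")]  # e3 overlaps with history

ordered_ids = [event.id for event in replay_then_tail(history, live)]
print(ordered_ids)
```

Because it is a generator, the live source is consumed lazily, so the termination checks from the loop above can still `break` out of it early.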
Usage Statistics
# Get usage stats
stats = client.usage.get()
print(f"Total requests: {stats.total_requests}")
print(f"Total tokens: {stats.total_tokens}")
# Check rate limits
limits = client.usage.limits()
print(f"Remaining: {limits.requests_remaining}/{limits.rate_limit_per_minute}")
Error Handling
from floaty.types import APIError
try:
    response = client.chat.completions.create(
        messages=[{"role": "user", "content": "Hello"}]
    )
except APIError as e:
    print(f"Error {e.code}: {e.message}")
Configuration
client = FloatyClient(
    api_key="flt_live_...",
    base_url="https://floaty.pro/api/v1",  # Optional
    max_retries=3,  # Retry on transient errors
    timeout=30.0,  # Request timeout in seconds
)
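To keep the API key out of source code, the constructor arguments can be read from the environment. The sketch below is one way to do that. The environment variable names (`FLOATY_API_KEY`, `FLOATY_BASE_URL`, `FLOATY_MAX_RETRIES`, `FLOATY_TIMEOUT`) and the helper `client_kwargs_from_env` are assumptions made for this example. The SDK is not known to read any of them itself. The fallback values simply mirror the example above and are not documented SDK defaults.

```python
def client_kwargs_from_env(env):
    """Translate environment-style settings into FloatyClient keyword arguments.

    All variable names here are hypothetical conventions, not SDK features.
    """
    api_key = env.get("FLOATY_API_KEY")
    if not api_key:
        raise RuntimeError("FLOATY_API_KEY is not set")
    kwargs = {
        "api_key": api_key,
        "max_retries": int(env.get("FLOATY_MAX_RETRIES", "3")),
        "timeout": float(env.get("FLOATY_TIMEOUT", "30")),
    }
    base_url = env.get("FLOATY_BASE_URL")
    if base_url:  # only override the base URL when one is provided
        kwargs["base_url"] = base_url
    return kwargs


# A plain dict stands in for os.environ so the example is reproducible.
example_env = {"FLOATY_API_KEY": "flt_live_example", "FLOATY_TIMEOUT": "10"}
kwargs = client_kwargs_from_env(example_env)
print(sorted(kwargs))
```

In an application this would be used as `FloatyClient(**client_kwargs_from_env(os.environ))`.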
Currents API (Automated Workflows)
Currents are automated AI workflows that run on a schedule, in response to events, or when conditions are met.
Create a Current
current = client.currents.create(
    name="Daily Competitor Monitor",
    trigger={
        "type": "schedule",
        "preset": "daily",
        "timezone": "America/New_York"
    },
    assignment={
        "type": "single",
        "floatRole": "researcher"
    },
    input={
        "prompt": "Research competitor {{competitor}} for pricing and feature changes.",
        "variables": {"competitor": "Acme Inc"}
    },
    output={
        "destinations": [
            {"type": "inbox"},
            {"type": "slack", "channel": "#competitive-intel"}
        ]
    }
)
print(f"Created Current: {current.id}")
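The `{{competitor}}` placeholder in the prompt above is filled from `variables`. How the service renders it is not described here, so the sketch below is only a local preview and validation step to run before creating a Current. It assumes placeholders are simple `{{name}}` tokens with word-character names. `render_prompt` and `missing_variables` are hypothetical helpers.

```python
import re

# Matches {{name}} with optional spaces inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def missing_variables(prompt, variables):
    """Return placeholder names used in the prompt but absent from variables."""
    names = _PLACEHOLDER.findall(prompt)
    return sorted({name for name in names if name not in variables})


def render_prompt(prompt, variables):
    """Substitute every {{name}} placeholder; raise if one has no value."""
    missing = missing_variables(prompt, variables)
    if missing:
        raise KeyError(f"missing variables: {', '.join(missing)}")
    return _PLACEHOLDER.sub(lambda m: str(variables[m.group(1)]), prompt)


prompt = "Research competitor {{competitor}} for pricing and feature changes."
rendered = render_prompt(prompt, {"competitor": "Acme Inc"})
print(rendered)
```

Checking `missing_variables` first catches a misspelled variable name locally, before a scheduled run would reveal it.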
List and Manage Currents
# List all Currents
currents = client.currents.list()
# Filter by status
active_currents = client.currents.list(status="active")
# Pause a Current
client.currents.pause("current_id")
# Resume a Current
client.currents.resume("current_id")
# Trigger manual run
run = client.currents.run("current_id")
print(f"Run ID: {run.run_id}")
# Delete a Current
client.currents.delete("current_id")
Trigger Types
# Schedule trigger
{"type": "schedule", "preset": "daily"} # daily, weekly, monthly, quarterly
# Event trigger (webhooks, Stripe, GitHub)
{"type": "event", "source": "stripe", "eventType": "payment_intent.succeeded"}
# Condition trigger (metric-based)
{
    "type": "condition",
    "checkInterval": 60,
    "conditions": [{"source": "stripe", "metric": "mrr", "operator": "gt", "value": 10000}]
}
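To make the condition trigger's semantics concrete, the sketch below evaluates a `conditions` list against a snapshot of metric values. Only the `gt` operator appears above. The other operator names (`gte`, `lt`, `lte`, `eq`) and the rule that all conditions must hold are assumptions for illustration, and `conditions_met` is a hypothetical helper rather than how the service is known to evaluate triggers.

```python
import operator

# Only "gt" is taken from the example above; the rest are assumed names.
OPERATORS = {
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "eq": operator.eq,
}


def conditions_met(conditions, metrics):
    """Return True when every condition holds for the given metric snapshot.

    metrics maps (source, metric) pairs to their current numeric value.
    ASSUMPTION: multiple conditions combine with logical AND.
    """
    for condition in conditions:
        compare = OPERATORS[condition["operator"]]
        current = metrics[(condition["source"], condition["metric"])]
        if not compare(current, condition["value"]):
            return False
    return True


conditions = [{"source": "stripe", "metric": "mrr", "operator": "gt", "value": 10000}]
print(conditions_met(conditions, {("stripe", "mrr"): 12000}))
print(conditions_met(conditions, {("stripe", "mrr"): 10000}))
```

Note that `gt` is strict in this sketch, so a metric exactly equal to the threshold does not fire.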
Assignment Types
# Single Float
{"type": "single", "floatRole": "analyst"}
# Chain (sequential pipeline)
{
    "type": "chain",
    "steps": [
        {"floatRole": "researcher", "instruction": "Gather data"},
        {"floatRole": "analyst", "instruction": "Analyze findings"}
    ]
}
# Parallel execution
{
    "type": "parallel",
    "branches": [
        {"floatRole": "researcher", "instruction": "Research market"},
        {"floatRole": "analyst", "instruction": "Analyze competitors"}
    ]
}
Output Destinations
"destinations": [
    {"type": "inbox"},
    {"type": "email", "to": "team@company.com"},
    {"type": "slack", "channel": "#updates"},
    {"type": "webhook", "url": "https://api.example.com/hook"},
    {"type": "twitter", "accountId": "account_123"},
    {"type": "linkedin", "accountId": "account_456"}
]
Webhooks
Create webhook endpoints to trigger Currents from external systems:
# Create a webhook
webhook = client.webhooks.create(
    name="CRM Webhook",
    description="Receives lead events from CRM"
)
print(f"Webhook URL: {webhook.url}")
print(f"Secret: {webhook.secret}")
# List webhooks
webhooks = client.webhooks.list()
# Delete a webhook
client.webhooks.delete("webhook_id")
Async Support
import asyncio
from floaty import AsyncFloatyClient
async def main():
    client = AsyncFloatyClient(api_key="flt_live_...")
    response = await client.chat.completions.create(
        model="auto",
        messages=[{"role": "user", "content": "Hello!"}]
    )
    print(response.choices[0].message.content)

asyncio.run(main())
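The main benefit of the async client is running several requests concurrently. The sketch below shows the pattern with `asyncio.gather` and a semaphore that caps the number of in-flight calls. `run_limited` is a hypothetical helper, and `fake_ask` is a stand-in coroutine so the example runs offline. In real use, `ask` would be a coroutine that awaits `client.chat.completions.create(...)` and returns the message content.

```python
import asyncio


async def run_limited(prompts, ask, limit=3):
    """Run ask(prompt) for every prompt, with at most `limit` calls in flight.

    Results come back in the same order as the prompts.
    """
    semaphore = asyncio.Semaphore(limit)

    async def one(prompt):
        async with semaphore:
            return await ask(prompt)

    return await asyncio.gather(*(one(prompt) for prompt in prompts))


async def fake_ask(prompt):
    """Stand-in for an API call: yields control once, then echoes in upper case."""
    await asyncio.sleep(0)
    return prompt.upper()


answers = asyncio.run(run_limited(["alpha", "beta", "gamma"], fake_ask, limit=2))
print(answers)
```

Capping concurrency is a design choice that helps stay under a per-minute rate limit. The right `limit` depends on your plan and is not specified here.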
License
Proprietary — see the repository LICENSE for terms.
File details
Details for the file floaty_sdk-0.1.1.tar.gz.
File metadata
- Download URL: floaty_sdk-0.1.1.tar.gz
- Upload date:
- Size: 19.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | bf70b44dc0b296d590ba74ba0b37175d8eb6b78f9e6031bd1178399cfb8b2a52 |
| MD5 | 976f2ca55b650943567d77a3f4cb2ceb |
| BLAKE2b-256 | 3f43a2656fb56555b9bf05f8883b26f63bb1d9c2c4028fde1dc10a7ed06413bd |
File details
Details for the file floaty_sdk-0.1.1-py3-none-any.whl.
File metadata
- Download URL: floaty_sdk-0.1.1-py3-none-any.whl
- Upload date:
- Size: 17.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8766f461059b8df1ea8d5c00053607df21fc7fb909edd64a01735e7342b3a231 |
| MD5 | 0724ba4080c8443ef41aa1cc01830713 |
| BLAKE2b-256 | 273d2723ea34d1ffe6d3e60a0f8617c4a5010dcf327fde50c4cc6e552f22b4ce |