A Python library for communicating with the Flowscale APIs
Flowscale Python SDK
A comprehensive Python SDK designed to simplify interaction with the FlowScale ComfyUI API. This library abstracts away the complexities of API calls, enabling you to effortlessly invoke workflows, retrieve outputs, manage workflow runs, and monitor system health.
🚀 What's New in v1.1.2
- 📚 Comprehensive Documentation: Completely updated README with advanced usage examples
- ✨ Enhanced get_runs(): Pagination, sorting, and filtering with detailed examples
- 🖼️ Dynamic Image Parameters: Support for dynamic parameter names (e.g., image_35728)
- 📤 Multiple Image Uploads: Handle multiple images in a single workflow execution
- 🔧 Improved File Handling: Better support for various file types and upload methods
- 📊 Updated Response Types: New response structure with comprehensive metadata
Installation
Install the Flowscale SDK using pip:
pip install flowscale
Quick Start
Importing the SDK
To get started, import the Flowscale SDK into your project:
from flowscale import FlowscaleAPI
import os
# Initialize the SDK
api_key = os.environ.get("FLOWSCALE_API_KEY")
api_url = os.environ.get("FLOWSCALE_API_URL")
if not api_key or not api_url:
    print("FLOWSCALE_API_KEY or FLOWSCALE_API_URL not set in environment")
    exit(1)
flowscale = FlowscaleAPI(api_key, api_url)
Environment Variables: Add the following to your .env file:
FLOWSCALE_API_KEY=your-api-key
FLOWSCALE_API_URL=https://your-api-url.pod.flowscale.ai
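The environment check from the Quick Start can be wrapped in a small helper so that a missing variable fails with a clear message. This is only a sketch: load_flowscale_config is a hypothetical helper, not part of the flowscale package, and it accepts the environment mapping as an argument so it can be tried without real credentials.

```python
import os


def load_flowscale_config(env=None):
    """Return (api_key, api_url) or raise if either variable is missing.

    Hypothetical helper for illustration; not part of the flowscale package.
    """
    env = os.environ if env is None else env
    missing = [
        name
        for name in ("FLOWSCALE_API_KEY", "FLOWSCALE_API_URL")
        if not env.get(name)
    ]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env["FLOWSCALE_API_KEY"], env["FLOWSCALE_API_URL"]


# Example with an explicit mapping instead of the real environment
example_env = {
    "FLOWSCALE_API_KEY": "your-api-key",
    "FLOWSCALE_API_URL": "https://your-api-url.pod.flowscale.ai",
}
api_key, api_url = load_flowscale_config(example_env)
```

The returned pair can then be passed to FlowscaleAPI(api_key, api_url) as shown in the Quick Start.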
SDK Methods
Below is a detailed guide to the SDK methods, including descriptions, usage, and response formats.
1. check_health()
Description: Check the health status of the Flowscale platform, including the status of containers and services.
Usage:
health = flowscale.check_health()
print(f"API Health: {health}")
Response Example:
{
"status": "success",
"data": [
{
"container": "container #1",
"status": "idle"
},
{
"container": "container #2",
"status": "running"
}
]
}
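A response of this shape can be reduced to a per-status count, which is handy for a quick readiness check. The sketch below assumes the response is a plain dict matching the example above; summarize_health is a hypothetical helper, not an SDK method.

```python
from collections import Counter


def summarize_health(health):
    """Count containers by status in a check_health()-style response."""
    if health.get("status") != "success":
        raise ValueError(f"Unexpected health response: {health!r}")
    return Counter(entry["status"] for entry in health.get("data", []))


# Sample payload copied from the response example above
sample_health = {
    "status": "success",
    "data": [
        {"container": "container #1", "status": "idle"},
        {"container": "container #2", "status": "running"},
    ],
}
summary = summarize_health(sample_health)
idle_containers = summary["idle"]
```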
2. get_queue()
Description: Retrieve the current status of the workflow queue, including running and pending jobs.
Usage:
queue = flowscale.get_queue()
print(f"Queue Details: {queue}")
Response Example:
{
"status": "success",
"data": [
{
"container": "container #1",
"queue": {
"queue_running": [
[
0,
"2a0babc4-acce-4521-9576-00fa0e6ecc91"
]
],
"queue_pending": [
[
1,
"5d60718a-7e89-4c64-b32d-0d1366b44e2a"
]
]
}
}
]
}
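To see how busy each container is, the running and pending lists can be counted per container. This is a sketch that assumes the dict layout shown in the example above; queue_depth is a hypothetical helper, not part of the SDK.

```python
def queue_depth(queue_response):
    """Return {container: (running, pending)} from a get_queue()-style response."""
    depth = {}
    for entry in queue_response.get("data", []):
        queue = entry.get("queue", {})
        depth[entry["container"]] = (
            len(queue.get("queue_running", [])),
            len(queue.get("queue_pending", [])),
        )
    return depth


# Sample payload copied from the response example above
sample_queue = {
    "status": "success",
    "data": [
        {
            "container": "container #1",
            "queue": {
                "queue_running": [[0, "2a0babc4-acce-4521-9576-00fa0e6ecc91"]],
                "queue_pending": [[1, "5d60718a-7e89-4c64-b32d-0d1366b44e2a"]],
            },
        }
    ],
}
depth = queue_depth(sample_queue)
```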
3. execute_workflow(workflow_id, data, group_id=None)
Description:
Trigger a workflow execution using its unique workflow_id. Input data and an optional group_id can be provided for better organization and tracking.
Parameters:
- workflow_id (string): The unique ID of the workflow.
- data (object): Input parameters for the workflow.
- group_id (string, optional): A custom identifier for grouping runs.
Usage:
workflow_id = "bncu0a1kipv"
group_id = "test_group"
# Basic usage with text inputs
inputs = {
    "text_51536": "Prompt test",
    "another_param": "some value"
}
# Advanced usage with files and dynamic parameters
inputs = {
    "text_51536": "Prompt test",
    "image_35728": open("path/to/image.png", "rb"),  # File object
    "image_42561": "path/to/another_image.jpg",  # File path (auto-detected)
    "video_1239": open("path/to/video.mp4", "rb")  # Multiple file types
}
result = flowscale.execute_workflow(workflow_id, inputs, group_id)
print(f"Workflow Result: {result}")
Dynamic Parameter Names: The SDK now supports dynamic parameter names that match your workflow's requirements:
# Example with ComfyUI dynamic parameter names
inputs = {
    "image_35728": open("input.png", "rb"),  # Dynamic image parameter
    "image_42561": "path/to/second_image.jpg",  # Another dynamic image
    "text_input_123": "Your prompt here",  # Dynamic text parameter
    "seed_456": 12345  # Dynamic numeric parameter
}
Response Example:
{
"status": "success",
"data": {
"number": 0,
"node_errors": {},
"output_names": [
"filename_prefix_58358_5WWF7GQUYF"
],
"run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db"
}
}
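Before polling for outputs, it can help to pull run_id and output_names out of this response and stop early if something went wrong. The sketch below treats a non-empty node_errors object as a failure, which is an assumption about the API rather than documented behavior; parse_execute_response is a hypothetical helper.

```python
def parse_execute_response(response):
    """Return (run_id, output_names) or raise if the response reports a problem."""
    if response.get("status") != "success":
        raise RuntimeError(f"Workflow submission failed: {response!r}")
    data = response["data"]
    # Assumption: a non-empty node_errors object means the workflow was rejected
    if data.get("node_errors"):
        raise RuntimeError(f"Workflow reported node errors: {data['node_errors']}")
    return data["run_id"], list(data.get("output_names", []))


# Sample payload copied from the response example above
sample_result = {
    "status": "success",
    "data": {
        "number": 0,
        "node_errors": {},
        "output_names": ["filename_prefix_58358_5WWF7GQUYF"],
        "run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
    },
}
run_id, output_names = parse_execute_response(sample_result)
```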
3a. execute_workflow_async(workflow_id, data, group_id=None, timeout=300, polling_interval=1)
Description:
Execute a workflow and automatically poll for its output until completion or timeout. This is a convenience method that combines execute_workflow and get_output with polling logic.
Parameters:
- workflow_id (string): The unique ID of the workflow
- data (object): Input parameters for the workflow
- group_id (string, optional): A custom identifier for grouping runs
- timeout (int, optional): Maximum time to wait for results in seconds (default: 300)
- polling_interval (int, optional): Time between polling attempts in seconds (default: 1)
Usage:
workflow_id = "bncu0a1kipv"
# Simple usage
inputs = {
    "text_51536": "Prompt test",
    "image_1234": open("path/to/image.png", "rb")
}
# This will wait for the output, up to 5 minutes by default
result = flowscale.execute_workflow_async(workflow_id, inputs)
print(f"Workflow Result: {result}")
# With custom timeout and polling interval
result = flowscale.execute_workflow_async(
    workflow_id,
    inputs,
    timeout=600,  # 10 minutes
    polling_interval=5  # Check every 5 seconds
)
# With multiple dynamic image parameters
inputs = {
    "prompt_text": "A beautiful landscape",
    "image_35728": open("reference1.png", "rb"),
    "image_42561": open("reference2.jpg", "rb"),
    "style_image_789": "path/to/style.png"
}
result = flowscale.execute_workflow_async(workflow_id, inputs, group_id="batch_001")
New Response Format: The method now returns a comprehensive response with all outputs:
{
"run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
"outputs": {
"filename_prefix_58358_5WWF7GQUYF": {
"status": "success",
"data": {
"download_url": "https://runs.s3.amazonaws.com/generations/...",
"generation_status": "success"
}
}
},
"total_outputs": 1,
"status": "completed"
}
Polling Behavior: The method now:
- Executes the workflow to get a run_id
- Polls /api/v1/runs/{run_id} to get run status and output_names
- Once output names are available, polls each output until completion
- Returns all outputs once every output is complete
Note: If the workflow doesn't complete within the timeout period, an exception will be raised.
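The poll-until-done pattern described above can be sketched generically as follows. This is not the SDK's internal implementation: poll_until, fetch, and is_done are hypothetical names, and the status values in the demo are stand-ins for whatever a real status call returns.

```python
import time


def poll_until(fetch, is_done, timeout=300, polling_interval=1):
    """Call fetch() until is_done(result) is true or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if is_done(result):
            return result
        # Give up if the next sleep would take us past the deadline
        if time.monotonic() + polling_interval > deadline:
            raise TimeoutError(f"No result within {timeout} seconds")
        time.sleep(polling_interval)


# Stand-in for a status call: reports "completed" on the third poll
_statuses = iter(["queued", "running", "completed"])
final = poll_until(
    fetch=lambda: {"status": next(_statuses)},
    is_done=lambda run: run["status"] == "completed",
    timeout=5,
    polling_interval=0.01,
)
```

A longer polling_interval reduces the number of requests at the cost of noticing completion later, which is the trade-off behind the timeout and polling_interval parameters.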
4. get_output(filename)
Description:
Fetch the output of a completed workflow using its filename. Outputs typically include downloadable files or results.
Parameters:
- filename (string): The name of the output file.
Usage:
output = flowscale.get_output("filename_prefix_58358_5WWF7GQUYF.png")
print(f"Workflow Output: {output}")
Response Example:
{
"status": "success",
"data": {
"download_url": "https://runs.s3.amazonaws.com/generations/...",
"generation_status": "success"
}
}
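When polling manually, a small check can decide whether an output is ready to download. The sketch below assumes that any generation_status other than "success" means the file is not yet available; that is an assumption, and download_url_if_ready is a hypothetical helper rather than an SDK method.

```python
def download_url_if_ready(output):
    """Return the download URL, or None if generation has not succeeded yet."""
    data = output.get("data") or {}
    if output.get("status") == "success" and data.get("generation_status") == "success":
        return data.get("download_url")
    return None


# Sample payload copied from the response example above (URL left truncated)
sample_output = {
    "status": "success",
    "data": {
        "download_url": "https://runs.s3.amazonaws.com/generations/...",
        "generation_status": "success",
    },
}
url = download_url_if_ready(sample_output)
```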
5. cancel_run(run_id)
Description:
Cancel a running workflow execution using its unique run_id.
Parameters:
- run_id (string): The unique identifier of the running workflow.
Usage:
result = flowscale.cancel_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Cancellation Result: {result}")
Response Example:
{
"status": "success",
"data": "Run cancelled successfully"
}
6. get_run(run_id)
Description: Retrieve detailed information about a specific workflow run.
Parameters:
- run_id (string): The unique identifier of the run.
Usage:
run_details = flowscale.get_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Run Details: {run_details}")
Response Example:
{
"status": "success",
"data": {
"_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
"status": "completed",
"inputs": [
{
"path": "text_51536",
"value": "a man riding a bike"
}
],
"outputs": [
{
"filename": "filename_prefix_58358_5WWF7GQUYF.png",
"url": "https://runs.s3.amazonaws.com/generations/..."
}
],
"created_at": "2024-01-15T10:30:00Z",
"started_at": "2024-01-15T10:30:15Z",
"completed_at": "2024-01-15T10:32:45Z"
}
}
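The three timestamps in a run record are enough to work out how long a run waited in the queue and how long it executed. The sketch below assumes the timestamps are ISO 8601 strings with a trailing "Z", as in the example above; run_timings is a hypothetical helper.

```python
from datetime import datetime


def parse_timestamp(value):
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware datetime."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def run_timings(run):
    """Return (queue_seconds, execution_seconds) for a run record."""
    created = parse_timestamp(run["created_at"])
    started = parse_timestamp(run["started_at"])
    completed = parse_timestamp(run["completed_at"])
    return (
        (started - created).total_seconds(),
        (completed - started).total_seconds(),
    )


# Timestamps copied from the response example above
sample_run = {
    "created_at": "2024-01-15T10:30:00Z",
    "started_at": "2024-01-15T10:30:15Z",
    "completed_at": "2024-01-15T10:32:45Z",
}
queue_seconds, execution_seconds = run_timings(sample_run)
```

For the example run this gives 15 seconds in the queue and 150 seconds of execution.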
7. get_runs() - ✨ Enhanced with Pagination & Sorting
Description: Retrieve workflow runs with advanced pagination, sorting, and filtering capabilities. This method now supports comprehensive run management with metadata.
Parameters:
- group_id (string, optional): Filter runs by group identifier
- sort_by (string, optional): Field to sort by: "created_at", "started_at", or "completed_at" (default: "created_at")
- sort_order (string, optional): Sort order: "asc" or "desc" (default: "desc")
- page (int, optional): Page number starting from 1 (default: 1)
- page_size (int, optional): Number of items per page, 1-100 (default: 10)
Usage Examples:
# Basic usage - get recent runs
runs = flowscale.get_runs()
print(f"Recent Runs: {runs}")
# Filter by group
group_runs = flowscale.get_runs(group_id="test_group")
print(f"Group Runs: {group_runs}")
# Pagination with custom page size
paginated_runs = flowscale.get_runs(page=2, page_size=20)
print(f"Page 2 Runs: {paginated_runs}")
# Sort by completion time (oldest first)
completed_runs = flowscale.get_runs(
    sort_by="completed_at",
    sort_order="asc",
    page_size=50
)
# Advanced filtering and sorting
filtered_runs = flowscale.get_runs(
    group_id="production_batch",
    sort_by="started_at",
    sort_order="desc",
    page=1,
    page_size=25
)
New Response Format with Metadata:
{
"status": "success",
"meta": {
"sort_by": "created_at",
"sort_order": "desc",
"page": 1,
"group_id": "test_group",
"page_size": 10,
"total_count": 156,
"total_pages": 16
},
"data": {
"runs": [
{
"_id": "cc29a72d-75b9-4c7b-b991-ccaf2a04d6ea",
"team_id": "team_123",
"workflow_id": "bncu0a1kipv",
"group_id": "test_group",
"status": "completed",
"inputs": [
{
"path": "text_51536",
"value": "a man riding a bike",
"s3_key": null,
"url": null
}
],
"outputs": [
{
"filename": "filename_prefix_58358_G3DRLIVVYP.png",
"s3_key": "generations/...",
"url": "https://runs.s3.amazonaws.com/generations/..."
}
],
"created_at": "2024-01-15T10:30:00Z",
"started_at": "2024-01-15T10:30:15Z",
"completed_at": "2024-01-15T10:32:45Z"
}
]
}
}
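The meta values in this example are consistent with total_pages being total_count divided by page_size, rounded up. That relationship is inferred from the sample numbers, not stated by the API, so treat the sketch below as an assumption; both function names are hypothetical.

```python
import math


def expected_total_pages(total_count, page_size):
    """Number of pages needed to hold total_count items at page_size per page."""
    return math.ceil(total_count / page_size)


def items_on_last_page(total_count, page_size):
    """How many items the final page holds (0 when there are no items)."""
    if total_count == 0:
        return 0
    remainder = total_count % page_size
    return remainder if remainder else page_size


# Values from the meta block above: 156 runs, 10 per page
pages = expected_total_pages(156, 10)
last_page_items = items_on_last_page(156, 10)
```

With 156 runs and a page size of 10 this yields 16 pages, matching the example, with 6 runs on the last page.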
Pagination Helper:
def get_all_runs(group_id=None, sort_by="created_at"):
    """Example function to get all runs across multiple pages"""
    all_runs = []
    page = 1
    while True:
        response = flowscale.get_runs(
            group_id=group_id,
            sort_by=sort_by,
            page=page,
            page_size=100  # Max page size
        )
        runs = response["data"]["runs"]
        all_runs.extend(runs)
        # Check if we've reached the last page
        if page >= response["meta"]["total_pages"]:
            break
        page += 1
    return all_runs
Advanced Usage Examples
Working with Multiple File Types
# Handle various file inputs
inputs = {
    "text_prompt": "Generate an image",
    "reference_image_1": open("ref1.png", "rb"),  # File object
    "reference_image_2": "/path/to/ref2.jpg",  # File path
    "style_image_789": open("style.png", "rb"),  # Dynamic parameter name
    "mask_image_456": "mask.png",  # Another file path
    "config_json": {"strength": 0.8, "steps": 20}  # JSON data
}
result = flowscale.execute_workflow_async("workflow_id", inputs)
Batch Processing with Groups
import time
from pathlib import Path
def process_image_batch(image_folder, workflow_id):
    """Process multiple images in batches with grouping"""
    # Path.glob() does not expand "{png,jpg,jpeg}", so filter by suffix instead
    image_files = sorted(
        p for p in Path(image_folder).iterdir()
        if p.suffix.lower() in {".png", ".jpg", ".jpeg"}
    )
    group_id = f"batch_{int(time.time())}"
    results = []
    for i, image_path in enumerate(image_files):
        inputs = {
            "input_image": str(image_path),
            "prompt": f"Process image {i+1}",
            "batch_index": i
        }
        result = flowscale.execute_workflow_async(
            workflow_id,
            inputs,
            group_id=group_id
        )
        results.append(result)
    # Get all results for this batch
    batch_runs = flowscale.get_runs(group_id=group_id, page_size=100)
    return batch_runs
Error Handling and Retries
import time
def robust_workflow_execution(workflow_id, inputs, max_retries=3):
    """Execute workflow with retry logic"""
    for attempt in range(max_retries):
        try:
            result = flowscale.execute_workflow_async(
                workflow_id,
                inputs,
                timeout=600,  # 10 minutes
                polling_interval=2
            )
            return result
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
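To make the backoff concrete, the waits produced by the 2 ** attempt rule in the retry function above can be listed directly. The sketch only reproduces that arithmetic; backoff_delays is a hypothetical helper name.

```python
def backoff_delays(max_retries=3):
    """Seconds slept after each failed attempt except the last one."""
    # The retry loop sleeps 2 ** attempt after attempts 0 .. max_retries - 2
    return [2 ** attempt for attempt in range(max_retries - 1)]


default_delays = backoff_delays()       # waits for max_retries=3
longer_delays = backoff_delays(5)       # waits for max_retries=5
total_wait = sum(longer_delays)         # worst-case sleep time in seconds
```

With the default of three attempts the function sleeps 1 and then 2 seconds; with five attempts the waits are 1, 2, 4, and 8 seconds, 15 seconds in total, on top of each attempt's own timeout.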
FastAPI Integration Example
The SDK includes a complete FastAPI server example that demonstrates all functionality:
import os
from fastapi import FastAPI, UploadFile, File, Form
from flowscale import FlowscaleAPI
app = FastAPI()
# Read credentials from the environment, as in the Quick Start
api_key = os.environ.get("FLOWSCALE_API_KEY")
api_url = os.environ.get("FLOWSCALE_API_URL")
flowscale = FlowscaleAPI(api_key, api_url)
@app.post("/execute_with_images")
async def execute_with_multiple_images(
    workflow_id: str = Form(...),
    image_35728: UploadFile = File(...),  # Dynamic parameter names
    image_42561: UploadFile = File(...),  # Multiple images supported
    text_prompt: str = Form("default prompt"),
    group_id: str = Form(None),
    timeout_ms: int = Form(600000),  # 10 minutes
    poll_interval_ms: int = Form(2000)  # 2 seconds
):
    """Execute workflow with multiple dynamic image parameters"""
    data = {
        "image_35728": image_35728.file,
        "image_42561": image_42561.file,
        "text_prompt": text_prompt
    }
    result = flowscale.execute_workflow_async(
        workflow_id,
        data,
        group_id=group_id,
        timeout_ms=timeout_ms,
        poll_interval_ms=poll_interval_ms
    )
    return result
@app.get("/runs")
async def get_runs_paginated(
    group_id: str = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    page: int = 1,
    page_size: int = 10
):
    """Get paginated runs with sorting"""
    return flowscale.get_runs(group_id, sort_by, sort_order, page, page_size)
Best Practices
Environment Configuration
- Always store sensitive information such as API keys in environment variables.
- Use .env files and libraries like python-dotenv for easy environment management.
File Handling
- File Objects: Use open("file.png", "rb") for direct file uploads
- File Paths: Pass string paths for automatic file detection and opening
- Dynamic Parameters: Match parameter names exactly as defined in your ComfyUI workflow
- Multiple Files: The SDK handles multiple file uploads seamlessly
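When passing file objects, the handles stay open until they are closed explicitly. One way to guarantee cleanup is contextlib.ExitStack, sketched below. run_with_files is a hypothetical helper, and fake_execute is a stand-in used only so the example runs without the API; with the real SDK, a callable such as flowscale.execute_workflow could be passed as execute.

```python
import os
import tempfile
from contextlib import ExitStack


def run_with_files(execute, workflow_id, file_params, other_inputs=None):
    """Open each path in file_params, call execute, then close every handle."""
    with ExitStack() as stack:
        inputs = dict(other_inputs or {})
        for name, path in file_params.items():
            # Each handle is registered so it is closed when the block exits
            inputs[name] = stack.enter_context(open(path, "rb"))
        return execute(workflow_id, inputs)


def fake_execute(workflow_id, inputs):
    """Stand-in for an SDK call: reads file inputs and echoes everything back."""
    return {
        name: (value.read() if hasattr(value, "read") else value)
        for name, value in inputs.items()
    }


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "input.png")
    with open(path, "wb") as fh:
        fh.write(b"demo")
    demo_result = run_with_files(
        fake_execute,
        "bncu0a1kipv",
        {"image_35728": path},
        {"text_51536": "Prompt test"},
    )
```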
Error Handling
- Wrap API calls in try/except blocks to handle errors gracefully.
- Implement retry logic for network-related failures.
- Log errors to support debugging and improve resilience.
Performance Optimization
- Use execute_workflow_async() for long-running workflows
- Implement proper pagination when fetching large numbers of runs
- Use appropriate polling intervals based on expected workflow duration
- Group related runs for better organization and batch processing
Testing and Debugging
- Test workflows in a development environment before deploying to production.
- Validate inputs to ensure they match the workflow requirements.
- Use the health check and queue status methods to monitor system status.
Changelog
v1.1.3 (Latest)
- ✨ Updated execute_workflow_async() parameters to match JavaScript SDK
  - New parameters: timeout_ms (default: 600000 = 10 minutes), poll_interval_ms (default: 2000 = 2 seconds)
  - Backward compatibility: old parameters (timeout, polling_interval) still work with deprecation warnings
- 🔄 Enhanced polling behavior - now polls run details to get output names instead of expecting them immediately
- 📊 Improved return type - returns comprehensive output data with run_id, all outputs, and status
- 🔧 Better error handling - enhanced run status checking and failure detection
- 📚 Updated documentation - reflects new parameter names and polling behavior
- 🚀 Updated FastAPI examples - uses new parameter names throughout
v1.1.2
- ✨ Enhanced get_runs() with pagination, sorting, and filtering
v1.1.0
- ✨ Dynamic Image Parameters support for flexible workflow inputs
- ✨ Multiple Image Uploads in single workflow execution
- 🔧 Improved File Handling with better error handling and type detection
- 📝 Updated Response Types with comprehensive metadata
- 🐛 Fixed async file upload issues in FastAPI examples
- 📚 Enhanced Documentation with advanced usage examples
v1.0.0
- Initial release with core functionality
- Basic workflow execution and management
- Health monitoring and queue status
Support
For any questions or assistance:
- Join the Flowscale community on Discord
- Refer to the Flowscale Documentation
- Submit issues on GitHub
Simplify your workflow management with the Flowscale Python SDK. Happy coding! 🚀