Skip to main content

A Python library for communicating with the Flowscale APIs

Project description

Flowscale Python SDK

A comprehensive Python SDK designed to simplify interaction with the FlowScale ComfyUI API. This library abstracts away the complexities of API calls, enabling you to effortlessly invoke workflows, retrieve outputs, manage workflow runs, and monitor system health.

🚀 What's New in v1.1.2

  • 📚 Comprehensive Documentation: Completely updated README with advanced usage examples
  • ✨ Enhanced get_runs(): Pagination, sorting, and filtering with detailed examples
  • 🖼️ Dynamic Image Parameters: Support for dynamic parameter names (e.g., image_35728)
  • 📤 Multiple Image Uploads: Handle multiple images in a single workflow execution
  • 🔧 Improved File Handling: Better support for various file types and upload methods
  • 📊 Updated Response Types: New response structure with comprehensive metadata

Installation

Install the Flowscale SDK using pip:

pip install flowscale

Quick Start

Importing the SDK

To get started, import the Flowscale SDK into your project:

from flowscale import FlowscaleAPI
import os

# Initialize the SDK
api_key = os.environ.get("FLOWSCALE_API_KEY")
api_url = os.environ.get("FLOWSCALE_API_URL")

if not api_key or not api_url:
    print("FLOWSCALE_API_KEY or FLOWSCALE_API_URL not set in environment")
    exit(1)

flowscale = FlowscaleAPI(api_key, api_url)

Environment Variables: Add the following to your .env file:

FLOWSCALE_API_KEY=your-api-key
FLOWSCALE_API_URL=https://your-api-url.pod.flowscale.ai

SDK Methods

Below is a detailed guide to the SDK methods, including descriptions, usage, and response formats.

1. check_health()

Description: Check the health status of the Flowscale platform, including the status of containers and services.

Usage:

health = flowscale.check_health()
print(f"API Health: {health}")

Response Example:

{
  "status": "success",
  "data": [
    {
      "container": "container #1",
      "status": "idle"
    },
    {
      "container": "container #2",
      "status": "running"
    }
  ]
}

2. get_queue()

Description: Retrieve the current status of the workflow queue, including running and pending jobs.

Usage:

queue = flowscale.get_queue()
print(f"Queue Details: {queue}")

Response Example:

{
  "status": "success",
  "data": [
    {
      "container": "container #1",
      "queue": {
        "queue_running": [
          [
            0,
            "2a0babc4-acce-4521-9576-00fa0e6ecc91"
          ]
        ],
        "queue_pending": [
          [
            1,
            "5d60718a-7e89-4c64-b32d-0d1366b44e2a"
          ]
        ]
      }
    }
  ]
}

3. execute_workflow(workflow_id, data, group_id=None)

Description: Trigger a workflow execution using its unique workflow_id. Input data and an optional group_id can be provided for better organization and tracking.

Parameters:

  • workflow_id (string): The unique ID of the workflow.
  • data (object): Input parameters for the workflow.
  • group_id (string, optional): A custom identifier for grouping runs.

Usage:

workflow_id = "bncu0a1kipv"
group_id = "test_group"

# Basic usage with text inputs
inputs = {
   "text_51536": "Prompt test",
   "another_param": "some value"
}

# Advanced usage with files and dynamic parameters
inputs = {
   "text_51536": "Prompt test",
   "image_35728": open("path/to/image.png", "rb"),  # File object
   "image_42561": "path/to/another_image.jpg",      # File path (auto-detected)
   "video_1239": open("path/to/video.mp4", "rb")   # Multiple file types
}

result = flowscale.execute_workflow(workflow_id, inputs, group_id)
print(f"Workflow Result: {result}")

Dynamic Parameter Names: The SDK now supports dynamic parameter names that match your workflow's requirements:

# Example with ComfyUI dynamic parameter names
inputs = {
    "image_35728": open("input.png", "rb"),      # Dynamic image parameter
    "image_42561": "path/to/second_image.jpg",   # Another dynamic image
    "text_input_123": "Your prompt here",        # Dynamic text parameter
    "seed_456": 12345                            # Dynamic numeric parameter
}

Response Example:

{
  "status": "success",
  "data": {
    "run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
    "workflow_id": "550e8400-e29b-41d4-a716-446655440000"
  }
}

3a. execute_workflow_async(workflow_id, data, group_id=None, timeout=300, polling_interval=1)

Description: Execute a workflow and automatically poll for its output until completion or timeout. This is a convenience method that combines execute_workflow and get_output with polling logic.

Parameters:

  • workflow_id (string): The unique ID of the workflow
  • data (object): Input parameters for the workflow
  • group_id (string, optional): A custom identifier for grouping runs
  • timeout (int, optional): Maximum time to wait for results in seconds (default: 300)
  • polling_interval (int, optional): Time between polling attempts in seconds (default: 1)

Usage:

workflow_id = "bncu0a1kipv"

# Simple usage
inputs = {
   "text_51536": "Prompt test",
   "image_1234": open("path/to/image.png", "rb")
}

# This will wait for the output, up to 5 minutes by default
result = flowscale.execute_workflow_async(workflow_id, inputs)
print(f"Workflow Result: {result}")

# With custom timeout and polling interval
result = flowscale.execute_workflow_async(
    workflow_id, 
    inputs,
    timeout=600,  # 10 minutes
    polling_interval=5  # Check every 5 seconds
)

# With multiple dynamic image parameters
inputs = {
    "prompt_text": "A beautiful landscape",
    "image_35728": open("reference1.png", "rb"),
    "image_42561": open("reference2.jpg", "rb"),
    "style_image_789": "path/to/style.png"
}

result = flowscale.execute_workflow_async(workflow_id, inputs, group_id="batch_001")

New Response Format: The method now returns a comprehensive response with all outputs:

{
  "run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
  "workflow_id": "550e8400-e29b-41d4-a716-446655440000",
  "outputs": {
    "filename_prefix_58358_5WWF7GQUYF": {
      "status": "success",
      "data": {
        "download_url": "https://runs.s3.amazonaws.com/generations/...",
        "generation_status": "success"
      }
    }
  },
  "total_outputs": 1,
  "status": "completed"
}

Polling Behavior: The method now:

  1. Executes the workflow to get a run_id and workflow_id
  2. Polls /api/v1/runs/{run_id} to get run status and output_names
  3. Once output names are available, polls each output until completion
  4. Returns all outputs with run and workflow IDs when complete or times out

Note: If the workflow doesn't complete within the timeout period, an exception will be raised.


4. get_output(filename)

Description: Fetch the output of a completed workflow using its filename. Outputs typically include downloadable files or results.

Parameters:

  • filename (string): The name of the output file.

Usage:

output = flowscale.get_output("filename_prefix_58358_5WWF7GQUYF.png")
print(f"Workflow Output: {output}")

Response Example:

{
  "status": "success",
  "data": {
    "download_url": "https://runs.s3.amazonaws.com/generations/...",
    "generation_status": "success"
  }
}

5. cancel_run(run_id)

Description: Cancel a running workflow execution using its unique run_id.

Parameters:

  • run_id (string): The unique identifier of the running workflow.

Usage:

result = flowscale.cancel_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Cancellation Result: {result}")

Response Example:

{
  "status": "success",
  "data": "Run cancelled successfully"
}

6. get_run(run_id)

Description: Retrieve detailed information about a specific workflow run.

Parameters:

  • run_id (string): The unique identifier of the run.

Usage:

run_details = flowscale.get_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Run Details: {run_details}")

Response Example:

{
  "status": "success",
  "data": {
    "_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
    "status": "completed",
    "inputs": [
      {
        "path": "text_51536",
        "value": "a man riding a bike"
      }
    ],
    "outputs": [
      {
        "filename": "filename_prefix_58358_5WWF7GQUYF.png",
        "url": "https://runs.s3.amazonaws.com/generations/..."
      }
    ],
    "created_at": "2024-01-15T10:30:00Z",
    "started_at": "2024-01-15T10:30:15Z",
    "completed_at": "2024-01-15T10:32:45Z"
  }
}

7. get_runs() - ✨ Enhanced with Pagination & Sorting

Description: Retrieve workflow runs with advanced pagination, sorting, and filtering capabilities. This method now supports comprehensive run management with metadata.

Parameters:

  • group_id (string, optional): Filter runs by group identifier
  • sort_by (string, optional): Field to sort by - "created_at", "started_at", or "completed_at" (default: "created_at")
  • sort_order (string, optional): Sort order - "asc" or "desc" (default: "desc")
  • page (int, optional): Page number starting from 1 (default: 1)
  • page_size (int, optional): Number of items per page, 1-100 (default: 10)

Usage Examples:

# Basic usage - get recent runs
runs = flowscale.get_runs()
print(f"Recent Runs: {runs}")

# Filter by group
group_runs = flowscale.get_runs(group_id="test_group")
print(f"Group Runs: {group_runs}")

# Pagination with custom page size
paginated_runs = flowscale.get_runs(page=2, page_size=20)
print(f"Page 2 Runs: {paginated_runs}")

# Sort by completion time (oldest first)
completed_runs = flowscale.get_runs(
    sort_by="completed_at", 
    sort_order="asc",
    page_size=50
)

# Advanced filtering and sorting
filtered_runs = flowscale.get_runs(
    group_id="production_batch",
    sort_by="started_at",
    sort_order="desc",
    page=1,
    page_size=25
)

New Response Format with Metadata:

{
  "status": "success",
  "meta": {
    "sort_by": "created_at",
    "sort_order": "desc",
    "page": 1,
    "group_id": "test_group",
    "page_size": 10,
    "total_count": 156,
    "total_pages": 16
  },
  "data": {
    "runs": [
      {
        "_id": "cc29a72d-75b9-4c7b-b991-ccaf2a04d6ea",
        "team_id": "team_123",
        "workflow_id": "bncu0a1kipv",
        "group_id": "test_group",
        "status": "completed",
        "inputs": [
          {
            "path": "text_51536",
            "value": "a man riding a bike",
            "s3_key": null,
            "url": null
          }
        ],
        "outputs": [
          {
            "filename": "filename_prefix_58358_G3DRLIVVYP.png",
            "s3_key": "generations/...",
            "url": "https://runs.s3.amazonaws.com/generations/..."
          }
        ],
        "created_at": "2024-01-15T10:30:00Z",
        "started_at": "2024-01-15T10:30:15Z",
        "completed_at": "2024-01-15T10:32:45Z"
      }
    ]
  }
}

Pagination Helper:

def get_all_runs(group_id=None, sort_by="created_at"):
    """Example function to get all runs across multiple pages"""
    all_runs = []
    page = 1
    
    while True:
        response = flowscale.get_runs(
            group_id=group_id,
            sort_by=sort_by,
            page=page,
            page_size=100  # Max page size
        )
        
        runs = response["data"]["runs"]
        all_runs.extend(runs)
        
        # Check if we've reached the last page
        if page >= response["meta"]["total_pages"]:
            break
            
        page += 1
    
    return all_runs

Advanced Usage Examples

Working with Multiple File Types

# Handle various file inputs
inputs = {
    "text_prompt": "Generate an image",
    "reference_image_1": open("ref1.png", "rb"),     # File object
    "reference_image_2": "/path/to/ref2.jpg",        # File path  
    "style_image_789": open("style.png", "rb"),      # Dynamic parameter name
    "mask_image_456": "mask.png",                    # Another file path
    "config_json": {"strength": 0.8, "steps": 20}   # JSON data
}

result = flowscale.execute_workflow_async("workflow_id", inputs)

Batch Processing with Groups

import time
from pathlib import Path

def process_image_batch(image_folder, workflow_id):
    """Process multiple images in batches with grouping"""
    image_files = list(Path(image_folder).glob("*.{png,jpg,jpeg}"))
    group_id = f"batch_{int(time.time())}"
    
    results = []
    for i, image_path in enumerate(image_files):
        inputs = {
            "input_image": str(image_path),
            "prompt": f"Process image {i+1}",
            "batch_index": i
        }
        
        result = flowscale.execute_workflow_async(
            workflow_id, 
            inputs, 
            group_id=group_id
        )
        results.append(result)
        
    # Get all results for this batch
    batch_runs = flowscale.get_runs(group_id=group_id, page_size=100)
    return batch_runs

Error Handling and Retries

import time

def robust_workflow_execution(workflow_id, inputs, max_retries=3):
    """Execute workflow with retry logic"""
    for attempt in range(max_retries):
        try:
            result = flowscale.execute_workflow_async(
                workflow_id, 
                inputs,
                timeout=600,  # 10 minutes
                polling_interval=2
            )
            return result
            
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise

FastAPI Integration Example

The SDK includes a complete FastAPI server example that demonstrates all functionality:

from fastapi import FastAPI, UploadFile, File, Form
from flowscale import FlowscaleAPI

app = FastAPI()
flowscale = FlowscaleAPI(api_key, api_url)

@app.post("/execute_with_images")
async def execute_with_multiple_images(
    workflow_id: str = Form(...),
    image_35728: UploadFile = File(...),      # Dynamic parameter names
    image_42561: UploadFile = File(...),      # Multiple images supported
    text_prompt: str = Form("default prompt"),
    group_id: str = Form(None),
    timeout_ms: int = Form(600000),           # 10 minutes
    poll_interval_ms: int = Form(2000)        # 2 seconds
):
    """Execute workflow with multiple dynamic image parameters"""
    data = {
        "image_35728": image_35728.file,
        "image_42561": image_42561.file,
        "text_prompt": text_prompt
    }
    
    result = flowscale.execute_workflow_async(
        workflow_id, 
        data, 
        group_id=group_id,
        timeout_ms=timeout_ms,
        poll_interval_ms=poll_interval_ms
    )
    return result

@app.get("/runs")
async def get_runs_paginated(
    group_id: str = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    page: int = 1,
    page_size: int = 10
):
    """Get paginated runs with sorting"""
    return flowscale.get_runs(group_id, sort_by, sort_order, page, page_size)

Best Practices

Environment Configuration

  • Always store sensitive information such as API keys in environment variables.
  • Use .env files and libraries like python-dotenv for easy environment management.

File Handling

  • File Objects: Use open("file.png", "rb") for direct file uploads
  • File Paths: Pass string paths for automatic file detection and opening
  • Dynamic Parameters: Match parameter names exactly as defined in your ComfyUI workflow
  • Multiple Files: The SDK handles multiple file uploads seamlessly

Error Handling

  • Wrap API calls in try-catch blocks to handle errors gracefully.
  • Implement retry logic for network-related failures.
  • Log errors for debugging and improve resilience.

Performance Optimization

  • Use execute_workflow_async() for long-running workflows
  • Implement proper pagination when fetching large numbers of runs
  • Use appropriate polling intervals based on expected workflow duration
  • Group related runs for better organization and batch processing

Testing and Debugging

  • Test workflows in a development environment before deploying to production.
  • Validate inputs to ensure they match the workflow requirements.
  • Use the health check and queue status methods to monitor system status.

Changelog

v1.1.4 (Latest)

  • 🔄 Updated execute_workflow response format - now includes workflow_id in addition to run_id
  • Enhanced execute_workflow_async output - returns workflow_id along with run details
  • 📚 Updated documentation - reflects new response format across all examples

v1.1.3

  • Updated execute_workflow_async() parameters to match JavaScript SDK
    • New parameters: timeout_ms (default: 600000 = 10 minutes), poll_interval_ms (default: 2000 = 2 seconds)
    • Backward compatibility: old parameters (timeout, polling_interval) still work with deprecation warnings
  • 🔄 Enhanced polling behavior - now polls run details to get output names instead of expecting them immediately
  • 📊 Improved return type - returns comprehensive output data with run_id, all outputs, and status
  • 🔧 Better error handling - enhanced run status checking and failure detection
  • 📚 Updated documentation - reflects new parameter names and polling behavior
  • 🚀 Updated FastAPI examples - uses new parameter names throughout

v1.1.2

  • Enhanced get_runs() with pagination, sorting, and filtering

v1.1.0

  • Dynamic Image Parameters support for flexible workflow inputs
  • Multiple Image Uploads in single workflow execution
  • 🔧 Improved File Handling with better error handling and type detection
  • 📝 Updated Response Types with comprehensive metadata
  • 🐛 Fixed async file upload issues in FastAPI examples
  • 📚 Enhanced Documentation with advanced usage examples

v1.0.0

  • Initial release with core functionality
  • Basic workflow execution and management
  • Health monitoring and queue status

Support

For any questions or assistance:


Simplify your workflow management with the Flowscale Python SDK. Happy coding! 🚀

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flowscale-1.1.4.tar.gz (11.6 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flowscale-1.1.4-py3-none-any.whl (11.4 kB view details)

Uploaded Python 3

File details

Details for the file flowscale-1.1.4.tar.gz.

File metadata

  • Download URL: flowscale-1.1.4.tar.gz
  • Upload date:
  • Size: 11.6 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.13

File hashes

Hashes for flowscale-1.1.4.tar.gz
Algorithm Hash digest
SHA256 178656110a2c45eb22c93f172480930704bf5fd39cdf72a3c80a93a83c101e9e
MD5 5fcbef85156d9e10bf79a4c93917df8b
BLAKE2b-256 f5668fe77430d4a32440e3e1ec524dd8208967b2c46dbee6174efa1ddda3b4b0

See more details on using hashes here.

File details

Details for the file flowscale-1.1.4-py3-none-any.whl.

File metadata

  • Download URL: flowscale-1.1.4-py3-none-any.whl
  • Upload date:
  • Size: 11.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.13

File hashes

Hashes for flowscale-1.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 d92d9424d286a27ae90a4f19f780893b210dc6fdf44cc970b131c4231582c886
MD5 31864418b282df7f71c2cd3cb482b13d
BLAKE2b-256 7556ab43d378dc0b4e59ac8e061544058d70c33e5fe2481074ffd1f825136a78

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page