Skip to main content

A Python library for communicating with the Flowscale APIs

Project description

Flowscale Python SDK

A comprehensive Python SDK designed to simplify interaction with the FlowScale ComfyUI API. This library abstracts away the complexities of API calls, enabling you to effortlessly invoke workflows, retrieve outputs, manage workflow runs, and monitor system health.


Installation Steps

Install the Flowscale SDK using pip:

pip install flowscale

For WebSocket support (optional):

pip install flowscale[websocket]

Quick Start

Importing the SDK

To get started, import the Flowscale SDK into your project:

from flowscale import FlowscaleAPI, FlowscaleConfig
import os

# Initialize the SDK with configuration
config: FlowscaleConfig = {
    'api_key': os.environ.get("FLOWSCALE_API_KEY"),
    'base_url': os.environ.get("FLOWSCALE_API_URL", "https://api.flowscale.ai"),
    'timeout': 6000,      # 6000 seconds default timeout
    'max_retries': 3,     # Maximum retry attempts
    'retry_delay': 1000,  # Delay between retries in milliseconds
}

api = FlowscaleAPI(config)

Environment Variables: Add the following to your .env file:

FLOWSCALE_API_KEY=your-api-key
FLOWSCALE_API_URL=https://your-api-url.pod.flowscale.ai

What's New in v1.1.0

  • Updated Constructor: Now uses a configuration dictionary similar to the npm SDK
  • Enhanced Retry Logic: Built-in retry mechanism with exponential backoff
  • Improved Async Workflow Execution: Better polling with status checking
  • New Methods: Added get_workflows() method
  • WebSocket Support: Placeholder for WebSocket functionality (requires additional package)
  • Better Error Handling: More detailed error messages and logging
  • Timeout Configuration: Configurable timeouts with sensible defaults

SDK Methods

Below is a detailed guide to the SDK methods, including descriptions, usage, and response formats.

1. check_health()

Description: Check the health status of the Flowscale platform, including the status of containers and services.

Usage:

health = api.check_health()
print(f"API Health: {health}")

Response Example:

{
  "status": "success",
  "data": [
    {
      "container": "container #1",
      "status": "idle"
    },
    {
      "container": "container #2",
      "status": "running"
    }
  ]
}

2. get_queue()

Description: Retrieve the current status of the workflow queue, including running and pending jobs.

Usage:

queue = api.get_queue()
print(f"Queue Details: {queue}")

Response Example:

{
  "status": "success",
  "data": [
    {
      "container": "container #1",
      "queue": {
        "queue_running": [
          [
            0,
            "2a0babc4-acce-4521-9576-00fa0e6ecc91"
          ]
        ],
        "queue_pending": [
          [
            1,
            "5d60718a-7e89-4c64-b32d-0d1366b44e2a"
          ]
        ]
      }
    }
  ]
}

3. execute_workflow(workflow_id, data, group_id=None)

Description: Trigger a workflow execution using its unique workflowId. Input data and an optional groupId can be provided for better organization and tracking.

Parameters:

  • workflowId (string): The unique ID of the workflow.
  • data (object): Input parameters for the workflow.
  • groupId (string, optional): A custom identifier for grouping runs.

Usage:

workflow_id = "bncu0a1kipv"
group_id = "test_group"

inputs = {
   "text_51536": "Prompt test",
   "image_1234": open("path/to/image.png", "rb"),
   "video_1239": "path/to/video.mp4"  # File path will be detected and opened
}

result = api.execute_workflow(workflow_id, inputs, group_id)
print(f"Workflow Result: {result}")

Response Example:

{
  "status": "success",
  "data": {
    "number": 0,
    "node_errors": {},
    "output_names": [
      "filename_prefix_58358_5WWF7GQUYF"
    ],
    "run_id": "808f34d0-ef97-4b78-a00f-1268077ea6db"
  }
}

3a. execute_workflow_async(workflow_id, data, group_id=None, timeout=300, polling_interval=2)

Description: Execute a workflow and automatically poll for its output until completion or timeout. This is a convenience method that combines execute_workflow and get_output with polling logic.

Parameters:

  • workflow_id (string): The unique ID of the workflow
  • data (object): Input parameters for the workflow
  • group_id (string, optional): A custom identifier for grouping runs
  • timeout (int, optional): Maximum time to wait for results in seconds (default: 300)
  • polling_interval (int, optional): Time between polling attempts in seconds (default: 2)

Usage:

workflow_id = "bncu0a1kipv"
inputs = {
   "text_51536": "Prompt test",
   "image_1234": open("path/to/image.png", "rb")
}
# This will wait for the output, up to 5 minutes by default
result = api.execute_workflow_async(workflow_id, inputs)
print(f"Workflow Result: {result}")

# With custom timeout and polling interval
result = flowscale.execute_workflow_async(
    workflow_id, 
    inputs,
    timeout=600,  # 10 minutes
    polling_interval=5  # Check every 5 seconds
)

Response Example:

{
  "status": "success",
  "data": {
    "download_url": "https://runs.s3.amazonaws.com/generations/...",
    "generation_status": "success"
  }
}

Note: If the workflow doesn't complete within the timeout period, an exception will be raised.


4. get_output(filename)

Description: Fetch the output of a completed workflow using its filename. Outputs typically include downloadable files or results.

Parameters:

  • filename (string): The name of the output file.

Usage:

output = flowscale.get_output("filename_prefix_58358_5WWF7GQUYF.png")
print(f"Workflow Output: {output}")

Response Example:

{
  "status": "success",
  "data": {
    "download_url": "https://runs.s3.amazonaws.com/generations/...",
    "generation_status": "success"
  }
}

5. cancel_run(run_id)

Description: Cancel a running workflow execution using its unique runId.

Parameters:

  • runId (string): The unique identifier of the running workflow.

Usage:

result = flowscale.cancel_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Cancellation Result: {result}")

Response Example:

{
  "status": "success",
  "data": "Run cancelled successfully"
}

6. get_run(run_id)

Description: Retrieve detailed information about a specific workflow run.

Parameters:

  • runId (string): The unique identifier of the run.

Usage:

run_details = flowscale.get_run("808f34d0-ef97-4b78-a00f-1268077ea6db")
print(f"Run Details: {run_details}")

Response Example:

{
  "status": "success",
  "data": {
    "_id": "808f34d0-ef97-4b78-a00f-1268077ea6db",
    "status": "completed",
    "inputs": [
      {
        "path": "text_51536",
        "value": "a man riding a bike"
      }
    ],
    "outputs": [
      {
        "filename": "filename_prefix_58358_5WWF7GQUYF.png",
        "url": "https://runs.s3.amazonaws.com/generations/..."
      }
    ]
  }
}

7. get_runs(group_id=None)

Description: Retrieve all workflow runs associated with a specific groupId. If no groupId is provided, all runs for the team are returned.

Parameters:

  • groupId (string, optional): The identifier for grouping runs.

Usage:

runs = flowscale.get_runs("test_group")
print(f"Runs for Group: {runs}")

# Get all runs for the team
all_runs = flowscale.get_runs()
print(f"All Runs: {all_runs}")

Response Example:

{
  "status": "success",
  "data": {
    "group_id": "test_group",
    "count": 2,
    "runs": [
      {
        "_id": "cc29a72d-75b9-4c7b-b991-ccaf2a04d6ea",
        "status": "completed",
        "outputs": [
          {
            "filename": "filename_prefix_58358_G3DRLIVVYP.png",
            "url": "https://runs.s3.amazonaws.com/generations/..."
          }
        ]
      }
    ]
  }
}

Best Practices

Environment Configuration

  • Always store sensitive information such as API keys in environment variables.
  • Use .env files and libraries like python-dotenv for easy environment management.

Error Handling

  • Wrap API calls in try-catch blocks to handle errors gracefully.
  • Log errors for debugging and improve resilience.

Testing and Debugging

  • Test workflows in a development environment before deploying to production.
  • Validate inputs to ensure they match the workflow requirements.

Support

For any questions or assistance, join the Flowscale community on Discord or refer to the Flowscale Documentation.


Simplify your workflow management with the Flowscale Python SDK. Happy coding!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flowscale-1.1.0b1.tar.gz (12.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flowscale-1.1.0b1-py3-none-any.whl (10.1 kB view details)

Uploaded Python 3

File details

Details for the file flowscale-1.1.0b1.tar.gz.

File metadata

  • Download URL: flowscale-1.1.0b1.tar.gz
  • Upload date:
  • Size: 12.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.13

File hashes

Hashes for flowscale-1.1.0b1.tar.gz
Algorithm Hash digest
SHA256 d708249499a42d49a58b249c9b22b9335c51b0d7db2ddd4beb60b5d8aab68dab
MD5 10e50fa0db10c06cbd54db0e29260459
BLAKE2b-256 799f796ffcc5561e9baa65919772d7b2b48e77eefec4e08062df2e03643917ae

See more details on using hashes here.

File details

Details for the file flowscale-1.1.0b1-py3-none-any.whl.

File metadata

  • Download URL: flowscale-1.1.0b1-py3-none-any.whl
  • Upload date:
  • Size: 10.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.13

File hashes

Hashes for flowscale-1.1.0b1-py3-none-any.whl
Algorithm Hash digest
SHA256 29964e9486949427bc8c80000272b759a7e0132add1428d5f4102ee01aad0f2f
MD5 8862b31bd86d5029790410fd567caf12
BLAKE2b-256 c0ecaa6be42f2df8d0fcec3f6041e949e818527f333638a1fed02c0506c6a043

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page