Python SDK for Inspectica Workflow Server - Activity worker implementation

@inspectica/workflow-sdk (Python)

Python SDK for Inspectica Workflow Server - Execute workflows and activities.

Installation

From PyPI

pip install inspectica-workflow-sdk

Local Development Install

# From the python-sdk directory
pip install -e .

# Or from the repo root
pip install -e ./packages/python-sdk

Quick Start

import logging
from workflow_sdk import Worker, workflow, activity

logging.basicConfig(level=logging.INFO)

# Define an activity
@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

# Define a workflow
@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        result = await workflow.execute_activity(say_hello, input_data)
        return result

# Create a worker with an API key (obtained from the server UI)
worker = Worker(
    server_url="http://localhost:8080",
    api_key="wk_abc123...",
)

# Register workflows and activities
worker.register(
    workflows=[HelloWorldWorkflow],
    activities=[say_hello],
)

# Start the worker
worker.run()

The worker will:

  1. Connect to the server with the API key
  2. Receive Kafka credentials and topic names automatically
  3. Send periodic heartbeats (visible in server UI)
  4. Gracefully disconnect on shutdown
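The SDK handles this lifecycle itself, and this README does not document its internals. As a rough illustration only, the heartbeat and graceful-shutdown steps can be sketched with plain asyncio. heartbeat_loop, install_shutdown_handlers, and the send_heartbeat callback are hypothetical names. How the real worker transmits heartbeats (HTTP, Kafka, or something else) is an assumption and is left to the callback.

```python
import asyncio
import signal
from typing import Awaitable, Callable


async def heartbeat_loop(
    send_heartbeat: Callable[[], Awaitable[None]],
    stop: asyncio.Event,
    interval_s: float = 10.0,
) -> int:
    """Call send_heartbeat() every interval_s seconds until stop is set.

    Returns the number of heartbeats sent (hypothetical helper).
    """
    sent = 0
    while not stop.is_set():
        await send_heartbeat()
        sent += 1
        try:
            # Sleep for one interval, but wake up at once if stop gets set
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # interval elapsed; loop around and send the next heartbeat
    return sent


def install_shutdown_handlers(stop: asyncio.Event) -> None:
    # Unix only: SIGINT/SIGTERM set the stop event so loops can exit cleanly
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
```

Waiting on the stop event with a timeout, rather than calling asyncio.sleep, lets a shutdown signal end the loop immediately instead of after a full interval.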

Configuration

Worker Configuration

Parameter                 Type  Required  Description
server_url                str   Yes       Workflow server URL
api_key                   str   Yes       Worker API key (from the server UI)
group_id                  str   No        Consumer group ID (default: auto-generated)
max_concurrent_workflows  int   No        Maximum number of concurrent workflows (default: 10)

worker = Worker(
    server_url="https://workflow.example.com",
    api_key="wk_abc123...",
    max_concurrent_workflows=10,
)
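max_concurrent_workflows caps how many workflows a single worker runs at once. This README does not say how the SDK enforces the cap. A common approach, shown here as an assumption, is an asyncio.Semaphore sized to the limit. ConcurrencyLimiter is a hypothetical name, and the active/peak counters exist only to make the behavior observable.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class ConcurrencyLimiter:
    """Caps how many coroutines run at once (hypothetical helper)."""

    def __init__(self, max_concurrent: int = 10) -> None:
        # Create this inside a running event loop on older Python versions
        self._sem = asyncio.Semaphore(max_concurrent)
        self.active = 0
        self.peak = 0

    async def run(self, fn: Callable[[], Awaitable[T]]) -> T:
        async with self._sem:  # waits here while max_concurrent are running
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await fn()
            finally:
                self.active -= 1
```

Work beyond the limit waits on the semaphore instead of running, so at most max_concurrent bodies execute at the same time.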

Activities

Activities are the building blocks of workflows. They represent individual units of work.

from workflow_sdk import activity

@activity.defn
async def my_activity(input_data: dict) -> dict:
    # input_data is the JSON input from the workflow
    # Return value is serialized to JSON
    return {"result": "done"}

# Sync activities are also supported
@activity.defn
def sync_activity(input_data: dict) -> dict:
    return {"upper": input_data.get("text", "").upper()}
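According to the comments above, activity input arrives as JSON, return values are serialized back to JSON, and both async and sync activities are accepted. This README does not show the SDK's dispatch code. The sketch below assumes one common approach: await coroutine functions directly, and run sync functions in a worker thread via asyncio.to_thread so they do not block the event loop. invoke_activity is a hypothetical name. The example uses undecorated copies of the activities so it runs without the SDK installed.

```python
import asyncio
import inspect
import json
from typing import Any, Callable


async def invoke_activity(fn: Callable[[dict], Any], raw_input: str) -> str:
    """Decode JSON input, call a sync or async activity, encode the result."""
    input_data = json.loads(raw_input)
    if inspect.iscoroutinefunction(fn):
        result = await fn(input_data)
    else:
        # Assumption: blocking sync activities run in a thread pool
        result = await asyncio.to_thread(fn, input_data)
    return json.dumps(result)


async def my_activity(input_data: dict) -> dict:
    return {"result": "done"}


def sync_activity(input_data: dict) -> dict:
    return {"upper": input_data.get("text", "").upper()}
```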

Error Handling

Raise exceptions to signal activity failure:

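from workflow_sdk import activity

async def external_api_call(data: dict) -> str:
    # Placeholder for your own I/O (HTTP request, database query, ...)
    return "ok"

@activity.defn
async def risky_activity(input_data: dict) -> dict:
    if not input_data.get("valid"):
        # An uncaught exception signals that this activity failed
        raise ValueError("Invalid input data")

    result = await external_api_call(input_data)
    return {"result": result}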

Workflows

Workflows orchestrate activities and define the business logic.

from workflow_sdk import workflow, activity

@activity.defn
async def step1(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step1"}

@activity.defn
async def step2(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step2"}

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Execute activities sequentially
        result1 = await workflow.execute_activity(step1, {"data": input_data["initial"]})
        result2 = await workflow.execute_activity(step2, {"data": result1["result"]})
        
        return {"final_result": result2["result"]}
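To trace the data flow, the same steps can be run as plain coroutines without a server. For the input {"initial": "order42"}, step1 produces "order42_step1", and step2 appends "_step2" to that result. run_pipeline is a hypothetical stand-in for MyWorkflow.run. It calls the activities directly instead of going through workflow.execute_activity, so it bypasses the server, timeouts, and replay. It is only useful for checking the logic locally.

```python
import asyncio


async def step1(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step1"}


async def step2(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step2"}


async def run_pipeline(input_data: dict) -> dict:
    # Each await finishes before the next starts, so step2 sees step1's output
    result1 = await step1({"data": input_data["initial"]})
    result2 = await step2({"data": result1["result"]})
    return {"final_result": result2["result"]}


print(asyncio.run(run_pipeline({"initial": "order42"})))
# {'final_result': 'order42_step1_step2'}
```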

Accessing Workflow Context

Call workflow.get_current_context() inside a workflow to read its ID and the metadata supplied when the workflow was submitted (for example, an access token):

from workflow_sdk import workflow

@workflow.defn
class AuthenticatedWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        ctx = workflow.get_current_context()
        
        # Access metadata passed from the workflow submission
        access_token = ctx.metadata.get("access_token")
        workflow_id = ctx.workflow_id
        
        # Use the token for authorization checks
        # ...
        
        return {"success": True}
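This README does not document how workflow.get_current_context() finds the right context when several workflows run on one worker. One standard Python technique for a per-task "current context" accessor is the contextvars module. The sketch below uses it as an assumption, not as the SDK's confirmed implementation. WorkflowContext here is a trimmed-down stand-in, and run_with_context is a hypothetical name.

```python
import asyncio
import contextvars
from dataclasses import dataclass, field
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


@dataclass
class WorkflowContext:
    workflow_id: str
    metadata: dict = field(default_factory=dict)


# One variable per process; each asyncio task sees its own value
_current_context = contextvars.ContextVar("current_workflow_context")


def get_current_context() -> WorkflowContext:
    # Raises LookupError when called outside a running workflow
    return _current_context.get()


async def run_with_context(
    ctx: WorkflowContext, body: Callable[[], Awaitable[T]]
) -> T:
    token = _current_context.set(ctx)
    try:
        return await body()
    finally:
        _current_context.reset(token)  # restore the previous value
```

asyncio copies the current context into each task it creates, so two workflows running concurrently on the same worker each see their own metadata.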

Cross-Worker Activity Execution

You can call activities registered on other workers by using the activity name as a string:

from workflow_sdk import workflow

@workflow.defn
class CrossWorkerWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Call an activity on another worker by name
        result = await workflow.execute_activity(
            "remote_activity_name",  # Activity registered on another worker
            {"data": input_data["data"]},
            start_to_close_timeout_ms=120000,  # 120000 ms = 2 minutes (default: 60000)
        )
        
        return result
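Because execute_activity accepts either a function or a string, activities must be resolvable by name. The sketch below shows one way name-based dispatch could work. It assumes that names not registered locally are forwarded to another worker through the server. ActivityRouter and the remote_call callback are hypothetical; the real routing happens inside the SDK and the server. The sketch also converts the millisecond timeout to the seconds that asyncio.wait_for expects.

```python
import asyncio
from typing import Any, Awaitable, Callable, Union

ActivityFn = Callable[[Any], Awaitable[Any]]


class ActivityRouter:
    """Resolves activities by name; unknown names go remote (hypothetical)."""

    def __init__(self, remote_call: Callable[[str, Any], Awaitable[Any]]) -> None:
        self._local: dict = {}  # activity name -> local coroutine function
        self._remote_call = remote_call

    def register(self, fn: ActivityFn) -> None:
        self._local[fn.__name__] = fn

    async def execute(
        self,
        activity: Union[ActivityFn, str],
        input_data: Any = None,
        start_to_close_timeout_ms: int = 60000,
    ) -> Any:
        name = activity if isinstance(activity, str) else activity.__name__
        if name in self._local:
            call = self._local[name](input_data)
        else:
            # Not registered here: assume another worker handles it via the server
            call = self._remote_call(name, input_data)
        # The timeout is given in milliseconds; asyncio.wait_for expects seconds
        return await asyncio.wait_for(call, timeout=start_to_close_timeout_ms / 1000)
```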

Complete Example

import logging
import os
from workflow_sdk import Worker, workflow, activity

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

@activity.defn
async def process_order(input_data: dict) -> dict:
    order_id = input_data.get("order_id")
    return {"order_id": order_id, "status": "processed"}

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        greeting = await workflow.execute_activity(say_hello, {"name": "Customer"})
        order = await workflow.execute_activity(process_order, input_data)
        return {"greeting": greeting, "order": order}

worker = Worker(
    server_url=os.environ.get("WORKFLOW_SERVER_URL", "http://localhost:8080"),
    # api_key is required, so fail fast with a KeyError if the variable is unset
    api_key=os.environ["WORKFLOW_API_KEY"],
)

worker.register(
    workflows=[OrderWorkflow],
    activities=[say_hello, process_order],
)

if __name__ == "__main__":
    worker.run()

API Reference

Worker

from collections.abc import Callable

class Worker:
    def __init__(
        self,
        server_url: str,
        api_key: str,
        group_id: str | None = None,
        max_concurrent_workflows: int = 10,
    ): ...
    
    def register(
        self,
        workflows: list[type] | None = None,
        activities: list[Callable] | None = None,
    ) -> None: ...
    
    def run(self) -> None: ...  # Blocking
    
    async def stop(self) -> None: ...

Workflow Context

from collections.abc import Callable
from typing import Any

class WorkflowContext:
    workflow_id: str
    metadata: dict[str, Any]
    activity_results: list[Any]
    
    async def execute_activity(
        self,
        activity: Callable | str,
        input_data: Any = None,
        start_to_close_timeout_ms: int = 60000,
    ) -> Any: ...

Features

  • Workflow Execution: Define and execute complex workflows with multiple activities
  • Activity Replay: Automatic replay of completed activities on workflow resume
  • Cross-Worker Routing: Execute activities on different workers via the server
  • Concurrency Control: Limit concurrent workflow executions to avoid out-of-memory (OOM) errors
  • AWS MSK IAM Support: Built-in support for AWS MSK with IAM authentication
  • Graceful Shutdown: Proper cleanup on SIGINT/SIGTERM signals
  • Heartbeat: Automatic heartbeat to keep worker connection alive
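WorkflowContext exposes activity_results, and the feature list mentions replaying completed activities when a workflow resumes. This README does not document the SDK's replay algorithm. The sketch below illustrates the general technique, assuming that recorded results are matched to execute_activity calls by position. ReplayingExecutor is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ReplayingExecutor:
    """Returns recorded results for activities that already completed (sketch)."""

    def __init__(self, activity_results: list) -> None:
        self.activity_results = list(activity_results)  # history from a prior run
        self._position = 0
        self.executed: list = []  # names of activities actually run this attempt

    async def execute_activity(
        self, fn: Callable[[Any], Awaitable[Any]], input_data: Any
    ) -> Any:
        if self._position < len(self.activity_results):
            # Replay: this call completed before, so reuse its recorded result
            result = self.activity_results[self._position]
        else:
            result = await fn(input_data)
            self.executed.append(fn.__name__)
            self.activity_results.append(result)  # record for future resumes
        self._position += 1
        return result
```

If results are matched by position, the workflow must issue the same activity calls in the same order on every run. For that reason, replay-based workflow code should avoid nondeterminism such as random numbers or reading the clock directly.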

Development

# Create conda environment
conda create -n workflow-sdk python=3.11
conda activate workflow-sdk

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Lint
ruff check .

License

MIT

Download files

Source Distribution

inspectica_workflow_sdk-1.1.0.tar.gz (19.7 kB)

Uploaded Source

Built Distribution

inspectica_workflow_sdk-1.1.0-py3-none-any.whl (18.6 kB)

Uploaded Python 3

File details

Details for the file inspectica_workflow_sdk-1.1.0.tar.gz.

File metadata

  • Download URL: inspectica_workflow_sdk-1.1.0.tar.gz
  • Size: 19.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.14

File hashes

Hashes for inspectica_workflow_sdk-1.1.0.tar.gz
Algorithm Hash digest
SHA256 d542c225fe304067342532afae68b2ef193e6a45d9643740b7e355b65f3a5819
MD5 4cbd3aa74df9acffcdd821a8ae814627
BLAKE2b-256 598dc1db2cd0e2da3e7e7c989d579450bfcc0daa3808bbddbe374dce3ee626de

File details

Details for the file inspectica_workflow_sdk-1.1.0-py3-none-any.whl.

File hashes

Hashes for inspectica_workflow_sdk-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 097e7f2ee44910dc99f32b94d6f342984f8c8f551539b34dde289a5572840f94
MD5 5ca402c05ca1d892b64e08d24eee8553
BLAKE2b-256 4e2d8ad8ffa2ad5cad3853c20a1afd7a81e06f62f3896bd524f071ee41f7c89b
