Python SDK for Inspectica Workflow Server - Activity worker implementation

@inspectica/workflow-sdk (Python)

Python SDK for Inspectica Workflow Server - Execute workflows and activities.

Installation

From PyPI

pip install inspectica-workflow-sdk

Local Development Install

# From the python-sdk directory
pip install -e .

# Or from the repo root
pip install -e ./packages/python-sdk

Quick Start

import logging
from workflow_sdk import Worker, workflow, activity

logging.basicConfig(level=logging.INFO)

# Define an activity
@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

# Define a workflow
@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        result = await workflow.execute_activity(say_hello, input_data)
        return result

# Create a worker with API key (get from server UI)
worker = Worker(
    server_url="http://localhost:8080",
    api_key="wk_abc123...",
)

# Register workflows and activities
worker.register(
    workflows=[HelloWorldWorkflow],
    activities=[say_hello],
)

# Start the worker
worker.run()

The worker will:

  1. Connect to the server with the API key
  2. Receive Kafka credentials and topic names automatically
  3. Send periodic heartbeats (visible in server UI)
  4. Gracefully disconnect on shutdown
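
The lifecycle above can be illustrated with a small, standard-library-only sketch. It is not the SDK's implementation: `heartbeat_loop`, `send_heartbeat`, and the interval are hypothetical stand-ins, and the README does not state the real heartbeat interval or transport.

```python
import asyncio

async def heartbeat_loop(send_heartbeat, interval_s: float, stop: asyncio.Event) -> int:
    """Call send_heartbeat() every interval_s seconds until stop is set; return the count."""
    sent = 0
    while not stop.is_set():
        await send_heartbeat()
        sent += 1
        try:
            # Sleep for one interval, but wake immediately if shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # Interval elapsed without a shutdown request; send the next heartbeat.
    return sent

async def demo() -> list:
    events = []
    stop = asyncio.Event()

    async def send_heartbeat() -> None:
        events.append("heartbeat")  # hypothetical stand-in for a request to the server

    events.append("connect")
    task = asyncio.create_task(heartbeat_loop(send_heartbeat, 0.01, stop))
    await asyncio.sleep(0.05)  # a real worker would process workflows here
    stop.set()                 # e.g. set from a SIGINT/SIGTERM handler
    await task
    events.append("disconnect")
    return events
```

Waiting on the stop event instead of calling `asyncio.sleep` directly lets the loop exit as soon as shutdown is requested, without waiting out the current interval.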

Configuration

Worker Configuration

Parameter                 Type  Required  Description
server_url                str   Yes       Workflow server URL
api_key                   str   Yes       Worker API key (from server UI)
group_id                  str   No        Consumer group ID (default: auto-generated)
max_concurrent_workflows  int   No        Max concurrent workflows (default: 10)

worker = Worker(
    server_url="https://workflow.example.com",
    api_key="wk_abc123...",
    max_concurrent_workflows=10,
)
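
To show what a cap like `max_concurrent_workflows` means in practice, here is a minimal sketch of bounding concurrency with `asyncio.Semaphore`. It assumes nothing about how the SDK enforces the limit internally; `run_with_limit` and `fake_workflow` are hypothetical names used only for illustration.

```python
import asyncio

async def run_with_limit(jobs, max_concurrent: int) -> int:
    """Run every job, at most max_concurrent at a time; return the peak concurrency seen."""
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def guarded(job):
        nonlocal running, peak
        async with semaphore:  # blocks while max_concurrent jobs are already running
            running += 1
            peak = max(peak, running)
            try:
                await job()
            finally:
                running -= 1

    await asyncio.gather(*(guarded(job) for job in jobs))
    return peak

async def fake_workflow() -> None:
    await asyncio.sleep(0.01)  # hypothetical stand-in for real workflow work
```

With 25 queued jobs and a limit of 10, the observed peak never exceeds 10, which is how a cap of this kind keeps memory use bounded under load.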

Activities

Activities are the building blocks of workflows. They represent individual units of work.

from workflow_sdk import activity

@activity.defn
async def my_activity(input_data: dict) -> dict:
    # input_data is the JSON input from the workflow
    # Return value is serialized to JSON
    return {"result": "done"}

# Sync activities are also supported
@activity.defn
def sync_activity(input_data: dict) -> dict:
    return {"upper": input_data.get("text", "").upper()}
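
One plausible way for a runner to support both `def` and `async def` activities is sketched below. This is an assumption, not the SDK's documented behavior: `call_activity` is a hypothetical helper, and the README does not say whether sync activities run in a thread.

```python
import asyncio
import inspect

async def call_activity(fn, input_data: dict) -> dict:
    """Invoke an activity whether it is defined with `def` or `async def`."""
    if inspect.iscoroutinefunction(fn):
        return await fn(input_data)
    # Run blocking code in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(fn, input_data)

async def async_echo(input_data: dict) -> dict:
    return {"result": "done"}

def sync_upper(input_data: dict) -> dict:
    return {"upper": input_data.get("text", "").upper()}
```

If sync activities do run on the event loop rather than in a thread, long blocking calls inside them would stall other work, so it is safer to keep them short or write them as `async def`.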

Error Handling

Raise exceptions to signal activity failure:

from workflow_sdk import activity

@activity.defn
async def risky_activity(input_data: dict) -> dict:
    if not input_data.get("valid"):
        raise ValueError("Invalid input data")

    # external_api_call is a placeholder for your own async client code
    result = await external_api_call(input_data)
    return {"result": result}
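
The README does not describe how a raised exception is reported back to the server. As a rough sketch of the general pattern, a runner can catch the exception and turn it into a failure record; `run_activity` and the record fields below are hypothetical.

```python
import asyncio

async def run_activity(fn, input_data: dict) -> dict:
    """Run an activity and convert any raised exception into a failure record."""
    try:
        result = await fn(input_data)
    except Exception as exc:
        # Keep the exception type and message so the failure can be diagnosed later.
        return {"status": "failed", "error_type": type(exc).__name__, "error": str(exc)}
    return {"status": "completed", "result": result}

async def risky(input_data: dict) -> dict:
    if not input_data.get("valid"):
        raise ValueError("Invalid input data")
    return {"result": "ok"}
```

Raising a specific exception type with a clear message, as `risky` does, makes such a record far more useful than a bare `Exception`.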

Workflows

Workflows orchestrate activities and define the business logic.

from workflow_sdk import workflow, activity

@activity.defn
async def step1(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step1"}

@activity.defn
async def step2(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step2"}

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Execute activities sequentially
        result1 = await workflow.execute_activity(step1, {"data": input_data["initial"]})
        result2 = await workflow.execute_activity(step2, {"data": result1["result"]})
        
        return {"final_result": result2["result"]}

Accessing Workflow Context

Call workflow.get_current_context() inside a workflow to read its ID and the metadata (such as an access token) passed at submission:

from workflow_sdk import workflow

@workflow.defn
class AuthenticatedWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        ctx = workflow.get_current_context()
        
        # Access metadata passed from the workflow submission
        access_token = ctx.metadata.get("access_token")
        workflow_id = ctx.workflow_id
        
        # Use the token for authorization checks
        # ...
        
        return {"success": True}
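
The authorization check elided in the example could start with a small validation helper like the one below. It is only a sketch: `require_access_token` is a hypothetical function, and it checks that a token is present, not that it is valid.

```python
def require_access_token(metadata: dict) -> str:
    """Return the access token from workflow metadata, or raise if it is missing."""
    token = metadata.get("access_token")
    if not isinstance(token, str) or not token:
        raise PermissionError("workflow submission did not include an access_token")
    return token
```

Inside a workflow this would be called as `require_access_token(ctx.metadata)` before any step that needs authorization, so a missing token fails the run early.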

Cross-Worker Activity Execution

You can call activities registered on other workers by using the activity name as a string:

from workflow_sdk import workflow

@workflow.defn
class CrossWorkerWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Call an activity on another worker by name
        result = await workflow.execute_activity(
            "remote_activity_name",  # Activity registered on another worker
            {"data": input_data["data"]},
            start_to_close_timeout_ms=120000,  # 2 minutes, in milliseconds
        )
        
        return result
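
A millisecond timeout like `start_to_close_timeout_ms` can be pictured with plain `asyncio`. The sketch below is not the SDK's mechanism, and the README does not say which exception the SDK raises when the timeout elapses; `call_with_timeout_ms`, `fast_call`, and `slow_call` are hypothetical.

```python
import asyncio

async def call_with_timeout_ms(make_call, timeout_ms: int, fallback):
    """Await make_call(); return fallback if it takes longer than timeout_ms milliseconds."""
    try:
        # asyncio.wait_for expects seconds, so convert from milliseconds.
        return await asyncio.wait_for(make_call(), timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        return fallback

async def fast_call() -> dict:
    return {"data": "ok"}

async def slow_call() -> dict:
    await asyncio.sleep(0.5)
    return {"data": "late"}
```

Remote activities include network and queueing delay, so a timeout for a cross-worker call generally needs more headroom than one for a local call.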

Complete Example

import logging
import os
from workflow_sdk import Worker, workflow, activity

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

@activity.defn
async def process_order(input_data: dict) -> dict:
    order_id = input_data.get("order_id")
    return {"order_id": order_id, "status": "processed"}

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        greeting = await workflow.execute_activity(say_hello, {"name": "Customer"})
        order = await workflow.execute_activity(process_order, input_data)
        return {"greeting": greeting, "order": order}

worker = Worker(
    server_url=os.environ.get("WORKFLOW_SERVER_URL", "http://localhost:8080"),
    api_key=os.environ["WORKFLOW_API_KEY"],  # required; raises KeyError if unset
)

worker.register(
    workflows=[OrderWorkflow],
    activities=[say_hello, process_order],
)

if __name__ == "__main__":
    worker.run()

API Reference

Worker

class Worker:
    def __init__(
        self,
        server_url: str,
        api_key: str,
        group_id: str | None = None,
        max_concurrent_workflows: int = 10,
    ): ...
    
    def register(
        self,
        workflows: list[type] | None = None,
        activities: list[Callable] | None = None,
    ) -> None: ...
    
    def run(self) -> None: ...  # Blocking
    
    async def stop(self) -> None: ...

Workflow Context

class WorkflowContext:
    workflow_id: str
    metadata: dict[str, Any]
    activity_results: list[Any]
    
    async def execute_activity(
        self,
        activity: Callable | str,
        input_data: Any = None,
        start_to_close_timeout_ms: int = 60000,
    ) -> Any: ...

Features

  • Workflow Execution: Define and execute complex workflows with multiple activities
  • Activity Replay: Automatic replay of completed activities on workflow resume
  • Cross-Worker Routing: Execute activities on different workers via the server
  • Concurrency Control: Limit concurrent workflow executions to prevent out-of-memory (OOM) failures
  • AWS MSK IAM Support: Built-in support for AWS MSK with IAM authentication
  • Graceful Shutdown: Proper cleanup on SIGINT/SIGTERM signals
  • Heartbeat: Automatic heartbeat to keep worker connection alive
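
The Activity Replay feature can be pictured with a toy model: results recorded before an interruption are returned in order on resume, and only the remaining activities actually run. This is a sketch under that assumption, not the SDK's implementation; `ReplayingContext` and its `executed` list are hypothetical, though `activity_results` mirrors the field listed in the API reference.

```python
import asyncio
from typing import Any, Optional

class ReplayingContext:
    """Toy context: returns recorded results first, then runs activities for real."""

    def __init__(self, recorded: Optional[list] = None) -> None:
        self.activity_results: list = list(recorded or [])
        self._cursor = 0
        self.executed: list = []  # names of activities that actually ran

    async def execute_activity(self, fn, input_data: Any = None) -> Any:
        if self._cursor < len(self.activity_results):
            # Replay: reuse the recorded result instead of re-running the activity.
            result = self.activity_results[self._cursor]
        else:
            result = await fn(input_data)
            self.activity_results.append(result)
            self.executed.append(fn.__name__)
        self._cursor += 1
        return result

async def step1(data: dict) -> dict:
    return {"result": data["data"] + "_step1"}

async def step2(data: dict) -> dict:
    return {"result": data["data"] + "_step2"}

async def my_workflow(ctx: ReplayingContext, input_data: dict) -> dict:
    r1 = await ctx.execute_activity(step1, {"data": input_data["initial"]})
    r2 = await ctx.execute_activity(step2, {"data": r1["result"]})
    return {"final_result": r2["result"]}
```

Positional replay of this kind only works if the workflow calls its activities in the same order on every run, so workflow code should avoid branching on random values or wall-clock time.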

Development

# Create conda environment
conda create -n workflow-sdk python=3.11
conda activate workflow-sdk

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Lint
ruff check .

License

MIT
