# @inspectica/workflow-sdk (Python)

Python SDK for the Inspectica Workflow Server: execute workflows and implement activity workers.
## Installation

### From PyPI (when published)

```bash
pip install inspectica-workflow-sdk
```

### Local Development Install

```bash
# From the python-sdk directory
pip install -e .

# Or from the repo root
pip install -e ./packages/python-sdk
```
## Quick Start

```python
import logging

from workflow_sdk import Worker, workflow, activity

logging.basicConfig(level=logging.INFO)

# Define an activity
@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

# Define a workflow
@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        result = await workflow.execute_activity(say_hello, input_data)
        return result

# Create a worker with an API key (get one from the server UI)
worker = Worker(
    server_url="http://localhost:8080",
    api_key="wk_abc123...",
)

# Register workflows and activities
worker.register(
    workflows=[HelloWorldWorkflow],
    activities=[say_hello],
)

# Start the worker (blocking)
worker.run()
```
The worker will:
- Connect to the server with the API key
- Receive Kafka credentials and topic names automatically
- Send periodic heartbeats (visible in server UI)
- Gracefully disconnect on shutdown
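The heartbeat behavior can be pictured as a small asyncio loop that pings until shutdown is requested. This is an illustrative sketch only, not the SDK's internal code; `send`, the interval, and the stop event are hypothetical names:

```python
import asyncio

async def heartbeat_loop(send, interval_s: float, stop: asyncio.Event) -> int:
    """Call send() every interval_s seconds until stop is set; return the count."""
    count = 0
    while not stop.is_set():
        send()
        count += 1
        try:
            # Sleep for the interval, but wake early if stop is set meanwhile
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass
    return count

async def main() -> int:
    sent = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat_loop(lambda: sent.append(1), 0.01, stop))
    await asyncio.sleep(0.05)  # let a few heartbeats fire
    stop.set()                 # simulate graceful shutdown
    return await task

print(asyncio.run(main()))
```

The real worker drives this from `worker.run()` and reports the heartbeats to the server UI.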
## Configuration

### Worker Configuration

| Parameter | Type | Required | Description |
|---|---|---|---|
| `server_url` | `str` | Yes | Workflow server URL |
| `api_key` | `str` | Yes | Worker API key (from the server UI) |
| `group_id` | `str` | No | Consumer group ID (default: auto-generated) |
| `max_concurrent_workflows` | `int` | No | Max concurrent workflows (default: 10) |
```python
worker = Worker(
    server_url="https://workflow.example.com",
    api_key="wk_abc123...",
    max_concurrent_workflows=10,
)
```
## Activities

Activities are the building blocks of workflows. Each one represents an individual unit of work.

```python
from workflow_sdk import activity

@activity.defn
async def my_activity(input_data: dict) -> dict:
    # input_data is the JSON input from the workflow
    # The return value is serialized to JSON
    return {"result": "done"}

# Sync activities are also supported
@activity.defn
def sync_activity(input_data: dict) -> dict:
    return {"upper": input_data.get("text", "").upper()}
```
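One way a worker can run a sync activity without blocking its event loop is to offload it to a thread. The sketch below shows that pattern with plain asyncio; it is an assumption for illustration, not necessarily how the SDK dispatches activities internally:

```python
import asyncio

def sync_activity(input_data: dict) -> dict:
    return {"upper": input_data.get("text", "").upper()}

async def run_activity(fn, input_data: dict) -> dict:
    """Dispatch an activity: await coroutines, thread-offload sync functions."""
    if asyncio.iscoroutinefunction(fn):
        return await fn(input_data)
    # Run the blocking function in a worker thread so the loop stays responsive
    return await asyncio.to_thread(fn, input_data)

result = asyncio.run(run_activity(sync_activity, {"text": "hi"}))
print(result)  # {'upper': 'HI'}
```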
### Error Handling

Raise an exception to signal activity failure:

```python
@activity.defn
async def risky_activity(input_data: dict) -> dict:
    if not input_data.get("valid"):
        raise ValueError("Invalid input data")
    result = await external_api_call(input_data)
    return {"result": result}
```
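If the server does not retry failed activities for you, a caller-side retry wrapper is easy to sketch. This helper (`with_retries`) is hypothetical, not an SDK API; whether retries belong here or in a server-side policy depends on your deployment:

```python
import asyncio

async def with_retries(fn, input_data, attempts: int = 3, backoff_s: float = 0.0):
    """Retry an activity-style coroutine on failure with optional exponential backoff."""
    last_exc = None
    for attempt in range(attempts):
        try:
            return await fn(input_data)
        except Exception as exc:
            last_exc = exc
            if backoff_s:
                await asyncio.sleep(backoff_s * (2 ** attempt))
    raise last_exc

calls = {"n": 0}

async def flaky(input_data):
    # Fails twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient")
    return {"ok": True}

print(asyncio.run(with_retries(flaky, {})))  # {'ok': True}
```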
## Workflows

Workflows orchestrate activities and define the business logic.

```python
from workflow_sdk import workflow, activity

@activity.defn
async def step1(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step1"}

@activity.defn
async def step2(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step2"}

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Execute activities sequentially
        result1 = await workflow.execute_activity(step1, {"data": input_data["initial"]})
        result2 = await workflow.execute_activity(step2, {"data": result1["result"]})
        return {"final_result": result2["result"]}
```
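The data flow above is plain dict piping, so you can trace it locally with a stand-in for `workflow.execute_activity`. This sketch only demonstrates the chaining; it does not reproduce the SDK runtime (no replay, routing, or persistence):

```python
import asyncio

async def step1(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step1"}

async def step2(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step2"}

async def execute_activity(fn, input_data):
    # Local stand-in for workflow.execute_activity: just await the function
    return await fn(input_data)

async def run(input_data: dict) -> dict:
    result1 = await execute_activity(step1, {"data": input_data["initial"]})
    result2 = await execute_activity(step2, {"data": result1["result"]})
    return {"final_result": result2["result"]}

out = asyncio.run(run({"initial": "x"}))
print(out)  # {'final_result': 'x_step1_step2'}
```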
### Accessing Workflow Context

You can access the workflow context to read metadata (such as access tokens) passed at submission time:

```python
from workflow_sdk import workflow

@workflow.defn
class AuthenticatedWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        ctx = workflow.get_current_context()

        # Access metadata passed with the workflow submission
        access_token = ctx.metadata.get("access_token")
        workflow_id = ctx.workflow_id

        # Use the token for authorization checks
        # ...
        return {"success": True}
```
### Cross-Worker Activity Execution

You can call activities registered on other workers by passing the activity name as a string:

```python
@workflow.defn
class CrossWorkerWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Call an activity on another worker by name
        result = await workflow.execute_activity(
            "remote_activity_name",  # Activity registered on another worker
            {"data": input_data["data"]},
            start_to_close_timeout_ms=120000,  # timeout in milliseconds
        )
        return result
```
## Complete Example

```python
import logging
import os

from workflow_sdk import Worker, workflow, activity

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

@activity.defn
async def process_order(input_data: dict) -> dict:
    order_id = input_data.get("order_id")
    return {"order_id": order_id, "status": "processed"}

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        greeting = await workflow.execute_activity(say_hello, {"name": "Customer"})
        order = await workflow.execute_activity(process_order, input_data)
        return {"greeting": greeting, "order": order}

worker = Worker(
    server_url=os.environ.get("WORKFLOW_SERVER_URL", "http://localhost:8080"),
    api_key=os.environ.get("WORKFLOW_API_KEY"),
)
worker.register(
    workflows=[OrderWorkflow],
    activities=[say_hello, process_order],
)

if __name__ == "__main__":
    worker.run()
```
## API Reference

### Worker

```python
class Worker:
    def __init__(
        self,
        server_url: str,
        api_key: str,
        group_id: str | None = None,
        max_concurrent_workflows: int = 10,
    ): ...

    def register(
        self,
        workflows: list[type] | None = None,
        activities: list[Callable] | None = None,
    ) -> None: ...

    def run(self) -> None: ...  # Blocking

    async def stop(self) -> None: ...
```

### Workflow Context

```python
class WorkflowContext:
    workflow_id: str
    metadata: dict[str, Any]
    activity_results: list[Any]

    async def execute_activity(
        self,
        activity: Callable | str,
        input_data: Any = None,
        start_to_close_timeout_ms: int = 60000,
    ) -> Any: ...
```
## Features
- Workflow Execution: Define and execute complex workflows with multiple activities
- Activity Replay: Automatic replay of completed activities on workflow resume
- Cross-Worker Routing: Execute activities on different workers via the server
- Concurrency Control: Limit concurrent workflow executions to prevent OOM
- AWS MSK IAM Support: Built-in support for AWS MSK with IAM authentication
- Graceful Shutdown: Proper cleanup on SIGINT/SIGTERM signals
- Heartbeat: Automatic heartbeat to keep worker connection alive
Development
# Create conda environment
conda create -n workflow-sdk python=3.11
conda activate workflow-sdk
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Lint
ruff check .
## License

MIT