Python SDK for Inspectica Workflow Server - Activity worker implementation
Project description
Workflow SDK (Python)
Python SDK for implementing activity workers with Inspectica Workflow Server.
Installation
From PyPI (when published)
pip install workflow-sdk
Local Development Install
# From the python-sdk directory
pip install -e .
# Or from the repo root
pip install -e ./packages/python-sdk
Quick Start (API Key Mode - Recommended)
import logging
from workflow_sdk import ActivityWorker, activity
logging.basicConfig(level=logging.INFO)
# Create a worker with API key (get from server UI at /workers-ui)
worker = ActivityWorker(
server_url="http://localhost:8080",
api_key="wk_abc123...",
task_queue="default",
)
# Define an activity
@activity.defn
async def say_hello(input_data: dict) -> dict:
name = input_data.get("name", "World")
return {"message": f"Hello, {name}!"}
# Register and run
worker.register(say_hello)
worker.run()
The worker will:
- Connect to the server with the API key
- Receive Kafka credentials and topic names automatically
- Send periodic heartbeats (visible in server UI)
- Gracefully disconnect on shutdown
Configuration
API Key Mode (Recommended)
| Parameter | Type | Required | Description |
|---|---|---|---|
server_url |
str | Yes | Workflow server URL |
api_key |
str | Yes | Worker API key (from server UI) |
task_queue |
str | Yes | Task queue name |
group_id |
str | No | Consumer group ID (default: workflow-worker-{task_queue}) |
worker = ActivityWorker(
server_url="https://workflow.example.com",
api_key="wk_abc123...",
task_queue="default",
)
Direct Kafka Mode (Legacy)
For cases where you need direct Kafka access without server handshake:
| Parameter | Type | Required | Description |
|---|---|---|---|
kafka_brokers |
str | Yes | Kafka bootstrap servers |
task_queue |
str | Yes | Task queue name |
task_topic |
str | Yes | Kafka topic for receiving tasks |
result_topic |
str | Yes | Kafka topic for sending results |
aws_msk_iam |
bool | No | Enable AWS MSK IAM authentication |
aws_region |
str | No | AWS region for MSK IAM |
worker = ActivityWorker(
kafka_brokers="b-1.mycluster.xxx.kafka.ap-east-1.amazonaws.com:9098",
task_queue="default",
task_topic="workflow-activity-default",
result_topic="workflow-activity-results",
aws_msk_iam=True,
aws_region="ap-east-1",
)
Activity Definition
Activities are functions decorated with @activity.defn:
from workflow_sdk import activity
@activity.defn
async def my_activity(input_data: dict) -> dict:
# input_data is the JSON input from the workflow
# Return value is serialized to JSON
return {"result": "done"}
# Sync activities are also supported
@activity.defn
def sync_activity(input_data: dict) -> dict:
return {"upper": input_data.get("text", "").upper()}
Error Handling
Raise exceptions to signal activity failure:
@activity.defn
async def risky_activity(input_data: dict) -> dict:
if not input_data.get("valid"):
raise ValueError("Invalid input data")
result = await external_api_call(input_data)
return {"result": result}
Complete Example
import logging
import os
from workflow_sdk import ActivityWorker, activity
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
@activity.defn
async def say_hello(input_data: dict) -> dict:
name = input_data.get("name", "World")
return {"message": f"Hello, {name}!"}
@activity.defn
async def process_order(input_data: dict) -> dict:
order_id = input_data.get("order_id")
return {"order_id": order_id, "status": "processed"}
worker = ActivityWorker(
server_url=os.environ.get("WORKFLOW_SERVER_URL", "http://localhost:8080"),
api_key=os.environ.get("WORKFLOW_API_KEY"),
task_queue="default",
)
worker.register(say_hello)
worker.register(process_order)
if __name__ == "__main__":
worker.run()
Graceful Shutdown
The worker handles SIGINT/SIGTERM for graceful shutdown:
worker.run() # Blocks until shutdown signal
Development
# Create conda environment
conda create -n workflow-sdk python=3.11
conda activate workflow-sdk
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Lint
ruff check .
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file inspectica_workflow_sdk-0.1.0.tar.gz.
File metadata
- Download URL: inspectica_workflow_sdk-0.1.0.tar.gz
- Upload date:
- Size: 13.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bce5770f24684c05d4d27db01ecee1f4b53d4463c9fc927e7e2831bdfcc12a84
|
|
| MD5 |
49aa737928ef4d6155aa71a1d85a4bec
|
|
| BLAKE2b-256 |
cb994fb6adfca2a558bd01634ae98c7b08e80241b4f01dc07ab8cbd3ec71927c
|
File details
Details for the file inspectica_workflow_sdk-0.1.0-py3-none-any.whl.
File metadata
- Download URL: inspectica_workflow_sdk-0.1.0-py3-none-any.whl
- Upload date:
- Size: 14.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.14
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
415d794ce36091a58011d694a3843e003ece5296121643aa43bfd972326c28f3
|
|
| MD5 |
6cbd5a61f7283f0f9ec5e388c100ed35
|
|
| BLAKE2b-256 |
b28115e78313b2497b13b5028bde497b4fa338d7ef38fe81a5b123eca517bfe6
|