Python client for Witrium cloud browser automation API
Project description
Witrium Client
A Python client library for interacting with the Witrium API. Witrium is a cloud-based browser automation platform that allows you to create and execute web automations through a visual interface and control them programmatically via this client.
How Witrium Works
Witrium operates by spinning up browser instances in the cloud to execute predefined automations that you create through the Witrium UI. Here's the typical workflow:
- Create Automations via UI: You use the Witrium web interface to record and define your automations (workflows)
- Execute via API: You use this Python client to trigger those automations programmatically
- Cloud Execution: Witrium runs your automation in a real browser instance in the cloud
- Retrieve Results: You poll for results and handle the automation outcomes
Each workflow is identified by a unique workflow_id and can accept arguments to customize its execution.
Installation
pip install witrium
Quick Start
The snippet below shows the minimum you need to get up-and-running:
from witrium import SyncWitriumClient, WorkflowRunStatus
from witrium.types import WorkflowRunOptionsSchema, WaitUntilStateOptionsSchema
# 1. Provide your API endpoint & token (export these as env-vars in production)
api_token = "YOUR_WITRIUM_API_TOKEN" # Obtain from dashboard
with SyncWitriumClient(api_token=api_token) as client:
# 2. Kick-off the **login** workflow and keep the browser alive
login = client.run_workflow(
workflow_id="login-workflow-id", # This workflow performs the sign-in steps
options=WorkflowRunOptionsSchema(
args={"username": "user@example.com", "password": "secretPass!"},
keep_session_alive=True # 🔑 keep the browser running after login
)
)
# 3. Block until the browser is *ready for reuse*
client.wait_until_state(
run_id=login.run_id,
target_status=WorkflowRunStatus.RUNNING, # Wait until browser is alive
options=WaitUntilStateOptionsSchema(
all_instructions_executed=True # …and the last login step finished
)
)
# 4. Re-use that **same** browser session in a follow-up workflow
scrape = client.run_workflow(
workflow_id="dashboard-scrape-workflow-id",
options=WorkflowRunOptionsSchema(
args={"section": "sales"},
use_existing_session=login.run_id # 👈 same browser instance
)
)
# 5. Wait for the scrape to finish and collect the results
results = client.wait_until_state(
run_id=scrape.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
print("Sales data:", results.result)
Workflow Lifecycle & Polling Essentials
client.run_workflow(...) only submits a job – the real browser work happens asynchronously in the cloud. Keep these steps in mind whenever you design multi-step automations:
- Submit – your call returns instantly with a
run_id. - Poll / Wait – use
wait_until_state()(orrun_workflow_and_wait()) to block until the run reaches: •WorkflowRunStatus.RUNNING– the browser has spun-up and is ready (handy when you enabledkeep_session_alive).
•WorkflowRunStatus.COMPLETED– the workflow has finished executing. - Chain or Fetch Results – once the target state is reached you can either run another workflow (chaining sessions) or read the data via
get_workflow_results().
When to wait for which state?
| Scenario | Recommended target_status |
Extra flags |
|---|---|---|
You saved state using preserve_state |
COMPLETED |
– |
You kept the session alive using keep_session_alive and intend to reuse it |
RUNNING |
all_instructions_executed=True |
⏳ Tip: For very long login flows (e.g. multi-factor auth) combine
min_wait_timewithpolling_intervalto reduce server load.
Concurrency vs. Serial Execution
• State Preservation (preserve_state) – Each follow-up workflow spins up its own browser. Scale horizontally & run many in parallel.
• Session Persistence (keep_session_alive) – All follow-up workflows share one browser instance. Run them serially (until multi-tab support lands).
Common Use Cases and Session Management
The Authentication Challenge
A common pattern in web automation involves authentication: you need to log into a service first, then perform actions in the authenticated session. Witrium provides two powerful approaches to handle this:
Approach 1: State Preservation (Concurrent-Friendly)
- Best for: Running multiple post-login automations concurrently
- How it works: Save browser state after login, then restore it in new browser instances
Approach 2: Session Persistence (Resource-Efficient)
- Best for: Sequential automations that need to share the exact same browser session
- How it works: Keep the browser alive after login, run subsequent automations in the same instance
Session Management Patterns
Pattern 1: Disconnected Sessions with State Preservation
This approach allows you to save the browser state (cookies, localStorage, etc.) after a login workflow and then restore that state in new browser instances for subsequent workflows.
Advantages:
- Multiple post-login workflows can run concurrently
- Each workflow gets its own browser instance
- Horizontal scaling of browser instances
- Robust isolation between concurrent executions
Use Case Example:
from witrium import SyncWitriumClient, WorkflowRunStatus
from witrium.types import WorkflowRunOptionsSchema
with SyncWitriumClient(api_token="your-api-token") as client:
# Step 1: Run login workflow and preserve the authenticated state
login_response = client.run_workflow(
workflow_id="login-workflow-id",
options=WorkflowRunOptionsSchema(
args={"username": "user@example.com", "password": "secure123"},
preserve_state="authenticated-session" # Save state with this name
)
)
# Step 2: Wait for login to complete
login_results = client.wait_until_state(
run_id=login_response.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
# Step 3: Run multiple post-login workflows concurrently
# Each will spawn a new browser but restore the authenticated state
# Workflow A: Extract data from dashboard
dashboard_response = client.run_workflow(
workflow_id="dashboard-scraping-workflow-id",
options=WorkflowRunOptionsSchema(
args={"report_type": "monthly"},
use_states=["authenticated-session"] # Restore the saved state
)
)
# Workflow B: Update user profile (can run concurrently)
profile_response = client.run_workflow(
workflow_id="profile-update-workflow-id",
options=WorkflowRunOptionsSchema(
args={"new_email": "newemail@example.com"},
use_states=["authenticated-session"] # Same state, different browser instance
)
)
# Both workflows are now running concurrently in separate browser instances
# but both have access to the authenticated session
Pattern 2: Persistent Session with Keep-Alive
This approach keeps the browser instance alive after the login workflow completes, allowing subsequent workflows to run in the same browser session.
Advantages:
- More resource-efficient (reuses same browser instance)
- Maintains exact session continuity
- No need to restore state (session never ends)
- Faster execution for subsequent workflows
Limitations:
- Subsequent workflows must run serially (one after another)
- Cannot run multiple post-login workflows concurrently in the same session
Use Case Example:
from witrium import SyncWitriumClient, WorkflowRunStatus
from witrium.types import WorkflowRunOptionsSchema, WaitUntilStateOptionsSchema
with SyncWitriumClient(api_token="your-api-token") as client:
# Step 1: Run login workflow and keep the browser session alive
login_response = client.run_workflow(
workflow_id="login-workflow-id",
options=WorkflowRunOptionsSchema(
args={"username": "user@example.com", "password": "secure123"},
keep_session_alive=True # Keep browser instance running
)
)
# Step 2: Wait for login to complete and start running
# We wait for RUNNING status because the browser is kept alive
login_results = client.wait_until_state(
run_id=login_response.run_id,
target_status=WorkflowRunStatus.RUNNING,
options=WaitUntilStateOptionsSchema(
all_instructions_executed=True # Ensure login steps are done
)
)
# Step 3: Run subsequent workflows in the same browser session
# These must run serially, not concurrently
# Workflow A: Extract data from dashboard
dashboard_response = client.run_workflow(
workflow_id="dashboard-scraping-workflow-id",
options=WorkflowRunOptionsSchema(
args={"report_type": "monthly"},
use_existing_session=login_response.run_id # Use the live session
)
)
# Wait for dashboard workflow to complete before next one
dashboard_results = client.wait_until_state(
run_id=dashboard_response.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
# Workflow B: Update user profile (must wait for previous to complete)
profile_response = client.run_workflow(
workflow_id="profile-update-workflow-id",
options=WorkflowRunOptionsSchema(
args={"new_email": "newemail@example.com"},
use_existing_session=login_response.run_id # Same live session
)
)
Choosing the Right Pattern
| Factor | State Preservation | Session Persistence |
|---|---|---|
| Concurrency | ✅ Multiple workflows can run simultaneously | ❌ Must run serially |
| Resource Usage | Higher (multiple browser instances) | ✅ Lower (single browser instance) |
| Isolation | ✅ Complete isolation between workflows | ❌ Shared session state |
| Setup Complexity | Medium (manage state names) | ✅ Simple (just workflow run IDs) |
| Use Case | Bulk data processing, parallel operations | Sequential workflows, state-dependent operations |
Complete Examples
Example 1: E-commerce Data Extraction (State Preservation)
from witrium import SyncWitriumClient, WorkflowRunStatus
from witrium.types import WorkflowRunOptionsSchema
import concurrent.futures
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def extract_category_data(client, category, state_name):
"""Extract data for a specific product category."""
try:
response = client.run_workflow(
workflow_id="category-scraper-workflow",
options=WorkflowRunOptionsSchema(
args={"category": category},
use_states=[state_name]
)
)
results = client.wait_until_state(
run_id=response.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
return {"category": category, "data": results.result}
except Exception as e:
logger.error(f"Failed to extract {category}: {e}")
return {"category": category, "error": str(e)}
with SyncWitriumClient(api_token="your-api-token") as client:
# Step 1: Login and save state
logger.info("Logging into e-commerce platform...")
login_response = client.run_workflow(
workflow_id="ecommerce-login-workflow",
options=WorkflowRunOptionsSchema(
args={"email": "seller@example.com", "password": "secure123"},
preserve_state="ecommerce-authenticated"
)
)
# Wait for login completion
client.wait_until_state(
run_id=login_response.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
logger.info("Login completed, state preserved")
# Step 2: Extract data from multiple categories concurrently
categories = ["electronics", "clothing", "home-garden", "books", "sports"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit all category extraction tasks concurrently
future_to_category = {
executor.submit(extract_category_data, client, category,
"ecommerce-authenticated"): category
for category in categories
}
results = []
for future in concurrent.futures.as_completed(future_to_category):
result = future.result()
results.append(result)
logger.info(f"Completed extraction for {result['category']}")
logger.info(f"Extracted data from {len(results)} categories")
for result in results:
if "error" in result:
logger.error(f"Error in {result['category']}: {result['error']}")
else:
logger.info(f"{result['category']}: {len(result['data'])} items extracted")
Example 2: Banking Workflow (Session Persistence)
from witrium import SyncWitriumClient, WorkflowRunStatus
from witrium.types import WorkflowRunOptionsSchema, WaitUntilStateOptionsSchema
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
with SyncWitriumClient(api_token="your-api-token") as client:
# Step 1: Secure login with 2FA
logger.info("Initiating secure banking login...")
login_response = client.run_workflow(
workflow_id="bank-login-with-2fa-workflow",
options=WorkflowRunOptionsSchema(
args={
"username": "customer123",
"password": "secure456",
"phone_number": "+1234567890" # For 2FA
},
keep_session_alive=True # Keep session for subsequent operations
)
)
# Wait for login and 2FA to complete
logger.info("Waiting for login and 2FA completion...")
login_results = client.wait_until_state(
run_id=login_response.run_id,
target_status=WorkflowRunStatus.RUNNING,
options=WaitUntilStateOptionsSchema(
all_instructions_executed=True,
min_wait_time=30 # 2FA usually takes some time
)
)
logger.info("Secure login completed")
# Step 2: Check account balances
logger.info("Checking account balances...")
balance_response = client.run_workflow(
workflow_id="check-balances-workflow",
options=WorkflowRunOptionsSchema(
args={"account_types": ["checking", "savings", "credit"]},
use_existing_session=login_response.run_id
)
)
balance_results = client.wait_until_state(
run_id=balance_response.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
logger.info(f"Account balances retrieved: {balance_results.result}")
# Step 3: Download transaction history
logger.info("Downloading transaction history...")
transaction_response = client.run_workflow(
workflow_id="download-transactions-workflow",
options=WorkflowRunOptionsSchema(
args={
"date_range": "last_30_days",
"format": "csv",
"accounts": ["checking", "savings"]
},
use_existing_session=login_response.run_id
)
)
transaction_results = client.wait_until_state(
run_id=transaction_response.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
logger.info("Transaction history downloaded")
# Step 4: Generate financial report
logger.info("Generating financial report...")
report_response = client.run_workflow(
workflow_id="generate-financial-report-workflow",
options=WorkflowRunOptionsSchema(
args={
"report_type": "monthly_summary",
"include_charts": True
},
use_existing_session=login_response.run_id
)
)
report_results = client.wait_until_state(
run_id=report_response.run_id,
target_status=WorkflowRunStatus.COMPLETED
)
logger.info("Financial report generated successfully")
logger.info("All banking operations completed in the same secure session")
Running Talents
In addition to workflows, you can execute "Talents" directly. Talents are pre-defined capabilities or simpler automation units that can be executed with specific arguments.
from witrium import SyncWitriumClient
from witrium.types import TalentRunOptionsSchema
with SyncWitriumClient(api_token="your-api-token") as client:
# Run a talent by ID with options
result = client.run_talent(
talent_id="talent-uuid",
options=TalentRunOptionsSchema(
args={"key": "value"},
# Optional parameters:
# files=[...],
# use_states=["state-id"],
# preserve_state="new-state-name"
)
)
# The result is a TalentResultSchema object
print(f"Status: {result.status}")
print(f"Result data: {result.result}")
if result.error_message:
print(f"Error: {result.error_message}")
Basic Usage
Synchronous Client
from witrium import SyncWitriumClient, WorkflowRunStatus
from witrium.types import (
WorkflowRunOptionsSchema,
RunWorkflowAndWaitOptionsSchema,
WaitUntilStateOptionsSchema
)
# Using with context manager (recommended)
with SyncWitriumClient(api_token="your-api-token") as client:
# Run a workflow and wait for results
results = client.run_workflow_and_wait(
workflow_id="workflow-uuid",
options=RunWorkflowAndWaitOptionsSchema(
args={"key1": "value1", "key2": 42},
polling_interval=5,
timeout=300
)
)
print(f"Workflow completed with status: {results.status}")
print(f"Results: {results.result}")
# Or run a workflow without waiting
response = client.run_workflow(
workflow_id="workflow-uuid",
options=WorkflowRunOptionsSchema(
args={"key1": "value1"}
)
)
print(f"Workflow run started: {response.run_id}")
# Get results later
results = client.get_workflow_results(
run_id=response.run_id
)
# Wait for workflow to start running
results = client.wait_until_state(
run_id=response.run_id,
target_status=WorkflowRunStatus.RUNNING
)
print(f"Workflow is now running: {results.status}")
Asynchronous Client
import asyncio
from witrium import AsyncWitriumClient, WorkflowRunStatus
from witrium.types import (
WorkflowRunOptionsSchema,
RunWorkflowAndWaitOptionsSchema
)
async def run_workflow():
# Using with async context manager (recommended)
async with AsyncWitriumClient(api_token="your-api-token") as client:
# Run a workflow and wait for results
results = await client.run_workflow_and_wait(
workflow_id="workflow-uuid",
options=RunWorkflowAndWaitOptionsSchema(
args={"key1": "value1", "key2": 42},
polling_interval=5,
timeout=300
)
)
print(f"Workflow completed with status: {results.status}")
print(f"Results: {results.result}")
# Or start a workflow and wait for it to begin running
response = await client.run_workflow(
workflow_id="workflow-uuid",
options=WorkflowRunOptionsSchema(
args={"key1": "value1"}
)
)
# Wait until workflow starts running
results = await client.wait_until_state(
run_id=response.run_id,
target_status=WorkflowRunStatus.RUNNING
)
print(f"Workflow is now running: {results.status}")
# Run the async function
asyncio.run(run_workflow())
Progress Tracking and Monitoring
Real-time Progress Tracking
import time
from tqdm import tqdm
from witrium import SyncWitriumClient, WorkflowRunStatus
from witrium.types import RunWorkflowAndWaitOptionsSchema
def create_progress_tracker():
"""Create a progress tracking function."""
pbar = tqdm(total=100, desc="Workflow Progress")
last_execution_count = 0
def update_progress(result):
nonlocal last_execution_count
# Get execution count
executions = result.executions or []
execution_count = len(executions)
# Update progress bar only if we have new executions
if execution_count > last_execution_count:
pbar.update(execution_count - last_execution_count)
last_execution_count = execution_count
# Update description based on status
pbar.set_description(f"Status: {result.status}")
# Show individual execution details
for execution in executions:
if execution.status == "C": # Completed
tqdm.write(f"✅ {execution.instruction}")
elif execution.status == "F": # Failed
tqdm.write(f"❌ {execution.instruction}: {execution.error_message}")
return update_progress, pbar
with SyncWitriumClient(api_token="your-api-token") as client:
progress_callback, progress_bar = create_progress_tracker()
try:
# Run workflow with progress tracking
result = client.run_workflow_and_wait(
workflow_id="workflow-uuid",
options=RunWorkflowAndWaitOptionsSchema(
args={"key1": "value1"},
on_progress=progress_callback
)
)
progress_bar.close()
print("Workflow completed!")
except Exception as e:
progress_bar.close()
print(f"Workflow failed: {e}")
Using Callbacks for Custom Monitoring
from witrium.types import RunWorkflowAndWaitOptionsSchema
# Define a custom progress callback
def monitor_workflow_progress(result):
"""Custom monitoring function."""
status = result.status
executions = result.executions or []
print(f"📊 Status: {status}, Executions: {len(executions)}")
# Log each execution step
for i, execution in enumerate(executions):
status_emoji = {
"P": "⏳", # Pending
"R": "🔄", # Running
"C": "✅", # Completed
"F": "❌", # Failed
}.get(execution.status, "❓")
print(f" {status_emoji} Step {i+1}: {execution.instruction}")
if execution.error_message:
print(f" ⚠️ Error: {execution.error_message}")
# Use the callback
with SyncWitriumClient(api_token="your-api-token") as client:
results = client.run_workflow_and_wait(
workflow_id="workflow-uuid",
options=RunWorkflowAndWaitOptionsSchema(
args={"key1": "value1"},
on_progress=monitor_workflow_progress
)
)
API Reference
SyncWitriumClient / AsyncWitriumClient
Initialization
SyncWitriumClient(
api_token: str, # API token for authentication
timeout: int = 60, # Request timeout in seconds
verify_ssl: bool = True # Whether to verify SSL certificates
)
Core Methods
run_workflow()
Execute a workflow in the Witrium platform.
run_workflow(
workflow_id: str, # Required: ID of the workflow to run
options: Optional[WorkflowRunOptionsSchema] = None # Optional workflow run options
) -> WorkflowRunSubmittedSchema
WorkflowRunOptionsSchema fields:
args: Optional[dict[str, str | int | float]] - Arguments to pass to the workflowfiles: Optional[List[FileUpload]] - Files to upload with the workflowuse_states: Optional[List[str]] - List of saved state names to restorepreserve_state: Optional[str] - Name to save the browser state asno_intelligence: bool = False - Disable AI assistancerecord_session: bool = False - Record the browser sessionkeep_session_alive: bool = False - Keep browser alive after completionuse_existing_session: Optional[str] - Workflow run ID of existing session to use
Session Management:
preserve_state: Save the browser state with this name after workflow completion. Other workflows can then restore this state usinguse_states.use_states: List of previously saved state names to restore at the start of this workflow.keep_session_alive: If True, keeps the browser instance running after workflow completion.use_existing_session: Run this workflow in an existing browser session (identified by workflow run ID).
run_talent()
Run a talent by ID.
run_talent(
talent_id: str, # Required: ID of the talent to run
options: Optional[TalentRunOptionsSchema] = None # Optional talent run options
) -> TalentResultSchema
TalentRunOptionsSchema fields:
args: Optional[dict[str, Any]] - Arguments to pass to the talentfiles: Optional[List[FileUpload]] - Files to upload with the talentuse_states: Optional[List[str]] - List of saved state names to restorepreserve_state: Optional[str] - Name to save the browser state askeep_session_alive: bool = False - Keep browser alive after completionuse_existing_session: Optional[str] - Workflow run ID of existing session to use
wait_until_state()
Wait for a workflow run to reach a specific status.
wait_until_state(
run_id: str, # The workflow run ID to wait for
target_status: str, # Target status (e.g., WorkflowRunStatus.RUNNING)
options: Optional[WaitUntilStateOptionsSchema] = None # Optional wait options
) -> WorkflowRunResultsSchema
WaitUntilStateOptionsSchema fields:
all_instructions_executed: bool = False - Also wait for all executions to completemin_wait_time: int = 0 - Minimum seconds to wait before polling startspolling_interval: int = 2 - Seconds between polling attemptstimeout: int = 60 - Maximum seconds to wait
Key Parameters:
target_status: UseWorkflowRunStatusconstants (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED)all_instructions_executed: When True, also waits for all individual execution steps to completemin_wait_time: Useful for long-running workflows to reduce unnecessary polling
run_workflow_and_wait()
Run a workflow and wait for results by polling until completion.
run_workflow_and_wait(
workflow_id: str, # Required: ID of the workflow to run
options: Optional[RunWorkflowAndWaitOptionsSchema] = None # Optional run and wait options
) -> Union[WorkflowRunResultSchema, List[WorkflowRunResultSchema]]
RunWorkflowAndWaitOptionsSchema fields:
Inherits all fields from WorkflowRunOptionsSchema, plus:
polling_interval: int = 5 - Seconds to wait between polling attemptstimeout: int = 300 - Maximum seconds to poll before timing outreturn_intermediate_results: bool = False - If True, returns list of all polled resultson_progress: Optional[Callable] - Callback function called with each intermediate result
Other Methods
get_workflow_results(run_id): Get current results of a workflow runcancel_run(run_id): Cancel a workflow run and clean up associated resourcesclose(): Close the HTTP client (called automatically with context manager)
Status Constants
WorkflowRunStatus
WorkflowRunStatus.PENDING # "P" - Workflow is queued
WorkflowRunStatus.RUNNING # "R" - Workflow is executing
WorkflowRunStatus.COMPLETED # "C" - Workflow finished successfully
WorkflowRunStatus.FAILED # "F" - Workflow failed
WorkflowRunStatus.CANCELLED # "X" - Workflow was cancelled
# Helper lists
WorkflowRunStatus.TERMINAL_STATUSES # [COMPLETED, FAILED, CANCELLED]
AgentExecutionStatus
AgentExecutionStatus.PENDING # "P" - Execution step is queued
AgentExecutionStatus.RUNNING # "R" - Execution step is running
AgentExecutionStatus.COMPLETED # "C" - Execution step completed
AgentExecutionStatus.FAILED # "F" - Execution step failed
AgentExecutionStatus.CANCELLED # "X" - Execution step cancelled
Response Schemas
WorkflowRunSubmittedSchema
{
"workflow_id": str,
"run_id": str, # Use this for polling and session management
"status": str
}
WorkflowRunResultsSchema
{
"workflow_id": str,
"run_id": str,
"status": str,
"started_at": Optional[str],
"completed_at": Optional[str],
"message": Optional[str],
"executions": List[AgentExecutionSchema], # Individual execution steps
"result": Optional[dict | list], # Final workflow result
"result_format": Optional[str],
"error_message": Optional[str]
}
AgentExecutionSchema
{
"status": str,
"instruction_order": int,
"instruction": str,
"result": Optional[dict | list],
"result_format": Optional[str],
"error_message": Optional[str]
}
Option Schemas
WorkflowRunOptionsSchema
{
"args": Optional[dict[str, str | int | float]],
"files": Optional[List[FileUpload]],
"use_states": Optional[List[str]],
"preserve_state": Optional[str],
"no_intelligence": bool = False,
"record_session": bool = False,
"keep_session_alive": bool = False,
"use_existing_session": Optional[str]
}
TalentRunOptionsSchema
{
"args": Optional[dict[str, Any]],
"files": Optional[List[FileUpload]],
"use_states": Optional[List[str]],
"preserve_state": Optional[str],
"keep_session_alive": bool = False,
"use_existing_session": Optional[str]
}
WaitUntilStateOptionsSchema
{
"all_instructions_executed": bool = False,
"min_wait_time": int = 0,
"polling_interval": int = 2,
"timeout": int = 60
}
RunWorkflowAndWaitOptionsSchema
Extends WorkflowRunOptionsSchema with additional fields:
{
# All WorkflowRunOptionsSchema fields, plus:
"polling_interval": int = 5,
"timeout": int = 300,
"return_intermediate_results": bool = False,
"on_progress": Optional[Callable]
}
Exception Handling
from witrium import WitriumClientException
from witrium.types import RunWorkflowAndWaitOptionsSchema
try:
result = client.run_workflow_and_wait(
workflow_id="my-workflow",
options=RunWorkflowAndWaitOptionsSchema(
args={"key": "value"}
)
)
except WitriumClientException as e:
print(f"Witrium API error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Cancelling Workflow Runs
You can cancel a workflow run that is in progress:
from witrium import SyncWitriumClient
from witrium.types import WorkflowRunOptionsSchema
with SyncWitriumClient(api_token="your-api-token") as client:
# Start a workflow
response = client.run_workflow(
workflow_id="long-running-workflow",
options=WorkflowRunOptionsSchema(
args={"parameter": "value"}
)
)
# Later, decide to cancel it
cancel_result = client.cancel_run(run_id=response.run_id)
print(f"Workflow cancelled with status: {cancel_result.status}")
This is particularly useful for:
- Long-running workflows that are no longer needed
- Error recovery scenarios
- Resource management (freeing up browser sessions)
- User-initiated cancellations in interactive applications
Best Practices
1. Always Use Context Managers
# ✅ Good - Automatically closes connections
with SyncWitriumClient(api_token=token) as client:
results = client.run_workflow(...)
# ❌ Bad - Manual cleanup required
client = SyncWitriumClient(api_token=token)
results = client.run_workflow(...)
client.close() # Easy to forget!
2. Choose the Right Session Management Pattern
from witrium.types import WorkflowRunOptionsSchema
# ✅ For concurrent operations - use state preservation
for category in categories:
client.run_workflow(
workflow_id="scraper",
options=WorkflowRunOptionsSchema(
args={"category": category},
use_states=["logged-in-state"] # Each runs in new browser
)
)
# ✅ For sequential operations - use session persistence
login_run_id = client.run_workflow(
"login-workflow",
options=WorkflowRunOptionsSchema(keep_session_alive=True)
).run_id
client.wait_until_state(login_run_id, target_status=WorkflowRunStatus.RUNNING)
client.run_workflow(
"next-workflow",
options=WorkflowRunOptionsSchema(use_existing_session=login_run_id)
) # Same browser
3. Implement Proper Error Handling
from witrium.types import RunWorkflowAndWaitOptionsSchema
def run_workflow_with_retry(client, workflow_id, args, max_retries=3):
"""Run workflow with retry logic."""
for attempt in range(max_retries):
try:
return client.run_workflow_and_wait(
workflow_id=workflow_id,
options=RunWorkflowAndWaitOptionsSchema(
args=args,
timeout=300
)
)
except WitriumClientException as e:
if attempt == max_retries - 1:
raise
logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying...")
time.sleep(2 ** attempt) # Exponential backoff
4. Use Appropriate Timeouts
from witrium.types import RunWorkflowAndWaitOptionsSchema
# ✅ Adjust timeouts based on workflow complexity
quick_results = client.run_workflow_and_wait(
workflow_id="simple-data-extraction",
options=RunWorkflowAndWaitOptionsSchema(
timeout=60 # Simple workflows
)
)
complex_results = client.run_workflow_and_wait(
workflow_id="complex-multi-page-workflow",
options=RunWorkflowAndWaitOptionsSchema(
timeout=600 # Complex workflows need more time
)
)
5. Monitor Progress for Long-Running Workflows
from witrium.types import RunWorkflowAndWaitOptionsSchema
# ✅ Use callbacks for visibility into long-running processes
def log_progress(result):
completed_steps = sum(1 for ex in result.executions if ex.status == "C")
total_steps = len(result.executions)
logger.info(f"Progress: {completed_steps}/{total_steps} steps completed")
client.run_workflow_and_wait(
workflow_id="long-running-workflow",
options=RunWorkflowAndWaitOptionsSchema(
on_progress=log_progress,
polling_interval=10 # Poll less frequently for long workflows
)
)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file witrium-0.5.1.tar.gz.
File metadata
- Download URL: witrium-0.5.1.tar.gz
- Upload date:
- Size: 31.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
467672a6bbe72d58e909a213f5b74887fada2230f622081e9a451a62808ac3cc
|
|
| MD5 |
3d39541b4f786d23c5fb2e777d04ce21
|
|
| BLAKE2b-256 |
112c186ee4da1a022a8e574bf3c47ee35fe50d457f58de546edc77811689b4e9
|
File details
Details for the file witrium-0.5.1-py3-none-any.whl.
File metadata
- Download URL: witrium-0.5.1-py3-none-any.whl
- Upload date:
- Size: 16.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e10aeb048790c5738d1376456e026726988a653ad36a895fede1e2c42f4c7b51
|
|
| MD5 |
40360890d16c2ce50d7d41c0e8cd3378
|
|
| BLAKE2b-256 |
2bbc9132cc0d7a9c66471d364e17997126138310401a0f343f64a22ebb898641
|