Flowise integration for xLink identity-based authentication (low-code AI workflows, 603× faster M2M auth)
Private.Me xLink for Flowise (Python)
Identity-based M2M authentication for Flowise workflows.
Python bindings for @private.me/flowise. Build Flowise workflows that communicate securely using Ed25519 DID identity instead of API keys.
Installation
Prerequisites
This package requires both the Node.js module and Python bindings:
# 1. Install Node.js package
npm install @private.me/flowise flowise
# 2. Install Python bindings
pip install private-me-flowise
Requirements:
- Python 3.9+
- Node.js 18+ (for backend)
- npm (for Node.js package installation)
- Flowise 1.0+
Quick Start
import asyncio
from private_me import flowise_xlink

async def main():
    # Connect to xLink workflow agent
    connection = await flowise_xlink.connect('data-pipeline')
    # Execute workflow with identity authentication
    result = await connection.call_workflow(
        workflow='transform-data',
        params={'source': 'database', 'output': 'api'}
    )
    print(result)  # Workflow execution result

# The API is async, so it must run inside an event loop
asyncio.run(main())
API Reference
connect(workflow_id: str) -> XLinkWorkflowConnection
Connect to xLink workflow agent.
Parameters:
workflow_id(str): Workflow identifier
Returns:
XLinkWorkflowConnection: Connection instance
Raises:
RuntimeError: If Node.js module not found
Example:
conn = await flowise_xlink.connect('etl-pipeline')
XLinkWorkflowConnection.call_workflow(workflow: str, params: Dict[str, Any], timeout: int = 30000) -> Dict[str, Any]
Execute a Flowise workflow with identity authentication.
Parameters:
workflow(str): Workflow name to execute
params(Dict[str, Any]): Workflow parameters
timeout(int): Timeout in milliseconds (default: 30000)
Returns:
Dict[str, Any]: Workflow execution result
Raises:
RuntimeError: If Node.js backend fails
TimeoutError: If request times out
Example:
result = await connection.call_workflow(
    workflow='data-transformation',
    params={'input': 'raw_data.csv', 'format': 'json'},
    timeout=60000  # 60 seconds
)
XLinkWorkflowConnection.send_message(to: str, payload: Dict[str, Any], timeout: int = 10000) -> Dict[str, Any]
Send message to another workflow agent.
Parameters:
to(str): Target workflow DID
payload(Dict[str, Any]): Message payload
timeout(int): Timeout in milliseconds (default: 10000)
Returns:
Dict[str, Any]: Acknowledgment or response
Raises:
RuntimeError: If Node.js backend fails
TimeoutError: If request times out
Example:
result = await connection.send_message(
    to='did:key:z6Mk...',
    payload={'type': 'data-ready', 'dataset': 'sales_2024'},
    timeout=5000
)
XLinkWorkflowConnection.receive_messages(timeout: int = 5000) -> List[Dict[str, Any]]
Receive pending messages from other workflow agents.
Parameters:
timeout(int): Timeout in milliseconds (default: 5000)
Returns:
List[Dict[str, Any]]: List of received messages
Raises:
RuntimeError: If Node.js backend fails
Example:
messages = await connection.receive_messages(timeout=10000)
for msg in messages:
    print(f"From: {msg['from']}, Payload: {msg['payload']}")
XLinkWorkflowConnection.get_did() -> str
Get the workflow agent's decentralized identifier.
Returns:
str: DID in format did:key:z6Mk...
Example:
did = connection.get_did()
print(f"Workflow DID: {did}")
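Because send_message() expects a DID as its target, it can help to sanity-check a string before sending. The sketch below is not part of the package; looks_like_ed25519_did_key is a hypothetical helper. It only checks the shape of an Ed25519 did:key identifier (the did:key:z6Mk prefix followed by base58btc characters) and does not verify that the key is valid or belongs to a real agent.

```python
import re

# Base58btc alphabet (omits 0, O, I and l). did:key values are multibase
# strings, where the leading 'z' marks base58btc encoding.
_BASE58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
_ED25519_DID_KEY = re.compile(rf"did:key:z6Mk[{_BASE58}]+")


def looks_like_ed25519_did_key(value: str) -> bool:
    """Return True if value has the shape of an Ed25519 did:key identifier.

    Hypothetical helper: a shape check only, not cryptographic validation.
    """
    return bool(_ED25519_DID_KEY.fullmatch(value))


if __name__ == "__main__":
    # A channel name is not a DID, so this prints False
    print(looks_like_ed25519_did_key("admin-notification-channel"))
```

A check like this catches the common mistake of passing a workflow name or channel label where a DID is required.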
XLinkWorkflowConnection.get_audit_log() -> List[Dict[str, Any]]
Retrieve workflow execution audit trail.
Returns:
List[Dict[str, Any]]: Audit log entries
Example:
audit = connection.get_audit_log()
for entry in audit:
    print(f"{entry['timestamp']}: {entry['workflow']} - {entry['status']}")
Flowise Integration
Basic Workflow Execution
from private_me import flowise_xlink
# Connect to workflow
workflow_conn = await flowise_xlink.connect('etl-pipeline')
# Execute data transformation
result = await workflow_conn.call_workflow(
    workflow='extract-transform-load',
    params={
        'source': 'postgresql://db.example.com/analytics',
        'destination': 's3://bucket/processed-data/',
        'format': 'parquet'
    }
)
print(f"Processed {result['records_processed']} records")
Multi-Workflow Coordination
from private_me import flowise_xlink
# Connect multiple workflows
etl = await flowise_xlink.connect('etl-pipeline')
reporting = await flowise_xlink.connect('reporting-pipeline')
alerting = await flowise_xlink.connect('alerting-pipeline')
# Orchestrate workflow chain
async def data_pipeline(dataset: str):
    # Step 1: ETL
    etl_result = await etl.call_workflow(
        workflow='extract-transform-load',
        params={'dataset': dataset}
    )
    # Step 2: Notify reporting workflow
    await etl.send_message(
        to=reporting.get_did(),
        payload={
            'type': 'data-ready',
            'dataset': dataset,
            'record_count': etl_result['records_processed']
        }
    )
    # Step 3: Generate reports
    report_result = await reporting.call_workflow(
        workflow='generate-dashboard',
        params={'dataset': dataset}
    )
    # Step 4: Send alerts if anomalies detected
    if report_result.get('anomalies_detected', 0) > 0:
        await alerting.send_message(
            # Placeholder: send_message expects the recipient's DID here
            to='admin-notification-channel',
            payload={
                'type': 'anomaly-alert',
                'dataset': dataset,
                'anomalies': report_result['anomalies']
            }
        )
    return report_result

result = await data_pipeline('sales_2024_q1')
Event-Driven Workflow Triggers
from private_me import flowise_xlink
import asyncio
# Create workflow agent
workflow = await flowise_xlink.connect('event-processor')
# Poll for incoming events
async def process_events():
    while True:
        messages = await workflow.receive_messages(timeout=10000)
        for msg in messages:
            if msg['payload']['type'] == 'data-upload':
                # Trigger processing workflow
                result = await workflow.call_workflow(
                    workflow='process-uploaded-data',
                    params={
                        'file_path': msg['payload']['file_path'],
                        'user_id': msg['payload']['user_id']
                    }
                )
                print(f"Processed upload: {result['status']}")
        await asyncio.sleep(5)  # Poll every 5 seconds

await process_events()
Policy-Enforced Workflows
from private_me import flowise_xlink
# Connect with policy enforcement
workflow = await flowise_xlink.connect('compliance-workflow')
# Configure policy (set via Node.js backend)
# Policy limits: maxNodeExecutions=100, timeout=60000ms
# Execute workflow - policy enforced automatically
result = await workflow.call_workflow(
    workflow='compliance-check',
    params={'document_id': 'doc-123'}
)
if 'error' in result:
    print(f"Policy violation: {result['error']}")
else:
    print(f"Compliance result: {result['status']}")
Workflow Audit and Monitoring
from private_me import flowise_xlink
# Connect to workflow
workflow = await flowise_xlink.connect('audit-enabled-workflow')
# Execute multiple workflows
await workflow.call_workflow('step-1', {'input': 'data'})
await workflow.call_workflow('step-2', {'input': 'processed'})
await workflow.call_workflow('step-3', {'input': 'final'})
# Retrieve audit trail
audit_log = workflow.get_audit_log()
for entry in audit_log:
    print(f"[{entry['timestamp']}] Workflow: {entry['workflow']}")
    print(f"  Status: {entry['status']}")
    print(f"  Nodes Executed: {entry['nodes_executed']}")
    print(f"  Duration: {entry['duration_ms']}ms")
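For monitoring, the audit entries can be rolled up into a short summary instead of printed one by one. This is a sketch, not package functionality: summarize_audit_log is a hypothetical helper, it assumes entries carry the workflow, status and duration_ms fields used in the example above, and the status values in the sample data are made up for illustration.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_audit_log(entries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate audit entries into run counts, status counts and timing."""
    status_counts = Counter(e.get("status", "unknown") for e in entries)
    total_ms = sum(e.get("duration_ms", 0) for e in entries)
    slowest = max(entries, key=lambda e: e.get("duration_ms", 0), default=None)
    return {
        "runs": len(entries),
        "by_status": dict(status_counts),
        "total_duration_ms": total_ms,
        "slowest_workflow": slowest.get("workflow") if slowest else None,
    }


if __name__ == "__main__":
    # Sample entries; the status strings here are illustrative assumptions.
    sample = [
        {"workflow": "step-1", "status": "success", "duration_ms": 120},
        {"workflow": "step-2", "status": "success", "duration_ms": 480},
        {"workflow": "step-3", "status": "failed", "duration_ms": 60},
    ]
    print(summarize_audit_log(sample))
```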
Architecture
This package uses a wrapper pattern:
Python App → Python Bindings → Node.js Backend → Flowise xLink → xLink Protocol
- Python layer: Provides Pythonic API (connect(), call_workflow(), send_message())
- Node.js backend: Handles cryptographic operations (Ed25519, AES-256-GCM) + Flowise integration
- xLink protocol: Identity-based M2M authentication
- Flowise integration: Visual workflow execution with identity authentication
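To make the wrapper pattern concrete, here is a minimal sketch of how a Python layer can delegate a call to a separate backend process. It is an illustration only: the actual transport between the Python bindings and the Node.js backend is not documented here, so the JSON-over-stdin/stdout exchange, the call_backend name, and the request shape are all assumptions. A small Python child process stands in for the Node.js backend so the example runs without Node.js installed.

```python
import json
import subprocess
import sys
from typing import Any, Dict

# Stand-in for the Node.js backend (assumption): a child process that reads
# one JSON request from stdin and writes one JSON response to stdout.
_FAKE_BACKEND = (
    "import json, sys\n"
    "req = json.load(sys.stdin)\n"
    "json.dump({'ok': True, 'echo': req['method']}, sys.stdout)\n"
)


def call_backend(method: str, params: Dict[str, Any], timeout_ms: int = 30000) -> Dict[str, Any]:
    """Send one request to the backend process and return its JSON reply."""
    request = json.dumps({"method": method, "params": params})
    try:
        proc = subprocess.run(
            [sys.executable, "-c", _FAKE_BACKEND],
            input=request,
            capture_output=True,
            text=True,
            timeout=timeout_ms / 1000,  # subprocess expects seconds
        )
    except subprocess.TimeoutExpired as exc:
        raise TimeoutError(f"Request timed out after {timeout_ms}ms") from exc
    if proc.returncode != 0:
        raise RuntimeError(f"Node.js backend error: {proc.stderr.strip()}")
    return json.loads(proc.stdout)


if __name__ == "__main__":
    print(call_backend("call_workflow", {"workflow": "transform-data"}))
```

The sketch also shows why backend failures surface as RuntimeError and slow calls as TimeoutError in the Python layer: both are translated at the process boundary.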
Troubleshooting
"Node.js module not found"
Error:
RuntimeError: Node.js module @private.me/flowise not found
Solution:
# Install Node.js package first
npm install @private.me/flowise flowise
# Verify installation
ls node_modules/@private.me/flowise
"Node.js backend error"
Error:
RuntimeError: Node.js backend error: <message>
Solution:
- Check Node.js version (requires 18+): node --version
- Verify Node.js package installed: npm list @private.me/flowise
- Check workflow ID is valid
- Review Node.js error message for details
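The Node.js version check in the list above can also be done from Python before connecting. This is a sketch using only the standard library; parse_node_major and node_is_supported are hypothetical helper names, not part of the package.

```python
import re
import shutil
import subprocess
from typing import Optional


def parse_node_major(version_output: str) -> Optional[int]:
    """Extract the major version from `node --version` output such as 'v18.19.0'."""
    match = re.match(r"v?(\d+)\.", version_output.strip())
    return int(match.group(1)) if match else None


def node_is_supported(minimum_major: int = 18) -> bool:
    """Return True if a `node` binary is on PATH and meets the minimum major version."""
    node = shutil.which("node")
    if node is None:
        return False
    try:
        output = subprocess.run(
            [node, "--version"], capture_output=True, text=True, timeout=10
        ).stdout
    except (OSError, subprocess.TimeoutExpired):
        return False
    major = parse_node_major(output)
    return major is not None and major >= minimum_major


if __name__ == "__main__":
    print("Node.js 18+ available:", node_is_supported())
```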
Timeout errors
Error:
TimeoutError: Request timed out after 30000ms
Solution:
# Increase timeout for long-running workflows
result = await connection.call_workflow(
    workflow='heavy-processing',
    params=data,
    timeout=120000  # 120 seconds (2 minutes)
)
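When timeouts are intermittent rather than caused by a genuinely long workflow, retrying with exponential backoff is an alternative to raising the limit. The sketch below is not part of the package; call_with_retry is a hypothetical helper, and it assumes the bindings raise the built-in TimeoutError as documented above.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def call_with_retry(
    make_call: Callable[[], Awaitable[Any]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> Any:
    """Await make_call(), retrying on TimeoutError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await make_call()
        except TimeoutError:
            if attempt == attempts - 1:
                raise  # Out of attempts: let the caller see the timeout
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** attempt)


# Usage inside an async function (connection and data as in the example above):
#     result = await call_with_retry(
#         lambda: connection.call_workflow(
#             workflow='heavy-processing', params=data, timeout=120000
#         )
#     )
```

Only retry workflows that are safe to run more than once, since a timed-out request may still have executed on the backend.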
Flowise connection errors
Error:
RuntimeError: Failed to connect to Flowise instance
Solution:
- Verify Flowise is running: http://localhost:3000 (default)
- Check FLOWISE_API_URL environment variable
- Ensure workflow name exists in Flowise
- Check Flowise API authentication if enabled
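The first two checks in the list above can be scripted. The sketch below uses only the standard library and is not part of the package: resolve_flowise_url and flowise_is_reachable are hypothetical helpers, and falling back to http://localhost:3000 when FLOWISE_API_URL is unset is an assumption based on the default mentioned above.

```python
import os
import urllib.error
import urllib.request
from typing import Mapping, Optional

DEFAULT_FLOWISE_URL = "http://localhost:3000"


def resolve_flowise_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Return FLOWISE_API_URL if set, otherwise the assumed local default."""
    env = os.environ if env is None else env
    return (env.get("FLOWISE_API_URL") or DEFAULT_FLOWISE_URL).rstrip("/")


def flowise_is_reachable(url: str, timeout_s: float = 3.0) -> bool:
    """Return True if an HTTP server answers at url, whatever the status code."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_s):
            return True
    except urllib.error.HTTPError:
        return True  # The server answered, even if with an error status
    except (urllib.error.URLError, OSError, ValueError):
        return False  # Refused, unresolved, timed out, or malformed URL


if __name__ == "__main__":
    url = resolve_flowise_url()
    print(f"{url} reachable: {flowise_is_reachable(url)}")
```

A reachable server does not guarantee that the workflow exists or that authentication will succeed, so the remaining two checks still apply.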
Development
Running Tests
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest -v
# Run with coverage
pytest -v --cov=private_me --cov-report=html
Building
# Build wheel
python setup.py bdist_wheel
# Validate build
bash validate-build.sh
Support
- Documentation: https://private.me/docs/flowise
- White Paper: https://private.me/docs/flowise.html
- Email: contact@private.me
- GitHub: https://github.com/xail-io/xail
License
Proprietary - See LICENSE.md
Questions? Visit private.me/docs/flowise for complete documentation.