
Flowise integration for xLink identity-based authentication (low-code AI workflows, 603× faster M2M auth)


Private.Me xLink for Flowise (Python)

Identity-based M2M authentication for Flowise workflows.

Python bindings for @private.me/flowise. Build Flowise workflows that communicate securely using Ed25519 DID identity instead of API keys.

Installation

Prerequisites

This package requires both the Node.js module and Python bindings:

# 1. Install Node.js package
npm install @private.me/flowise flowise

# 2. Install Python bindings
pip install private-me-flowise

Requirements:

  • Python 3.9+
  • Node.js 18+ (for backend)
  • npm (for Node.js package installation)
  • Flowise 1.0+

Quick Start

from private_me import flowise_xlink

# Connect to xLink workflow agent
connection = await flowise_xlink.connect('data-pipeline')

# Execute workflow with identity authentication
result = await connection.call_workflow(
    workflow='transform-data',
    params={'source': 'database', 'output': 'api'}
)

print(result)  # Workflow execution result
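The snippets in this README use top-level await for brevity. In a standalone script, wrap the calls in an async entry point and drive it with asyncio.run. A minimal sketch of that pattern; the stub connection below stands in for the package so the shape is self-contained, and in real use you would import and call flowise_xlink instead:

```python
import asyncio

# Stub standing in for flowise_xlink so this pattern runs standalone.
# In real use: from private_me import flowise_xlink
class StubConnection:
    async def call_workflow(self, workflow, params, timeout=30000):
        # A real connection would execute the Flowise workflow here.
        return {'workflow': workflow, 'status': 'completed'}

async def stub_connect(workflow_id):
    # Real call: await flowise_xlink.connect(workflow_id)
    return StubConnection()

async def main():
    connection = await stub_connect('data-pipeline')
    result = await connection.call_workflow(
        workflow='transform-data',
        params={'source': 'database', 'output': 'api'},
    )
    print(result['status'])

if __name__ == '__main__':
    asyncio.run(main())
```

The same wrapper applies to every example below: define an async function, await the xLink calls inside it, and hand it to asyncio.run once at the bottom of the script.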

API Reference

connect(workflow_id: str) -> XLinkWorkflowConnection

Connect to xLink workflow agent.

Parameters:

  • workflow_id (str): Workflow identifier

Returns:

  • XLinkWorkflowConnection: Connection instance

Raises:

  • RuntimeError: If Node.js module not found

Example:

conn = await flowise_xlink.connect('etl-pipeline')

XLinkWorkflowConnection.call_workflow(workflow: str, params: Dict[str, Any], timeout: int = 30000) -> Dict[str, Any]

Execute a Flowise workflow with identity authentication.

Parameters:

  • workflow (str): Workflow name to execute
  • params (Dict[str, Any]): Workflow parameters
  • timeout (int): Timeout in milliseconds (default: 30000)

Returns:

  • Dict[str, Any]: Workflow execution result

Raises:

  • RuntimeError: If Node.js backend fails
  • TimeoutError: If request times out

Example:

result = await connection.call_workflow(
    workflow='data-transformation',
    params={'input': 'raw_data.csv', 'format': 'json'},
    timeout=60000  # 60 seconds
)

XLinkWorkflowConnection.send_message(to: str, payload: Dict[str, Any], timeout: int = 10000) -> Dict[str, Any]

Send message to another workflow agent.

Parameters:

  • to (str): Target workflow DID
  • payload (Dict[str, Any]): Message payload
  • timeout (int): Timeout in milliseconds (default: 10000)

Returns:

  • Dict[str, Any]: Acknowledgment or response

Raises:

  • RuntimeError: If Node.js backend fails
  • TimeoutError: If request times out

Example:

result = await connection.send_message(
    to='did:key:z6Mk...',
    payload={'type': 'data-ready', 'dataset': 'sales_2024'},
    timeout=5000
)

XLinkWorkflowConnection.receive_messages(timeout: int = 5000) -> List[Dict[str, Any]]

Receive pending messages from other workflow agents.

Parameters:

  • timeout (int): Timeout in milliseconds (default: 5000)

Returns:

  • List[Dict[str, Any]]: List of received messages

Raises:

  • RuntimeError: If Node.js backend fails

Example:

messages = await connection.receive_messages(timeout=10000)
for msg in messages:
    print(f"From: {msg['from']}, Payload: {msg['payload']}")

XLinkWorkflowConnection.get_did() -> str

Get the workflow agent's decentralized identifier.

Returns:

  • str: DID in format did:key:z6Mk...

Example:

did = connection.get_did()
print(f"Workflow DID: {did}")

XLinkWorkflowConnection.get_audit_log() -> List[Dict[str, Any]]

Retrieve workflow execution audit trail.

Returns:

  • List[Dict[str, Any]]: Audit log entries

Example:

audit = connection.get_audit_log()
for entry in audit:
    print(f"{entry['timestamp']}: {entry['workflow']} - {entry['status']}")

Flowise Integration

Basic Workflow Execution

from private_me import flowise_xlink

# Connect to workflow
workflow_conn = await flowise_xlink.connect('etl-pipeline')

# Execute data transformation
result = await workflow_conn.call_workflow(
    workflow='extract-transform-load',
    params={
        'source': 'postgresql://db.example.com/analytics',
        'destination': 's3://bucket/processed-data/',
        'format': 'parquet'
    }
)

print(f"Processed {result['records_processed']} records")

Multi-Workflow Coordination

from private_me import flowise_xlink

# Connect multiple workflows
etl = await flowise_xlink.connect('etl-pipeline')
reporting = await flowise_xlink.connect('reporting-pipeline')
alerting = await flowise_xlink.connect('alerting-pipeline')

# Orchestrate workflow chain
async def data_pipeline(dataset: str):
    # Step 1: ETL
    etl_result = await etl.call_workflow(
        workflow='extract-transform-load',
        params={'dataset': dataset}
    )

    # Step 2: Notify reporting workflow
    await etl.send_message(
        to=reporting.get_did(),
        payload={
            'type': 'data-ready',
            'dataset': dataset,
            'record_count': etl_result['records_processed']
        }
    )

    # Step 3: Generate reports
    report_result = await reporting.call_workflow(
        workflow='generate-dashboard',
        params={'dataset': dataset}
    )

    # Step 4: Send alerts if anomalies detected
    if report_result.get('anomalies_detected', 0) > 0:
        await alerting.send_message(
            to='admin-notification-channel',
            payload={
                'type': 'anomaly-alert',
                'dataset': dataset,
                'anomalies': report_result['anomalies']
            }
        )

    return report_result

result = await data_pipeline('sales_2024_q1')

Event-Driven Workflow Triggers

from private_me import flowise_xlink
import asyncio

# Create workflow agent
workflow = await flowise_xlink.connect('event-processor')

# Poll for incoming events
async def process_events():
    while True:
        messages = await workflow.receive_messages(timeout=10000)

        for msg in messages:
            if msg['payload']['type'] == 'data-upload':
                # Trigger processing workflow
                result = await workflow.call_workflow(
                    workflow='process-uploaded-data',
                    params={
                        'file_path': msg['payload']['file_path'],
                        'user_id': msg['payload']['user_id']
                    }
                )
                print(f"Processed upload: {result['status']}")

        await asyncio.sleep(5)  # Poll every 5 seconds

await process_events()
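A long-running poller should survive transient backend failures rather than crash the loop on the first RuntimeError. A hedged sketch of that hardening, with a stub standing in for the real connection object (substitute the connection returned by flowise_xlink.connect):

```python
import asyncio

# Stub connection standing in for a flowise_xlink connection so the
# pattern is self-contained; swap in the real object in production.
class StubWorkflow:
    def __init__(self):
        self._queue = [{'payload': {'type': 'data-upload',
                                    'file_path': '/tmp/upload.csv'}}]

    async def receive_messages(self, timeout=5000):
        msgs, self._queue = self._queue, []
        return msgs

async def poll_once(workflow, handled):
    """One polling pass; isolates failures so the outer loop keeps running."""
    try:
        messages = await workflow.receive_messages(timeout=10000)
    except (RuntimeError, TimeoutError) as exc:
        print(f"poll failed, will retry: {exc}")
        return handled
    for msg in messages:
        # .get() avoids a KeyError on unexpectedly shaped payloads
        if msg.get('payload', {}).get('type') == 'data-upload':
            handled.append(msg['payload']['file_path'])
    return handled

handled = asyncio.run(poll_once(StubWorkflow(), []))
print(handled)
```

Wrapping each pass in its own try/except keeps one bad poll from killing the whole event loop; the `while True` / `asyncio.sleep` scaffolding from the example above stays unchanged around it.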

Policy-Enforced Workflows

from private_me import flowise_xlink

# Connect with policy enforcement
workflow = await flowise_xlink.connect('compliance-workflow')

# Configure policy (set via Node.js backend)
# Policy limits: maxNodeExecutions=100, timeout=60000ms

# Execute workflow - policy enforced automatically
result = await workflow.call_workflow(
    workflow='compliance-check',
    params={'document_id': 'doc-123'}
)

if 'error' in result:
    print(f"Policy violation: {result['error']}")
else:
    print(f"Compliance result: {result['status']}")

Workflow Audit and Monitoring

from private_me import flowise_xlink

# Connect to workflow
workflow = await flowise_xlink.connect('audit-enabled-workflow')

# Execute multiple workflows
await workflow.call_workflow('step-1', {'input': 'data'})
await workflow.call_workflow('step-2', {'input': 'processed'})
await workflow.call_workflow('step-3', {'input': 'final'})

# Retrieve audit trail
audit_log = workflow.get_audit_log()

for entry in audit_log:
    print(f"[{entry['timestamp']}] Workflow: {entry['workflow']}")
    print(f"  Status: {entry['status']}")
    print(f"  Nodes Executed: {entry['nodes_executed']}")
    print(f"  Duration: {entry['duration_ms']}ms")
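Since the audit entries are plain dicts, summarizing them is ordinary Python. A small sketch aggregating total duration and failed steps over sample entries shaped like the fields above (the sample data is illustrative, not real output):

```python
# Illustrative entries mirroring the audit fields shown above; not real output.
audit_log = [
    {'timestamp': '2024-05-01T10:00:00Z', 'workflow': 'step-1',
     'status': 'completed', 'nodes_executed': 4, 'duration_ms': 120},
    {'timestamp': '2024-05-01T10:00:01Z', 'workflow': 'step-2',
     'status': 'failed', 'nodes_executed': 2, 'duration_ms': 45},
]

total_ms = sum(e['duration_ms'] for e in audit_log)
failures = [e['workflow'] for e in audit_log if e['status'] != 'completed']
print(f"Total duration: {total_ms}ms, failures: {failures}")
```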

Architecture

This package uses a wrapper pattern:

Python App → Python Bindings → Node.js Backend → Flowise xLink → xLink Protocol

  1. Python layer: Provides Pythonic API (connect(), call_workflow(), send_message())
  2. Node.js backend: Handles cryptographic operations (Ed25519, AES-256-GCM) + Flowise integration
  3. xLink protocol: Identity-based M2M authentication
  4. Flowise integration: Visual workflow execution with identity authentication
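A common way Python bindings bridge to a Node.js backend is a child process exchanging newline-delimited JSON over stdio. The framing below is a sketch of that general pattern to make the wrapper idea concrete; it is not this package's actual wire format:

```python
import json

def encode_request(method: str, params: dict, request_id: int = 1) -> str:
    """Frame one request as a newline-delimited JSON line (hypothetical format)."""
    return json.dumps({'id': request_id, 'method': method, 'params': params}) + '\n'

def decode_response(line: str) -> dict:
    """Parse one response line back into a dict."""
    return json.loads(line)

frame = encode_request('call_workflow', {'workflow': 'etl-pipeline'})
print(decode_response(frame)['method'])
```

In such a design the Python layer stays thin: it serializes calls, writes them to the Node.js child's stdin, and reads one JSON line back per request, while all cryptography stays on the Node.js side.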

Troubleshooting

"Node.js module not found"

Error:

RuntimeError: Node.js module @private.me/flowise not found

Solution:

# Install Node.js package first
npm install @private.me/flowise flowise

# Verify installation
ls node_modules/@private.me/flowise

"Node.js backend error"

Error:

RuntimeError: Node.js backend error: <message>

Solution:

  • Check Node.js version (requires 18+): node --version
  • Verify Node.js package installed: npm list @private.me/flowise
  • Check workflow ID is valid
  • Review Node.js error message for details

Timeout errors

Error:

TimeoutError: Request timed out after 30000ms

Solution:

# Increase timeout for long-running workflows
result = await connection.call_workflow(
    workflow='heavy-processing',
    params=data,
    timeout=120000  # 120 seconds (2 minutes)
)
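For workflows whose duration varies, retrying with a growing timeout is often more robust than one large fixed timeout. A self-contained sketch of that pattern; the flaky stub simulates a call that times out once, and in real use you would pass a closure over connection.call_workflow:

```python
import asyncio

async def call_with_retry(call, retries=3, base_timeout=30000):
    """Retry a workflow call, doubling the timeout each attempt."""
    timeout = base_timeout
    for attempt in range(retries):
        try:
            return await call(timeout=timeout)
        except TimeoutError:
            if attempt == retries - 1:
                raise  # out of attempts; surface the error
            timeout *= 2  # give the next attempt more headroom

# Stub simulating a call that times out once, then succeeds.
attempts = []
async def flaky_call(timeout):
    attempts.append(timeout)
    if len(attempts) == 1:
        raise TimeoutError(f"Request timed out after {timeout}ms")
    return {'status': 'completed'}

result = asyncio.run(call_with_retry(flaky_call))
print(result['status'], attempts)  # second attempt ran with a doubled timeout
```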

Flowise connection errors

Error:

RuntimeError: Failed to connect to Flowise instance

Solution:

  • Verify Flowise is running: http://localhost:3000 (default)
  • Check FLOWISE_API_URL environment variable
  • Ensure workflow name exists in Flowise
  • Check Flowise API authentication if enabled

Development

Running Tests

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest -v

# Run with coverage
pytest -v --cov=private_me --cov-report=html

Building

# Build wheel and sdist (requires: pip install build)
python -m build

# Validate build
bash validate-build.sh


License

Proprietary - See LICENSE.md


Questions? Visit private.me/docs/flowise for complete documentation.

Download files

  • Source distribution: private_me_flowise-0.2.0.tar.gz (9.9 kB)
  • Built distribution: private_me_flowise-0.2.0-py3-none-any.whl (4.7 kB, Python 3)

