Skip to main content

A Client for the workflow engine of the ProcessCube platform.

Project description

ProcessCube® Python Client

Python-Client-Bibliothek für die ProcessCube®-Workflow-Engine (Atlas Engine).

Ermöglicht die Anbindung an die ProcessCube® Engine aus Python-Anwendungen heraus — sowohl asynchron (für ExternalTasks, Notifications) als auch synchron (für Robot Framework und Scripting).

Installation

pip install processcube_client

Unterstützte Python-Versionen: 3.11, 3.12, 3.13

Schnellstart

from processcube_client import ExternalTaskClient

def handle_task(payload):
    print(f"Verarbeite: {payload}")
    return {"ergebnis": "fertig"}

client = ExternalTaskClient("http://localhost:56100")
client.subscribe_to_external_task_topic("meinTopic", handle_task)
client.start()

Architektur

Der Client ist in zwei Schichten aufgebaut:

graph TB
    subgraph "Schicht 2 — Domain-Clients (async)"
        ETClient["ExternalTaskClient<br/><i>Long-Polling + Worker</i>"]
        PDClient["ProcessDefinitionClient<br/><i>Prozesse starten</i>"]
        PIClient["ProcessInstanceClient<br/><i>Instanzen verwalten</i>"]
        UTClient["UserTaskClient<br/><i>User Tasks bearbeiten</i>"]
        EvClient["EventClient<br/><i>Messages & Signals</i>"]
        NotClient["NotificationClient<br/><i>Event-Subscriptions</i>"]
        FNIClient["FlowNodeInstanceClient<br/><i>Flow-Node-Events</i>"]
        AIClient["AppInfoClient<br/><i>Engine-Info</i>"]
    end

    subgraph "Schicht 1 — Low-Level HTTP"
        BaseAsync["BaseClient<br/><i>async, aiohttp</i>"]
        BaseSync["BaseClient<br/><i>sync, requests</i>"]
    end

    subgraph "Synchroner Aggregations-Client"
        SyncClient["Client<br/><i>core/api — für Robot Framework</i>"]
    end

    ETClient --> BaseAsync
    PDClient --> BaseAsync
    PIClient --> BaseAsync
    UTClient --> BaseAsync
    EvClient --> BaseAsync
    NotClient --> BaseAsync
    FNIClient --> BaseAsync
    AIClient --> BaseAsync

    SyncClient --> BaseSync

    CF["ClientFactory"] --> ETClient
    CF --> PDClient
    CF --> PIClient
    CF --> UTClient
    CF --> EvClient
    CF --> NotClient
    CF --> FNIClient
    CF --> AIClient

Alle Clients kommunizieren mit der Engine über REST-Endpunkte unter:

{engine_url}/atlas_engine/api/v1/{endpoint}

ExternalTasks

Konzept

ExternalTasks sind Service Tasks in einem BPMN-Prozess, deren Logik außerhalb der Engine ausgeführt wird. Die Engine stellt die Aufgabe bereit, ein externer Worker holt sie ab, verarbeitet sie und meldet das Ergebnis zurück.

Dieses Muster ermöglicht:

  • Entkopplung — Die Geschäftslogik lebt im Python-Code, nicht in der Engine
  • Skalierung — Mehrere Worker können dasselbe Topic parallel bedienen
  • Technologiefreiheit — Der Worker kann beliebige Bibliotheken und Services nutzen

Ablauf: Fetch-and-Lock-Zyklus

Der ExternalTaskClient verwendet Long Polling, um auf neue Aufgaben zu warten. Die Engine hält die Verbindung offen, bis entweder ein Task verfügbar ist oder das Timeout erreicht wird.

sequenceDiagram
    participant W as ExternalTaskClient<br/>(Worker)
    participant E as ProcessCube® Engine

    loop Endlosschleife pro Topic
        W->>+E: POST /external_tasks/fetch_and_lock<br/>{ topicName, workerId, maxTasks,<br/>  longPollingTimeout, lockDuration }
        Note over E: Engine hält Verbindung offen<br/>(Long Polling) bis Task<br/>verfügbar oder Timeout

        alt Task(s) verfügbar
            E-->>-W: [ ExternalTask, ... ]

            par Für jeden ExternalTask
                Note over W: Handler aufrufen mit payload

                alt Handler erfolgreich
                    W->>E: PUT /external_tasks/{id}/finish<br/>{ workerId, result }
                    E-->>W: 200 OK
                else FunctionalError
                    W->>E: PUT /external_tasks/{id}/error<br/>{ workerId, error: { errorCode, errorMessage } }
                    E-->>W: 200 OK
                else Technischer Fehler
                    W->>E: PUT /external_tasks/{id}/error<br/>{ workerId, error: { errorCode, errorMessage } }
                    E-->>W: 200 OK
                end
            end

        else Timeout (keine Tasks)
            E-->>W: [ ]
        end
    end

Lock-Verlängerung

Wenn die Verarbeitung länger dauert als die Lock-Duration, verlängert der Worker den Lock automatisch. Der Timer wird bei 90% der Lock-Duration ausgelöst und wiederholt sich, bis der Task abgeschlossen ist.

sequenceDiagram
    participant W as ExternalTaskWorker
    participant E as ProcessCube® Engine

    Note over W: Task gelockt<br/>lock_duration_in_ms = 60000

    par Task-Verarbeitung
        Note over W: Handler wird ausgeführt...
    and Lock-Verlängerung (alle 54s)
        loop Alle 90% der lock_duration
            W->>E: PUT /external_tasks/{id}/extend_lock<br/>{ workerId, additionalDuration }
            E-->>W: 200 OK
        end
    end

    Note over W: Handler fertig
    W->>E: PUT /external_tasks/{id}/finish
    Note over W: Lock-Timer wird gestoppt

ExternalTask-Lebenszyklus

stateDiagram-v2
    [*] --> Wartend: BPMN-Prozess erreicht<br/>ExternalTask-Node

    Wartend --> Gelockt: Worker ruft<br/>fetch_and_lock auf

    Gelockt --> Gelockt: Lock wird verlängert<br/>(extend_lock)

    Gelockt --> Abgeschlossen: Worker meldet<br/>Ergebnis (finish)

    Gelockt --> Fehler: Worker meldet<br/>Fehler (error)

    Gelockt --> Wartend: Lock abgelaufen<br/>(Timeout)

    Fehler --> Wartend: Engine setzt Task<br/>erneut auf wartend

    Abgeschlossen --> [*]: Prozess läuft weiter

API

ExternalTaskClient

from processcube_client import ExternalTaskClient

client = ExternalTaskClient(url, session=None, identity=None, loop=None, **kwargs)
Parameter Beschreibung
url Engine-URL, z.B. "http://localhost:56100"
identity Optional: Dict {"token": "..."} oder Callable das ein solches Dict liefert
loop Optional: Eigener asyncio Event Loop
worker_id Optional (in kwargs): Eigene Worker-ID (Default: UUID)

Topic abonnieren

client.subscribe_to_external_task_topic(topic, handler, **options)
Option Beschreibung Default
max_tasks Maximale Anzahl gleichzeitig abgeholter Tasks 10
long_polling_timeout_in_ms Timeout für Long Polling in Millisekunden 10000 (10s)
lock_duration_in_ms Dauer des Locks in Millisekunden 100000 (100s)
payload_filter Filter für den Task-Payload None

Starten und Stoppen

# Startet den Event Loop und verarbeitet Tasks (blockierend)
client.start()

# Stoppt den Client
client.stop()

Handler

Der Handler ist eine Funktion, die vom Worker für jeden ExternalTask aufgerufen wird. Er empfängt den Payload und gibt ein Ergebnis-Dictionary zurück.

Einfacher Handler (nur Payload)

def handle_task(payload):
    name = payload.get("name", "Welt")
    return {"greeting": f"Hallo {name}!"}

Erweiterter Handler (Payload + ExternalTask)

Wenn der Handler einen zweiten Parameter akzeptiert, erhält er zusätzlich das gesamte ExternalTask-Objekt mit Metadaten:

def handle_task(payload, external_task):
    task_id = external_task["id"]
    correlation_id = external_task["correlationId"]
    process_instance_id = external_task["processInstanceId"]

    print(f"Task {task_id} in Prozess {process_instance_id}")
    return {"status": "verarbeitet"}

Das ExternalTask-Objekt enthält:

Feld Beschreibung
id Eindeutige Task-ID
workerId ID des Workers, der den Task gelockt hat
topic Topic-Name
flowNodeInstanceId ID der Flow-Node-Instanz
correlationId Korrelations-ID des Prozesses
processDefinitionId ID der Prozessdefinition
processInstanceId ID der Prozessinstanz
payload Task-Payload (Dictionary)
state Status: pending oder finished
lockExpirationTime Zeitpunkt, wann der Lock abläuft
createdAt Erstellungszeitpunkt

Async-Handler

Async-Handler werden ebenfalls unterstützt:

import aiohttp

async def handle_task(payload):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/{payload['id']}") as resp:
            data = await resp.json()
    return {"result": data}

Fehlerbehandlung

FunctionalError (BPMN-Fehler)

Ein FunctionalError signalisiert einen fachlichen Fehler, der im BPMN-Prozess behandelt werden kann (z.B. über einen Error Boundary Event):

from processcube_client import ExternalTaskClient
from processcube_client.external_task import FunctionalError

def handle_task(payload):
    if not payload.get("email"):
        raise FunctionalError(
            code="VALIDATION_ERROR",
            message="E-Mail-Adresse fehlt",
            details="Das Feld 'email' ist erforderlich"
        )
    return {"status": "ok"}
flowchart LR
    A[ExternalTask-Handler] -->|FunctionalError| B[Engine: Error Boundary Event]
    A -->|Exception| C[Engine: Technischer Fehler]
    A -->|return result| D[Engine: Task abgeschlossen]

    B --> E[BPMN-Fehlerbehandlung]
    C --> F[Task wird als fehlerhaft markiert]
    D --> G[Prozess läuft weiter]

Technische Fehler

Jede andere Exception wird als technischer Fehler an die Engine gemeldet. Der Fehlercode ist der Exception-Typ, die Nachricht enthält den Stacktrace:

def handle_task(payload):
    # Jede unbehandelte Exception wird automatisch als
    # technischer Fehler an die Engine gemeldet
    result = 1 / 0  # ZeroDivisionError → technischer Fehler
    return result

Vollständiges Beispiel

import logging
from processcube_client import ExternalTaskClient
from processcube_client.external_task import FunctionalError

logging.basicConfig(level=logging.INFO)

ENGINE_URL = "http://localhost:56100"

def bestellung_pruefen(payload):
    """Prüft eine Bestellung und gibt das Ergebnis zurück."""
    artikel = payload.get("artikel", [])

    if not artikel:
        raise FunctionalError(
            code="LEERE_BESTELLUNG",
            message="Bestellung enthält keine Artikel"
        )

    gesamtpreis = sum(a.get("preis", 0) * a.get("menge", 1) for a in artikel)

    return {
        "gesamtpreis": gesamtpreis,
        "artikelanzahl": len(artikel),
        "status": "geprueft"
    }

def versand_vorbereiten(payload, external_task):
    """Bereitet den Versand vor — nutzt ExternalTask-Metadaten."""
    print(f"Versand für Prozess: {external_task['processInstanceId']}")
    return {"versand_status": "vorbereitet"}

client = ExternalTaskClient(ENGINE_URL)

client.subscribe_to_external_task_topic(
    "BestellungPruefen",
    bestellung_pruefen,
    max_tasks=5,
    long_polling_timeout_in_ms=30000,
    lock_duration_in_ms=60000
)

client.subscribe_to_external_task_topic(
    "VersandVorbereiten",
    versand_vorbereiten
)

client.start()

Mehrere Topics mit ClientFactory

from processcube_client import ClientFactory

factory = ClientFactory()
client = factory.create_external_task_client("http://localhost:56100")

et_client = client.subscribe_to_external_task_topic("TopicA", handler_a)
et_client.start()

Weitere Clients

ProcessDefinitionClient

Prozesse starten und Definitionen verwalten.

from processcube_client import ClientFactory

factory = ClientFactory()
client = factory.create_process_definition_client("http://localhost:56100")

# Prozess starten und auf Ende warten
result = client.start_process_instance_and_await_end_event(
    "MeinProzess",
    start_event_id="StartEvent_1",
    initial_token={"eingabe": "wert"}
)
Methode Beschreibung
start_process_instance(process_model_id, **options) Startet Prozess, gibt sofort zurück
start_process_instance_and_await_end_event(process_model_id, **options) Startet Prozess und wartet auf Ende
start_process_instance_and_await_specific_end_event(process_model_id, end_event_id=..., **options) Wartet auf bestimmtes End-Event
get_process_definition(process_model_id) Gibt die Prozessdefinition zurück

ProcessInstanceClient

Laufende Prozessinstanzen verwalten.

client = factory.create_process_instance_client("http://localhost:56100")

client.terminate(process_instance_id)
client.retry(process_instance_id)
Methode Beschreibung
terminate(process_instance_id) Bricht eine Prozessinstanz ab
retry(process_instance_id) Wiederholt eine fehlgeschlagene Instanz

UserTaskClient

User Tasks abfragen und bearbeiten.

client = factory.create_user_task_client("http://localhost:56100")

tasks = client.get_user_tasks(state="suspended")
for task in tasks:
    print(f"Task: {task['name']}")

client.finish_user_task(user_task_instance_id, {"approved": True})
Methode Beschreibung
get_user_tasks(state='suspended') Alle User Tasks im angegebenen Status
reserve_user_task(id, owner_id) Reserviert einen Task für einen Benutzer
cancel_reservation_user_task(id) Hebt die Reservierung auf
finish_user_task(id, answer) Schließt einen Task mit Ergebnis ab

EventClient

BPMN-Events auslösen.

client = factory.create_event_client("http://localhost:56100")

# Message-Event auslösen
client.trigger_message("BestellungEingegangen", payload={"order_id": "123"})

# Signal-Event auslösen
client.trigger_signal("NotfallStop")
Methode Beschreibung
trigger_message(event_name, payload={}, process_instance_id=None) Message-Event senden
trigger_signal(signal_name) Signal-Event broadcasten

NotificationClient

Echtzeit-Benachrichtigungen über Prozess-Events per Long Polling.

client = factory.create_notification_client("http://localhost:56100")

client.on_process_started(lambda event: print(f"Prozess gestartet: {event}"))
client.on_process_ended(lambda event: print(f"Prozess beendet: {event}"))
client.on_user_task_waiting(lambda event: print(f"User Task wartet: {event}"))

client.start()
Methode Beschreibung
on_process_started(callback) Prozess gestartet
on_process_ended(callback) Prozess beendet
on_process_error(callback) Prozess-Fehler
on_activity_reached(callback) Aktivität erreicht
on_activity_finished(callback) Aktivität abgeschlossen
on_user_task_waiting(callback) User Task wartet
on_user_task_finished(callback) User Task abgeschlossen
on_user_task_reserved(callback) User Task reserviert
on_user_task_reservation_canceled(callback) Reservierung aufgehoben
on_manual_task_waiting(callback) Manual Task wartet
on_manual_task_finished(callback) Manual Task abgeschlossen
on_empty_activity_waiting(callback) Empty Activity wartet
on_empty_activity_finished(callback) Empty Activity abgeschlossen
on_boundary_event_triggered(callback) Boundary Event ausgelöst
on_intermediate_throw_event_triggered(callback) Intermediate Throw Event
on_intermediate_catch_event_reached(callback) Intermediate Catch Event erreicht
on_intermediate_catch_event_finished(callback) Intermediate Catch Event abgeschlossen

FlowNodeInstanceClient

Events auf Flow-Node-Ebene auslösen.

client = factory.create_flow_node_instance_client("http://localhost:56100")

client.trigger_message_event("MeineNachricht", process_instance_id="...")
client.trigger_signal_event("MeinSignal")

AppInfoClient

Engine-Informationen abfragen.

client = factory.create_app_info_client("http://localhost:56100")

info = client.get_info()
authority = client.get_authority()

Synchroner Client (core/api)

Für synchrone Kontexte (z.B. Robot Framework) steht eine aggregierte Client-Klasse bereit, die alle Handler in einer einzigen Schnittstelle bündelt:

from processcube_client.core.api.client import Client

client = Client("http://localhost:56100")

# Engine-Info
info = client.info()

# Prozess starten
from processcube_client.core.api.helpers.process_models import ProcessStartRequest
result = client.process_model_start("MeinProzess", ProcessStartRequest(
    start_event_id="StartEvent_1"
))

# User Tasks abfragen
from processcube_client.core.api.helpers.user_tasks import UserTaskQuery
tasks = client.user_task_query(UserTaskQuery(
    process_instance_id=result.process_instance_id
))

# BPMN-Dateien deployen
result = client.deploy_bpmn_from_path("prozesse/")

Authentifizierung

Standardmäßig wird ein Dummy-Token (ZHVtbXlfdG9rZW4=) verwendet. Für eigene Authentifizierung kann ein Callable übergeben werden:

def get_identity():
    return {"token": "mein_jwt_token"}

# Async Client
client = ExternalTaskClient("http://localhost:56100", identity=get_identity)

# Sync Client
from processcube_client.core.api.client import Client
client = Client("http://localhost:56100", identity=get_identity)

API-Endpunkte

Alle Endpunkte liegen unter {engine_url}/atlas_engine/api/v1/.

Methode Endpunkt Beschreibung
GET /info Engine-Informationen
GET /authority Authority-URL
POST /process_definitions Prozessdefinition hochladen
DELETE /process_definitions/{id} Prozessdefinition löschen
POST /process_models/{id}/start Prozessinstanz starten
GET /process_instances/query Prozessinstanzen abfragen
PUT /process_instances/{id}/terminate Prozessinstanz abbrechen
POST /external_tasks/fetch_and_lock ExternalTasks abholen und locken
PUT /external_tasks/{id}/extend_lock Lock verlängern
PUT /external_tasks/{id}/finish ExternalTask abschließen
PUT /external_tasks/{id}/error Fehler melden
GET /user_tasks User Tasks abfragen
PUT /user_tasks/{id}/finish User Task abschließen
PUT /user_tasks/{id}/reserve User Task reservieren
DELETE /user_tasks/{id}/cancel-reservation Reservierung aufheben
PUT /manual_tasks/{id}/finish Manual Task abschließen
POST /messages/{name}/trigger Message-Event auslösen
POST /signals/{name}/trigger Signal-Event auslösen
GET /flow_node_instances Flow-Node-Instanzen abfragen
GET /data_object_instances/query Datenobjekt-Instanzen abfragen

Lizenz

MIT — ProcessCube UG

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

processcube_client-6.1.2.tar.gz (42.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

processcube_client-6.1.2-py3-none-any.whl (57.2 kB view details)

Uploaded Python 3

File details

Details for the file processcube_client-6.1.2.tar.gz.

File metadata

  • Download URL: processcube_client-6.1.2.tar.gz
  • Upload date:
  • Size: 42.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for processcube_client-6.1.2.tar.gz
Algorithm Hash digest
SHA256 25108af34f7759f37596ebcf4f1ce10fe0686842da92534affda362d810ab190
MD5 1afe5f0bc4b6993590dd9555df797576
BLAKE2b-256 dbd72d33d760ed7357c9c3c18c6b2992ced805f6985b09e1d6b761332bfb90e4

See more details on using hashes here.

File details

Details for the file processcube_client-6.1.2-py3-none-any.whl.

File metadata

File hashes

Hashes for processcube_client-6.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 cedd57aab4455cce8fd8433812a541fe24d68c92beb3425d1be0bd3075d75de6
MD5 30a858a5750cc013fef35ef4c8312cab
BLAKE2b-256 baea292245a3dbd5699af85db85229e57d8b89eb63f07afe10973bf25e9b3a11

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page