A Client for the workflow engine of the ProcessCube platform.
Project description
ProcessCube® Python Client
Python-Client-Bibliothek für die ProcessCube®-Workflow-Engine (Atlas Engine).
Ermöglicht die Anbindung an die ProcessCube® Engine aus Python-Anwendungen heraus — sowohl asynchron (für ExternalTasks, Notifications) als auch synchron (für Robot Framework und Scripting).
Installation
pip install processcube_client
Unterstützte Python-Versionen: 3.11, 3.12, 3.13
Schnellstart
from processcube_client import ExternalTaskClient
def handle_task(payload):
print(f"Verarbeite: {payload}")
return {"ergebnis": "fertig"}
client = ExternalTaskClient("http://localhost:56100")
client.subscribe_to_external_task_topic("meinTopic", handle_task)
client.start()
Architektur
Der Client ist in zwei Schichten aufgebaut:
graph TB
subgraph "Schicht 2 — Domain-Clients (async)"
ETClient["ExternalTaskClient<br/><i>Long-Polling + Worker</i>"]
PDClient["ProcessDefinitionClient<br/><i>Prozesse starten</i>"]
PIClient["ProcessInstanceClient<br/><i>Instanzen verwalten</i>"]
UTClient["UserTaskClient<br/><i>User Tasks bearbeiten</i>"]
EvClient["EventClient<br/><i>Messages & Signals</i>"]
NotClient["NotificationClient<br/><i>Event-Subscriptions</i>"]
FNIClient["FlowNodeInstanceClient<br/><i>Flow-Node-Events</i>"]
AIClient["AppInfoClient<br/><i>Engine-Info</i>"]
end
subgraph "Schicht 1 — Low-Level HTTP"
BaseAsync["BaseClient<br/><i>async, aiohttp</i>"]
BaseSync["BaseClient<br/><i>sync, requests</i>"]
end
subgraph "Synchroner Aggregations-Client"
SyncClient["Client<br/><i>core/api — für Robot Framework</i>"]
end
ETClient --> BaseAsync
PDClient --> BaseAsync
PIClient --> BaseAsync
UTClient --> BaseAsync
EvClient --> BaseAsync
NotClient --> BaseAsync
FNIClient --> BaseAsync
AIClient --> BaseAsync
SyncClient --> BaseSync
CF["ClientFactory"] --> ETClient
CF --> PDClient
CF --> PIClient
CF --> UTClient
CF --> EvClient
CF --> NotClient
CF --> FNIClient
CF --> AIClient
Alle Clients kommunizieren mit der Engine über REST-Endpunkte unter:
{engine_url}/atlas_engine/api/v1/{endpoint}
ExternalTasks
Konzept
ExternalTasks sind Service Tasks in einem BPMN-Prozess, deren Logik außerhalb der Engine ausgeführt wird. Die Engine stellt die Aufgabe bereit, ein externer Worker holt sie ab, verarbeitet sie und meldet das Ergebnis zurück.
Dieses Muster ermöglicht:
- Entkopplung — Die Geschäftslogik lebt im Python-Code, nicht in der Engine
- Skalierung — Mehrere Worker können dasselbe Topic parallel bedienen
- Technologiefreiheit — Der Worker kann beliebige Bibliotheken und Services nutzen
Ablauf: Fetch-and-Lock-Zyklus
Der ExternalTaskClient verwendet Long Polling, um auf neue Aufgaben zu warten. Die Engine hält die Verbindung offen, bis entweder ein Task verfügbar ist oder das Timeout erreicht wird.
sequenceDiagram
participant W as ExternalTaskClient<br/>(Worker)
participant E as ProcessCube® Engine
loop Endlosschleife pro Topic
W->>+E: POST /external_tasks/fetch_and_lock<br/>{ topicName, workerId, maxTasks,<br/> longPollingTimeout, lockDuration }
Note over E: Engine hält Verbindung offen<br/>(Long Polling) bis Task<br/>verfügbar oder Timeout
alt Task(s) verfügbar
E-->>-W: [ ExternalTask, ... ]
par Für jeden ExternalTask
Note over W: Handler aufrufen mit payload
alt Handler erfolgreich
W->>E: PUT /external_tasks/{id}/finish<br/>{ workerId, result }
E-->>W: 200 OK
else FunctionalError
W->>E: PUT /external_tasks/{id}/error<br/>{ workerId, error: { errorCode, errorMessage } }
E-->>W: 200 OK
else Technischer Fehler
W->>E: PUT /external_tasks/{id}/error<br/>{ workerId, error: { errorCode, errorMessage } }
E-->>W: 200 OK
end
end
else Timeout (keine Tasks)
E-->>W: [ ]
end
end
Lock-Verlängerung
Wenn die Verarbeitung länger dauert als die Lock-Duration, verlängert der Worker den Lock automatisch. Der Timer wird bei 90% der Lock-Duration ausgelöst und wiederholt sich, bis der Task abgeschlossen ist.
sequenceDiagram
participant W as ExternalTaskWorker
participant E as ProcessCube® Engine
Note over W: Task gelockt<br/>lock_duration_in_ms = 60000
par Task-Verarbeitung
Note over W: Handler wird ausgeführt...
and Lock-Verlängerung (alle 54s)
loop Alle 90% der lock_duration
W->>E: PUT /external_tasks/{id}/extend_lock<br/>{ workerId, additionalDuration }
E-->>W: 200 OK
end
end
Note over W: Handler fertig
W->>E: PUT /external_tasks/{id}/finish
Note over W: Lock-Timer wird gestoppt
ExternalTask-Lebenszyklus
stateDiagram-v2
[*] --> Wartend: BPMN-Prozess erreicht<br/>ExternalTask-Node
Wartend --> Gelockt: Worker ruft<br/>fetch_and_lock auf
Gelockt --> Gelockt: Lock wird verlängert<br/>(extend_lock)
Gelockt --> Abgeschlossen: Worker meldet<br/>Ergebnis (finish)
Gelockt --> Fehler: Worker meldet<br/>Fehler (error)
Gelockt --> Wartend: Lock abgelaufen<br/>(Timeout)
Fehler --> Wartend: Engine setzt Task<br/>erneut auf wartend
Abgeschlossen --> [*]: Prozess läuft weiter
API
ExternalTaskClient
from processcube_client import ExternalTaskClient
client = ExternalTaskClient(url, session=None, identity=None, loop=None, **kwargs)
| Parameter | Beschreibung |
|---|---|
url |
Engine-URL, z.B. "http://localhost:56100" |
identity |
Optional: Dict {"token": "..."} oder Callable das ein solches Dict liefert |
loop |
Optional: Eigener asyncio Event Loop |
worker_id |
Optional (in kwargs): Eigene Worker-ID (Default: UUID) |
Topic abonnieren
client.subscribe_to_external_task_topic(topic, handler, **options)
| Option | Beschreibung | Default |
|---|---|---|
max_tasks |
Maximale Anzahl gleichzeitig abgeholter Tasks | 10 |
long_polling_timeout_in_ms |
Timeout für Long Polling in Millisekunden | 10000 (10s) |
lock_duration_in_ms |
Dauer des Locks in Millisekunden | 100000 (100s) |
payload_filter |
Filter für den Task-Payload | None |
Starten und Stoppen
# Startet den Event Loop und verarbeitet Tasks (blockierend)
client.start()
# Stoppt den Client
client.stop()
Handler
Der Handler ist eine Funktion, die vom Worker für jeden ExternalTask aufgerufen wird. Er empfängt den Payload und gibt ein Ergebnis-Dictionary zurück.
Einfacher Handler (nur Payload)
def handle_task(payload):
name = payload.get("name", "Welt")
return {"greeting": f"Hallo {name}!"}
Erweiterter Handler (Payload + ExternalTask)
Wenn der Handler einen zweiten Parameter akzeptiert, erhält er zusätzlich das gesamte ExternalTask-Objekt mit Metadaten:
def handle_task(payload, external_task):
task_id = external_task["id"]
correlation_id = external_task["correlationId"]
process_instance_id = external_task["processInstanceId"]
print(f"Task {task_id} in Prozess {process_instance_id}")
return {"status": "verarbeitet"}
Das ExternalTask-Objekt enthält:
| Feld | Beschreibung |
|---|---|
id |
Eindeutige Task-ID |
workerId |
ID des Workers, der den Task gelockt hat |
topic |
Topic-Name |
flowNodeInstanceId |
ID der Flow-Node-Instanz |
correlationId |
Korrelations-ID des Prozesses |
processDefinitionId |
ID der Prozessdefinition |
processInstanceId |
ID der Prozessinstanz |
payload |
Task-Payload (Dictionary) |
state |
Status: pending oder finished |
lockExpirationTime |
Zeitpunkt, wann der Lock abläuft |
createdAt |
Erstellungszeitpunkt |
Async-Handler
Async-Handler werden ebenfalls unterstützt:
import aiohttp
async def handle_task(payload):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/{payload['id']}") as resp:
data = await resp.json()
return {"result": data}
Fehlerbehandlung
FunctionalError (BPMN-Fehler)
Ein FunctionalError signalisiert einen fachlichen Fehler, der im BPMN-Prozess behandelt werden kann (z.B. über einen Error Boundary Event):
from processcube_client import ExternalTaskClient
from processcube_client.external_task import FunctionalError
def handle_task(payload):
if not payload.get("email"):
raise FunctionalError(
code="VALIDATION_ERROR",
message="E-Mail-Adresse fehlt",
details="Das Feld 'email' ist erforderlich"
)
return {"status": "ok"}
flowchart LR
A[ExternalTask-Handler] -->|FunctionalError| B[Engine: Error Boundary Event]
A -->|Exception| C[Engine: Technischer Fehler]
A -->|return result| D[Engine: Task abgeschlossen]
B --> E[BPMN-Fehlerbehandlung]
C --> F[Task wird als fehlerhaft markiert]
D --> G[Prozess läuft weiter]
Technische Fehler
Jede andere Exception wird als technischer Fehler an die Engine gemeldet. Der Fehlercode ist der Exception-Typ, die Nachricht enthält den Stacktrace:
def handle_task(payload):
# Jede unbehandelte Exception wird automatisch als
# technischer Fehler an die Engine gemeldet
result = 1 / 0 # ZeroDivisionError → technischer Fehler
return result
Vollständiges Beispiel
import logging
from processcube_client import ExternalTaskClient
from processcube_client.external_task import FunctionalError
logging.basicConfig(level=logging.INFO)
ENGINE_URL = "http://localhost:56100"
def bestellung_pruefen(payload):
"""Prüft eine Bestellung und gibt das Ergebnis zurück."""
artikel = payload.get("artikel", [])
if not artikel:
raise FunctionalError(
code="LEERE_BESTELLUNG",
message="Bestellung enthält keine Artikel"
)
gesamtpreis = sum(a.get("preis", 0) * a.get("menge", 1) for a in artikel)
return {
"gesamtpreis": gesamtpreis,
"artikelanzahl": len(artikel),
"status": "geprueft"
}
def versand_vorbereiten(payload, external_task):
"""Bereitet den Versand vor — nutzt ExternalTask-Metadaten."""
print(f"Versand für Prozess: {external_task['processInstanceId']}")
return {"versand_status": "vorbereitet"}
client = ExternalTaskClient(ENGINE_URL)
client.subscribe_to_external_task_topic(
"BestellungPruefen",
bestellung_pruefen,
max_tasks=5,
long_polling_timeout_in_ms=30000,
lock_duration_in_ms=60000
)
client.subscribe_to_external_task_topic(
"VersandVorbereiten",
versand_vorbereiten
)
client.start()
Mehrere Topics mit ClientFactory
from processcube_client import ClientFactory
factory = ClientFactory()
client = factory.create_external_task_client("http://localhost:56100")
et_client = client.subscribe_to_external_task_topic("TopicA", handler_a)
et_client.start()
Weitere Clients
ProcessDefinitionClient
Prozesse starten und Definitionen verwalten.
from processcube_client import ClientFactory
factory = ClientFactory()
client = factory.create_process_definition_client("http://localhost:56100")
# Prozess starten und auf Ende warten
result = client.start_process_instance_and_await_end_event(
"MeinProzess",
start_event_id="StartEvent_1",
initial_token={"eingabe": "wert"}
)
| Methode | Beschreibung |
|---|---|
start_process_instance(process_model_id, **options) |
Startet Prozess, gibt sofort zurück |
start_process_instance_and_await_end_event(process_model_id, **options) |
Startet Prozess und wartet auf Ende |
start_process_instance_and_await_specific_end_event(process_model_id, end_event_id=..., **options) |
Wartet auf bestimmtes End-Event |
get_process_definition(process_model_id) |
Gibt die Prozessdefinition zurück |
ProcessInstanceClient
Laufende Prozessinstanzen verwalten.
client = factory.create_process_instance_client("http://localhost:56100")
client.terminate(process_instance_id)
client.retry(process_instance_id)
| Methode | Beschreibung |
|---|---|
terminate(process_instance_id) |
Bricht eine Prozessinstanz ab |
retry(process_instance_id) |
Wiederholt eine fehlgeschlagene Instanz |
UserTaskClient
User Tasks abfragen und bearbeiten.
client = factory.create_user_task_client("http://localhost:56100")
tasks = client.get_user_tasks(state="suspended")
for task in tasks:
print(f"Task: {task['name']}")
client.finish_user_task(user_task_instance_id, {"approved": True})
| Methode | Beschreibung |
|---|---|
get_user_tasks(state='suspended') |
Alle User Tasks im angegebenen Status |
reserve_user_task(id, owner_id) |
Reserviert einen Task für einen Benutzer |
cancel_reservation_user_task(id) |
Hebt die Reservierung auf |
finish_user_task(id, answer) |
Schließt einen Task mit Ergebnis ab |
EventClient
BPMN-Events auslösen.
client = factory.create_event_client("http://localhost:56100")
# Message-Event auslösen
client.trigger_message("BestellungEingegangen", payload={"order_id": "123"})
# Signal-Event auslösen
client.trigger_signal("NotfallStop")
| Methode | Beschreibung |
|---|---|
trigger_message(event_name, payload={}, process_instance_id=None) |
Message-Event senden |
trigger_signal(signal_name) |
Signal-Event broadcasten |
NotificationClient
Echtzeit-Benachrichtigungen über Prozess-Events per Long Polling.
client = factory.create_notification_client("http://localhost:56100")
client.on_process_started(lambda event: print(f"Prozess gestartet: {event}"))
client.on_process_ended(lambda event: print(f"Prozess beendet: {event}"))
client.on_user_task_waiting(lambda event: print(f"User Task wartet: {event}"))
client.start()
| Methode | Beschreibung |
|---|---|
on_process_started(callback) |
Prozess gestartet |
on_process_ended(callback) |
Prozess beendet |
on_process_error(callback) |
Prozess-Fehler |
on_activity_reached(callback) |
Aktivität erreicht |
on_activity_finished(callback) |
Aktivität abgeschlossen |
on_user_task_waiting(callback) |
User Task wartet |
on_user_task_finished(callback) |
User Task abgeschlossen |
on_user_task_reserved(callback) |
User Task reserviert |
on_user_task_reservation_canceled(callback) |
Reservierung aufgehoben |
on_manual_task_waiting(callback) |
Manual Task wartet |
on_manual_task_finished(callback) |
Manual Task abgeschlossen |
on_empty_activity_waiting(callback) |
Empty Activity wartet |
on_empty_activity_finished(callback) |
Empty Activity abgeschlossen |
on_boundary_event_triggered(callback) |
Boundary Event ausgelöst |
on_intermediate_throw_event_triggered(callback) |
Intermediate Throw Event |
on_intermediate_catch_event_reached(callback) |
Intermediate Catch Event erreicht |
on_intermediate_catch_event_finished(callback) |
Intermediate Catch Event abgeschlossen |
FlowNodeInstanceClient
Events auf Flow-Node-Ebene auslösen.
client = factory.create_flow_node_instance_client("http://localhost:56100")
client.trigger_message_event("MeineNachricht", process_instance_id="...")
client.trigger_signal_event("MeinSignal")
AppInfoClient
Engine-Informationen abfragen.
client = factory.create_app_info_client("http://localhost:56100")
info = client.get_info()
authority = client.get_authority()
Synchroner Client (core/api)
Für synchrone Kontexte (z.B. Robot Framework) steht eine aggregierte Client-Klasse bereit, die alle Handler in einer einzigen Schnittstelle bündelt:
from processcube_client.core.api.client import Client
client = Client("http://localhost:56100")
# Engine-Info
info = client.info()
# Prozess starten
from processcube_client.core.api.helpers.process_models import ProcessStartRequest
result = client.process_model_start("MeinProzess", ProcessStartRequest(
start_event_id="StartEvent_1"
))
# User Tasks abfragen
from processcube_client.core.api.helpers.user_tasks import UserTaskQuery
tasks = client.user_task_query(UserTaskQuery(
process_instance_id=result.process_instance_id
))
# BPMN-Dateien deployen
result = client.deploy_bpmn_from_path("prozesse/")
Authentifizierung
Standardmäßig wird ein Dummy-Token (ZHVtbXlfdG9rZW4=) verwendet. Für eigene Authentifizierung kann ein Callable übergeben werden:
def get_identity():
return {"token": "mein_jwt_token"}
# Async Client
client = ExternalTaskClient("http://localhost:56100", identity=get_identity)
# Sync Client
from processcube_client.core.api.client import Client
client = Client("http://localhost:56100", identity=get_identity)
API-Endpunkte
Alle Endpunkte liegen unter {engine_url}/atlas_engine/api/v1/.
| Methode | Endpunkt | Beschreibung |
|---|---|---|
GET |
/info |
Engine-Informationen |
GET |
/authority |
Authority-URL |
POST |
/process_definitions |
Prozessdefinition hochladen |
DELETE |
/process_definitions/{id} |
Prozessdefinition löschen |
POST |
/process_models/{id}/start |
Prozessinstanz starten |
GET |
/process_instances/query |
Prozessinstanzen abfragen |
PUT |
/process_instances/{id}/terminate |
Prozessinstanz abbrechen |
POST |
/external_tasks/fetch_and_lock |
ExternalTasks abholen und locken |
PUT |
/external_tasks/{id}/extend_lock |
Lock verlängern |
PUT |
/external_tasks/{id}/finish |
ExternalTask abschließen |
PUT |
/external_tasks/{id}/error |
Fehler melden |
GET |
/user_tasks |
User Tasks abfragen |
PUT |
/user_tasks/{id}/finish |
User Task abschließen |
PUT |
/user_tasks/{id}/reserve |
User Task reservieren |
DELETE |
/user_tasks/{id}/cancel-reservation |
Reservierung aufheben |
PUT |
/manual_tasks/{id}/finish |
Manual Task abschließen |
POST |
/messages/{name}/trigger |
Message-Event auslösen |
POST |
/signals/{name}/trigger |
Signal-Event auslösen |
GET |
/flow_node_instances |
Flow-Node-Instanzen abfragen |
GET |
/data_object_instances/query |
Datenobjekt-Instanzen abfragen |
Lizenz
MIT — ProcessCube UG
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file processcube_client-6.1.2.tar.gz.
File metadata
- Download URL: processcube_client-6.1.2.tar.gz
- Upload date:
- Size: 42.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
25108af34f7759f37596ebcf4f1ce10fe0686842da92534affda362d810ab190
|
|
| MD5 |
1afe5f0bc4b6993590dd9555df797576
|
|
| BLAKE2b-256 |
dbd72d33d760ed7357c9c3c18c6b2992ced805f6985b09e1d6b761332bfb90e4
|
File details
Details for the file processcube_client-6.1.2-py3-none-any.whl.
File metadata
- Download URL: processcube_client-6.1.2-py3-none-any.whl
- Upload date:
- Size: 57.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cedd57aab4455cce8fd8433812a541fe24d68c92beb3425d1be0bd3075d75de6
|
|
| MD5 |
30a858a5750cc013fef35ef4c8312cab
|
|
| BLAKE2b-256 |
baea292245a3dbd5699af85db85229e57d8b89eb63f07afe10973bf25e9b3a11
|