Skip to main content

Camunda External Task Client for Python 3

Project description

Community ExtensionLifecycle: Stable

camunda-external-task-client-python3

camunda-external-task-client-python3

This repository contains Camunda External Task Client written in Python3.

Implement your BPMN Service Task in Python3.

Python >= 3.7 is required

Installing

Add following line to requirements.txt of your Python project.

git+https://github.com/trustfactors/camunda-external-task-client-python3.git/#egg=camunda-external-task-client-python3

Or use pip to install as shown below:

pip install camunda-external-task-client-python3

Running Camunda with Docker

To run the examples provided in examples folder, you need to have Camunda running locally or somewhere.

To run Camunda locally with Postgres DB as backend, you can use docker-compose.yml file.

$> docker-compose -f docker-compose.yml up

Auth Basic Examples

To run the examples with Auth Basic provided in examples/examples_auth_basic folder, you need to have Camunda with AuthBasic, running locally or somewhere.

To run Camunda with AuthBasic locally with Postgres DB as backend, you can use docker-compose-auth.yml file.

$> docker-compose -f docker-compose-auth.yml up

Usage

  1. Make sure to have Camunda running.
  2. Create a simple process model with an External Service Task and define the topic as 'topicName'.
  3. Deploy the process to the Camunda BPM engine.
  4. In your Python code:
import time
from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker

# configuration for the Client
default_config = {
    "maxTasks": 1,
    "lockDuration": 10000,
    "asyncResponseTimeout": 5000,
    "retries": 3,
    "retryTimeout": 5000,
    "sleepSeconds": 30
}

def handle_task(task: ExternalTask) -> TaskResult:
    """
    This task handler you need to implement with your business logic.
    After completion of business logic call either task.complete() or task.failure() or task.bpmn_error() 
    to report status of task to Camunda
    """
    # add your business logic here
    # ...
    
    # mark task either complete/failure/bpmnError based on outcome of your business logic
    failure, bpmn_error = random_true(), random_true() # this code simulate random failure
    if failure:
        # this marks task as failed in Camunda
        return task.failure(error_message="task failed",  error_details="failed task details", 
                            max_retries=3, retry_timeout=5000)
    elif bpmn_error:
        return task.bpmn_error(error_code="BPMN_ERROR_CODE", error_message="BPMN Error occurred", 
                                variables={"var1": "value1", "success": False})
    
    # pass any output variables you may want to send to Camunda as dictionary to complete()
    return task.complete({"var1": 1, "var2": "value"}) 

def random_true():
    current_milli_time = int(round(time.time() * 1000))
    return current_milli_time % 2 == 0

if __name__ == '__main__':
   ExternalTaskWorker(worker_id="1", config=default_config).subscribe("topicName", handle_task)

About External Tasks

External Tasks are service tasks whose execution differs particularly from the execution of other service tasks (e.g. Human Tasks). The execution works in a way that units of work are polled from the engine before being completed.

camunda-external-task-client-python allows you to create easily such client in Python3.

Features

Start process

Camunda provides functionality to start a process instance for a given process definition.

To start a process instance, we can use start_process() from engine_client.py

You can find a usage example here.

client = EngineClient()
resp_json = client.start_process(process_key="PARALLEL_STEPS_EXAMPLE", variables={"intVar": "1", "strVar": "hello"},
                                 tenant_id="6172cdf0-7b32-4460-9da0-ded5107aa977", business_key=str(uuid.uuid1()))

Fetch and Lock

ExternalTaskWorker(worker_id="1").subscribe("topicName", handle_task) starts long polling of the Camunda engine for external tasks.

  • Polling tasks from the engine works by performing a fetch & lock operation of tasks that have subscriptions. It then calls the handler function passed to subscribe() function. i.e. handle_task in above example.
  • Long Polling is done periodically based on the asyncResponseTimeout configuration. Read more about Long Polling.

Complete

from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker
def handle_task(task: ExternalTask) -> TaskResult:
    # add your business logic here
    
    # Complete the task
    # pass any output variables you may want to send to Camunda as dictionary to complete()
    return task.complete({"var1": 1, "var2": "value"})

ExternalTaskWorker(worker_id="1").subscribe("topicName", handle_task)

Handle Failure

from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker
def handle_task(task: ExternalTask) -> TaskResult:
    # add your business logic here
    
    # Handle task Failure
    return task.failure(error_message="task failed",  error_details="failed task details", 
                        max_retries=3, retry_timeout=5000)
    # This client/worker uses max_retries if no retries are previously set in the task
    # if retries are previously set then it just decrements that count by one before reporting failure to Camunda
    # when retries are zero, Camunda creates an incident which then manually needs to be looked into on Camunda Cockpit            

ExternalTaskWorker(worker_id="1").subscribe("topicName", handle_task)

Handle BPMN Error

from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker
def handle_task(task: ExternalTask) -> TaskResult:
    # add your business logic here
    
    # Handle a BPMN Failure
    return task.bpmn_error(error_code="BPMN_ERROR", error_message="BPMN error occurred")

ExternalTaskWorker(worker_id="1" ).subscribe("topicName", handle_task)

Access Process Variables

from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker
def handle_task(task: ExternalTask) -> TaskResult:
    # add your business logic here
    # get the process variable 'score'
    score = task.get_variable("score")
    if int(score) >= 100:
        return task.complete(...)
    else:
        return task.failure(...)        

ExternalTaskWorker().subscribe("topicName", handle_task)

Correlate message

Camunda provides functionality to send a message event to a running process instance.

You can read more about the message events here: https://docs.camunda.org/manual/7.13/reference/bpmn20/events/message-events/

In our to send a message event to a process instance, a new function called correlate_message() is added to engine_client.py

We can correlate the message by:

  • process_instance_id
  • tenant_id
  • business_key
  • process_variables

You can find a usage example here.

client = EngineClient()
resp_json = client.correlate_message("CANCEL_MESSAGE", business_key="b4a6f392-12ab-11eb-80ef-acde48001122")

AuthBasic Usage

To create an EngineClient with AuthBasic simple

client = EngineClient(config={"auth_basic": {"username": "demo", "password": "demo"}})
resp_json = client.start_process(process_key="PARALLEL_STEPS_EXAMPLE", variables={"intVar": "1", "strVar": "hello"},
                                 tenant_id="6172cdf0-7b32-4460-9da0-ded5107aa977", business_key=str(uuid.uuid1()))

To create an ExternalTaskWorker with AuthBasic simple

from camunda.external_task.external_task import ExternalTask, TaskResult
from camunda.external_task.external_task_worker import ExternalTaskWorker

config = {"auth_basic": {"username": "demo", "password": "demo"}}

def handle_task(task: ExternalTask) -> TaskResult:
    # add your business logic here
    
    # Complete the task
    # pass any output variables you may want to send to Camunda as dictionary to complete()
    return task.complete({"var1": 1, "var2": "value"})

ExternalTaskWorker(worker_id="1", config=config).subscribe("topicName", handle_task)

License

The source files in this repository are made available under the Apache License Version 2.0.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

camunda_external_task_client_python3-4.5.0.tar.gz (37.3 kB view details)

Uploaded Source

Built Distribution

File details

Details for the file camunda_external_task_client_python3-4.5.0.tar.gz.

File metadata

File hashes

Hashes for camunda_external_task_client_python3-4.5.0.tar.gz
Algorithm Hash digest
SHA256 a95d37bfed62c17ce375d22f0294c8bd34ba9f9cd8a4a07315bee62716124132
MD5 c6928ff3c2dff26b52cf95b8bd2547c7
BLAKE2b-256 bc89ab9e741717e8a9892eb2722c7c5a927ffac62310932a730857e21cc99cad

See more details on using hashes here.

Provenance

The following attestation bundles were made for camunda_external_task_client_python3-4.5.0.tar.gz:

Publisher: publish.yml on camunda-community-hub/camunda-external-task-client-python3

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file camunda_external_task_client_python3-4.5.0-py3-none-any.whl.

File metadata

File hashes

Hashes for camunda_external_task_client_python3-4.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d65bfe0ba8df752430ec24b9d35ccd507c0278c9f77acc689cd6caecfd3ac4d6
MD5 ee527ef396891c5003c023b014ec3341
BLAKE2b-256 e51a2ffa7c5371e7503e3ffe6599c48dfc050dba66e6c1863b090399cf522b14

See more details on using hashes here.

Provenance

The following attestation bundles were made for camunda_external_task_client_python3-4.5.0-py3-none-any.whl:

Publisher: publish.yml on camunda-community-hub/camunda-external-task-client-python3

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page