
Netflix Conductor Python SDK


Netflix Conductor Client SDK

To find out more about Conductor visit: https://github.com/Netflix/conductor

The conductor-python repository provides the client SDK for building Task Workers in Python.

Quick Start

  1. Create virtual environment
  2. Write worker
  3. Run workers
  4. Worker Configurations
  5. C/C++ Support

Virtual Environment Setup

 $ virtualenv conductor
 $ source conductor/bin/activate

Install conductor-python package

python3 -m pip install conductor-python

Write worker

from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface


class SimplePythonWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        
        # Add any outputs that the task should produce as part of the execution
        task_result.add_output_data('key', 'value')
        task_result.add_output_data('temperature', 32)
        
        # Mark the task status as COMPLETED
        task_result.status = TaskResultStatus.COMPLETED
        return task_result
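If you want to test a worker's logic without a running server, one option is to keep that logic in a plain function and have execute only copy its results into the TaskResult. This is a minimal sketch: build_output and its name input are hypothetical, and reading inputs through task.input_data is an assumption about the Task model.

```python
from typing import Any, Dict


def build_output(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical task logic with no SDK dependencies, so it can be unit-tested alone."""
    name = input_data.get('name', 'world')  # hypothetical input with a default
    return {'key': 'value', 'greeting': 'Hello, {}'.format(name)}


# Inside SimplePythonWorker.execute (assumes Task exposes its input as task.input_data):
#
#     for output_name, value in build_output(task.input_data or {}).items():
#         task_result.add_output_data(output_name, value)
```

For example, build_output({'name': 'Conductor'}) returns {'key': 'value', 'greeting': 'Hello, Conductor'}.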

Run workers

Create a main method that does the following:

  1. Add configuration such as metrics, authentication, thread count, and the Conductor server URL
  2. Add your workers
  3. Start the workers so that they poll for work

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from conductor.client.http.models import Task, TaskResult
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from pathlib import Path
import os


class SimplePythonWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('key1', 'value')
        task_result.add_output_data('key2', 42)
        task_result.add_output_data('key3', False)
        task_result.status = TaskResultStatus.COMPLETED
        return task_result


class WorkerA(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('key', 'A')
        task_result.status = TaskResultStatus.COMPLETED
        return task_result


class WorkerB(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('key', 'B')
        task_result.status = TaskResultStatus.COMPLETED
        return task_result


def main():
    # Create a temp folder for metrics logs
    metrics_dir = str(Path.home()) + '/tmp/'
    if not os.path.isdir(metrics_dir):
        os.mkdir(metrics_dir)

    metrics_settings = MetricsSettings(
        directory=metrics_dir
    )

    # Optional: if your Conductor server requires authentication, set up key_id and key_secret
    auth = AuthenticationSettings(
        key_id='id',
        key_secret='secret'
    )

    # Point to the Conductor Server
    configuration = Configuration(
        base_url='http://localhost:8080',
        debug=True,
        # authentication_settings=auth      # Optional if you are using server that requires authentication
    )

    # Add three workers
    workers = [
        SimplePythonWorker('python_task_example'),
        WorkerA('task_A'),
        WorkerB('task_B'),
    ]

    # Start the worker processes and wait
    with TaskHandler(workers, configuration=configuration, metrics_settings=metrics_settings) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()


if __name__ == '__main__':
    main()

Save this as main.py

Running Conductor server locally in 2 minutes

For more details on how to run Conductor, see https://netflix.github.io/conductor/server/

Use the script below to download and start the server locally. The server runs in memory, and no data is saved on exit.

export CONDUCTOR_VER=3.5.2
export REPO_URL=https://repo1.maven.org/maven2/com/netflix/conductor/conductor-server
curl $REPO_URL/$CONDUCTOR_VER/conductor-server-$CONDUCTOR_VER-boot.jar \
--output conductor-server-$CONDUCTOR_VER-boot.jar; java -jar conductor-server-$CONDUCTOR_VER-boot.jar 
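The server can take a little while to boot. If you script the steps that follow, a readiness check helps. This sketch polls the server using only the standard library; wait_for_server is a hypothetical helper, and it assumes that GET /api/metadata/taskdefs answers with HTTP 200 once the server is up.

```python
import http.client
import time
import urllib.request


def wait_for_server(base_url: str = 'http://localhost:8080',
                    timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll the server until it answers with HTTP 200 or `timeout` seconds pass.

    Assumes GET {base_url}/api/metadata/taskdefs is a cheap request that
    succeeds once the server has started.
    """
    url = base_url.rstrip('/') + '/api/metadata/taskdefs'
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval) as response:
                if response.status == 200:
                    return True
        except (OSError, http.client.HTTPException):
            # URLError, HTTPError, refused connections and socket timeouts are
            # OSError subclasses; HTTPException covers malformed responses.
            # Either way the server is not ready yet.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, call wait_for_server(timeout=120) before running python ./main.py or the curl commands below.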

Execute workers

python ./main.py

Create your first workflow

Now, let's create a new workflow and see your task worker code in execution!

Create the task metadata (task definition) for the worker you just created

curl -X 'POST' \
  'http://localhost:8080/api/metadata/taskdefs' \
  -H 'accept: */*' \
  -H 'Content-Type: application/json' \
  -d '[{
    "name": "python_task_example",
    "description": "Python task example",
    "retryCount": 3,
    "retryLogic": "FIXED",
    "retryDelaySeconds": 10,
    "timeoutSeconds": 300,
    "timeoutPolicy": "TIME_OUT_WF",
    "responseTimeoutSeconds": 180,
    "ownerEmail": "example@example.com"
}]'
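The same registration can be done from Python with only the standard library. This sketch builds the request without sending it, so it can be inspected or tested; task_def_request is a hypothetical helper name, and the payload is copied from the curl example above.

```python
import json
import urllib.request
from typing import Any, Dict, List

# Same task definition as the curl example above
TASK_DEF: Dict[str, Any] = {
    'name': 'python_task_example',
    'description': 'Python task example',
    'retryCount': 3,
    'retryLogic': 'FIXED',
    'retryDelaySeconds': 10,
    'timeoutSeconds': 300,
    'timeoutPolicy': 'TIME_OUT_WF',
    'responseTimeoutSeconds': 180,
    'ownerEmail': 'example@example.com',
}


def task_def_request(base_url: str, task_defs: List[Dict[str, Any]]) -> urllib.request.Request:
    """Build the POST /api/metadata/taskdefs request; the endpoint takes a JSON list."""
    return urllib.request.Request(
        base_url.rstrip('/') + '/api/metadata/taskdefs',
        data=json.dumps(task_defs).encode('utf-8'),
        headers={'Content-Type': 'application/json', 'Accept': '*/*'},
        method='POST',
    )


# Send it to a running server:
#     with urllib.request.urlopen(task_def_request('http://localhost:8080', [TASK_DEF])) as response:
#         print(response.status)
```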

Create a workflow that uses the task

curl -X 'POST' \
  'http://localhost:8080/api/metadata/workflow' \
  -H 'accept: */*' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "workflow_with_python_task_example",
    "description": "Workflow with Python Task example",
    "version": 1,
    "tasks": [
      {
        "name": "python_task_example",
        "taskReferenceName": "python_task_example_ref_1",
        "inputParameters": {},
        "type": "SIMPLE"
      }
    ],
    "inputParameters": [],
    "outputParameters": {
      "workerOutput": "${python_task_example_ref_1.output}"
    },
    "schemaVersion": 2,
    "restartable": true,
    "ownerEmail": "example@example.com",
    "timeoutPolicy": "ALERT_ONLY",
    "timeoutSeconds": 0
}'
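In the workflow above, the expression ${python_task_example_ref_1.output} pulls the task's output into the workflow output by its taskReferenceName, so a typo there is easy to miss. Here is a rough pre-flight check; unknown_task_refs is hypothetical, it only scans top-level tasks and string-valued outputParameters, and it treats the workflow prefix (as in ${workflow.input.name}) as reserved.

```python
import re
from typing import Any, Dict, List

# Captures the first path segment of ${ref.path} expressions
EXPRESSION_REF = re.compile(r'\$\{([A-Za-z0-9_-]+)\.')


def unknown_task_refs(workflow_def: Dict[str, Any]) -> List[str]:
    """Return references used in outputParameters that no top-level task declares."""
    declared = {task['taskReferenceName'] for task in workflow_def.get('tasks', [])}
    used = set()
    for expression in workflow_def.get('outputParameters', {}).values():
        if isinstance(expression, str):
            used.update(EXPRESSION_REF.findall(expression))
    used.discard('workflow')  # ${workflow.input...} refers to the workflow itself
    return sorted(used - declared)
```

Run it on the workflow JSON before posting it; an empty list means every referenced task exists.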

Start a new workflow execution

curl -X 'POST' \
  'http://localhost:8080/api/workflow/workflow_with_python_task_example?priority=0' \
  -H 'accept: text/plain' \
  -H 'Content-Type: application/json' \
  -d '{}'
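The same call can be built from Python with the standard library. In this sketch (start_workflow_request is a hypothetical helper), the JSON body becomes the workflow input; with accept: text/plain the server is expected to reply with the new workflow's ID as plain text.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional


def start_workflow_request(base_url: str, workflow_name: str,
                           workflow_input: Optional[Dict[str, Any]] = None,
                           priority: int = 0) -> urllib.request.Request:
    """Build POST /api/workflow/{name}?priority=N; the JSON body is the workflow input."""
    url = '{}/api/workflow/{}?{}'.format(
        base_url.rstrip('/'),
        urllib.parse.quote(workflow_name, safe=''),
        urllib.parse.urlencode({'priority': priority}),
    )
    return urllib.request.Request(
        url,
        data=json.dumps(workflow_input or {}).encode('utf-8'),
        headers={'Content-Type': 'application/json', 'Accept': 'text/plain'},
        method='POST',
    )


# Against a running server (the plain-text body is expected to be the workflow ID):
#     request = start_workflow_request('http://localhost:8080', 'workflow_with_python_task_example')
#     with urllib.request.urlopen(request) as response:
#         workflow_id = response.read().decode('utf-8')
```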

Worker Configurations

Worker configuration is handled via the Configuration object passed when initializing TaskHandler.

Server Configurations

  • base_url: Conductor server address, e.g. http://localhost:8080 if running locally
  • debug: True for verbose logging; False to display only errors
  • authentication_settings: see below
  • metrics_settings: passed to TaskHandler rather than Configuration; see below
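To avoid hard-coding the server address, you might read these values from environment variables. This is a sketch: server_settings_from_env and the variable names CONDUCTOR_BASE_URL and CONDUCTOR_DEBUG are this example's own hypothetical choices.

```python
import os
from typing import Any, Dict, Mapping, Optional


def server_settings_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Return keyword arguments for Configuration, read from hypothetical env vars."""
    env = os.environ if env is None else env
    return {
        'base_url': env.get('CONDUCTOR_BASE_URL', 'http://localhost:8080'),
        # Treat 1/true/yes (any case) as enabling verbose logging
        'debug': env.get('CONDUCTOR_DEBUG', 'false').strip().lower() in ('1', 'true', 'yes'),
    }


# In main.py:
#     configuration = Configuration(**server_settings_from_env())
```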

Metrics

Conductor uses Prometheus to collect metrics.

  • directory: Directory in which to store the metrics
  • file_name: Name of the file the metrics are written to, e.g. metrics.log
  • update_interval: Time interval in seconds at which to collect metrics into the file
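To peek at the collected metrics, here is a sketch that parses the file, assuming it is written in the Prometheus text exposition format (# HELP / # TYPE comment lines followed by name{labels} value samples). parse_prometheus_text is a hypothetical, deliberately simplified helper that assumes label values contain no spaces; metrics.log is just the example file name from the list above.

```python
from typing import Dict


def parse_prometheus_text(text: str) -> Dict[str, float]:
    """Map each sample's 'name{labels}' string to its value.

    Simplified: assumes label values contain no whitespace; an optional
    trailing timestamp is ignored.
    """
    samples: Dict[str, float] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # blank line or # HELP / # TYPE metadata
        parts = line.split()
        if len(parts) < 2:
            continue  # not a sample line
        samples[parts[0]] = float(parts[1])  # float() also accepts NaN and +Inf
    return samples


# Usage (metrics_dir as in main.py; file name assumed):
#     with open(os.path.join(metrics_dir, 'metrics.log')) as f:
#         for name, value in parse_prometheus_text(f.read()).items():
#             print(name, value)
```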

Authentication

Use these settings if your Conductor server requires authentication:

  • key_id: Key ID
  • key_secret: Secret for the key ID
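Rather than committing key_id and key_secret to main.py, you could load them from the environment and only enable authentication when both are present. This is a sketch with hypothetical names (credentials_from_env, CONDUCTOR_KEY_ID, CONDUCTOR_KEY_SECRET), and it assumes that passing authentication_settings=None behaves like leaving the argument out.

```python
import os
from typing import Mapping, Optional, Tuple


def credentials_from_env(env: Optional[Mapping[str, str]] = None) -> Optional[Tuple[str, str]]:
    """Return (key_id, key_secret) from hypothetical env vars, or None if either is missing."""
    env = os.environ if env is None else env
    key_id = env.get('CONDUCTOR_KEY_ID')
    key_secret = env.get('CONDUCTOR_KEY_SECRET')
    if not key_id or not key_secret:
        return None  # run without authentication
    return key_id, key_secret


# In main.py:
#     credentials = credentials_from_env()
#     auth = AuthenticationSettings(key_id=credentials[0], key_secret=credentials[1]) if credentials else None
#     configuration = Configuration(base_url='http://localhost:8080', authentication_settings=auth)
```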

C/C++ Support

Python is great, but at times you need to call into native C/C++ code. Here is an example of how you can do that with the Conductor SDK.

1. Export your C++ functions as extern "C":

  • C++ function example (sum two integers)
    #include <cstdint>
    
    extern "C" std::int32_t get_sum(const std::int32_t A, const std::int32_t B) {
        return A + B;
    }
    

2. Compile it into a shared library:

  • C++ file name: simple_cpp_lib.cpp
  • Target library name: lib.so
    $ g++ -c -fPIC simple_cpp_lib.cpp -o simple_cpp_lib.o
    $ g++ -shared -Wl,-install_name,lib.so -o lib.so simple_cpp_lib.o   # macOS linker
    $ g++ -shared -Wl,-soname,lib.so -o lib.so simple_cpp_lib.o         # GNU ld (Linux)
    

3. Use the C++ library in your Python worker

from ctypes import c_int32, cdll

from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface


class CppWrapper:
    def __init__(self, file_path='./lib.so'):
        # Load the shared library built in step 2 (path relative to the working directory)
        self.cpp_lib = cdll.LoadLibrary(file_path)
        # Declare the C signature so ctypes converts the arguments and the
        # return value as int32_t instead of guessing
        self.cpp_lib.get_sum.argtypes = [c_int32, c_int32]
        self.cpp_lib.get_sum.restype = c_int32

    def get_sum(self, x: int, y: int) -> int:
        return self.cpp_lib.get_sum(x, y)


class SimpleCppWorker(WorkerInterface):
    # Loaded once, when the class is defined
    cpp_wrapper = CppWrapper()

    def execute(self, task: Task) -> TaskResult:
        execution_result = self.cpp_wrapper.get_sum(1, 2)
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('sum', execution_result)
        task_result.status = TaskResultStatus.COMPLETED
        return task_result
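Declaring argtypes and restype on a ctypes function makes ctypes convert and check arguments instead of guessing their C types. You can try that pattern without building lib.so: this sketch uses the C standard library's int abs(int) as a stand-in (Linux or macOS assumed), and checked_abs is a hypothetical helper.

```python
import ctypes
import ctypes.util

# Stand-in for lib.so. If find_library returns None (some minimal Linux
# systems), CDLL(None) opens the running process, which already has libc loaded.
libc = ctypes.CDLL(ctypes.util.find_library('c'))

c_abs = libc.abs
c_abs.argtypes = [ctypes.c_int]  # same idea as declaring get_sum's two int32_t arguments
c_abs.restype = ctypes.c_int


def checked_abs(value):
    """Call the native function; ctypes rejects arguments that don't match argtypes."""
    try:
        return c_abs(value)
    except ctypes.ArgumentError as error:
        raise TypeError('abs() expects an int, got {!r}'.format(value)) from error
```

Without argtypes, a wrong argument type may be passed through silently; with them, checked_abs('seven') raises TypeError.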

Unit Tests

Simple validation

/conductor-python/src$ python3 -m unittest -v

Run with code coverage

/conductor-python/src$ python3 -m coverage run --source=conductor/ -m unittest

Report:

/conductor-python/src$ python3 -m coverage report

Visual coverage results:

/conductor-python/src$ python3 -m coverage html
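By default, python3 -m unittest discovers test modules whose names match test*.py. As a sketch of testing native-code wrappers without compiling lib.so, the test below injects a unittest.mock.Mock in place of the loaded library. SumWrapper is a hypothetical variant of CppWrapper that takes the library as a constructor argument; you might save this as test_cpp_wrapper.py (also a hypothetical name).

```python
import unittest
from unittest import mock


class SumWrapper:
    """Hypothetical CppWrapper variant that accepts an already-loaded library,
    so tests can pass a mock instead of loading lib.so."""

    def __init__(self, lib):
        self.lib = lib

    def get_sum(self, x: int, y: int) -> int:
        return self.lib.get_sum(x, y)


class SumWrapperTest(unittest.TestCase):
    def test_delegates_to_native_library(self):
        fake_lib = mock.Mock()
        fake_lib.get_sum.return_value = 3
        wrapper = SumWrapper(fake_lib)
        self.assertEqual(wrapper.get_sum(1, 2), 3)
        # The wrapper should forward its arguments unchanged
        fake_lib.get_sum.assert_called_once_with(1, 2)
```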


Download files

Source Distribution

conductor-python-1.0.22.tar.gz (64.4 kB)

Uploaded Source

Built Distribution

conductor_python-1.0.22-py3-none-any.whl (105.5 kB)

Uploaded Python 3

File details

Details for the file conductor-python-1.0.22.tar.gz.

File metadata

  • Download URL: conductor-python-1.0.22.tar.gz
  • Upload date:
  • Size: 64.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.1 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.8.9

File hashes

Hashes for conductor-python-1.0.22.tar.gz
Algorithm Hash digest
SHA256 c08f4e54cf89cb5aa7b2c20cba284b9ecaea860530d37dede290b08074e8a5e2
MD5 244a864fbc5489688b57f1ebea194203
BLAKE2b-256 dc7bcca1c163021ec49929b5495c95bf34e368304bef3bb8958f70497ec4a75b

File details

Details for the file conductor_python-1.0.22-py3-none-any.whl.

File metadata

  • Download URL: conductor_python-1.0.22-py3-none-any.whl
  • Upload date:
  • Size: 105.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.1 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.8.9

File hashes

Hashes for conductor_python-1.0.22-py3-none-any.whl
Algorithm Hash digest
SHA256 6804c3d7bbfe9c7c1c3f3ebdcac1f8b4cd95e0fe0e1a41a75913393bd42836ff
MD5 854ccd54b208fdf7b52eaa30dc12447f
BLAKE2b-256 b1c3ae69675a56f9a5c24b50133029e6c32e17b8301a00d327a6263ae823cc42
