Netflix Conductor Python SDK
Project description
Conductor Python
Software Development Kit for Netflix Conductor, written on and providing support for Python.
Quick Guide
-
Create a virtual environment
$ virtualenv conductor $ source conductor/bin/activate $ python3 -m pip list Package Version ---------- ------- pip 22.0.3 setuptools 60.6.0 wheel 0.37.1
-
Install latest version of
conductor-python
from pypi$ python3 -m pip install conductor-python Collecting conductor-python Collecting certifi>=14.05.14 Collecting urllib3>=1.15.1 Requirement already satisfied: setuptools>=21.0.0 in ./conductor/lib/python3.8/site-packages (from conductor-python) (60.6.0) Collecting six>=1.10 Installing collected packages: certifi, urllib3, six, conductor-python Successfully installed certifi-2021.10.8 conductor-python-1.0.7 six-1.16.0 urllib3-1.26.8
-
Create a worker capable of executing a
Task
. Example:from conductor.client.http.models.task import Task from conductor.client.http.models.task_result import TaskResult from conductor.client.http.models.task_result_status import TaskResultStatus from conductor.client.worker.worker_interface import WorkerInterface class SimplePythonWorker(WorkerInterface): def execute(self, task: Task) -> TaskResult: task_result = self.get_task_result_from_task(task) task_result.add_output_data('key', 'value') task_result.status = TaskResultStatus.COMPLETED return task_result
- The
add_output_data
is the most relevant part, since you can store information in a dictionary, which will be sent withinTaskResult
as your execution response to Conductor
- The
-
Create a main method to start polling tasks to execute with your worker. Example:
from conductor.client.automator.task_handler import TaskHandler from conductor.client.configuration.configuration import Configuration from conductor.client.worker.sample.faulty_execution_worker import FaultyExecutionWorker from conductor.client.worker.sample.simple_python_worker import SimplePythonWorker def main(): configuration = Configuration(debug=True) task_definition_name = 'python_task_example' workers = [ FaultyExecutionWorker(task_definition_name), SimplePythonWorker(task_definition_name) ] with TaskHandler(workers, configuration) as task_handler: task_handler.start_processes() task_handler.join_processes() if __name__ == '__main__': main()
- This example contains two workers, each with a different execution method, capable of running the same
task_definition_name
- This example contains two workers, each with a different execution method, capable of running the same
-
Now that you have implemented the example, you can start the Conductor server locally:
- Clone Netflix Conductor repository:
$ git clone https://github.com/Netflix/conductor.git $ cd conductor/
- Start the Conductor server:
/conductor$ ./gradlew bootRun
- Start Conductor UI:
/conductor$ cd ui/ /conductor/ui$ yarn install /conductor/ui$ yarn run start
You should be able to access:
- Conductor API:
- Conductor UI:
- Clone Netflix Conductor repository:
-
Create a
Task
withinConductor
. Example:$ curl -X 'POST' \ 'http://localhost:8080/api/metadata/taskdefs' \ -H 'accept: */*' \ -H 'Content-Type: application/json' \ -d '[ { "name": "python_task_example", "description": "Python task example", "retryCount": 3, "retryLogic": "FIXED", "retryDelaySeconds": 10, "timeoutSeconds": 300, "timeoutPolicy": "TIME_OUT_WF", "responseTimeoutSeconds": 180, "ownerEmail": "example@example.com" } ]'
-
Create a
Workflow
withinConductor
. Example:$ curl -X 'POST' \ 'http://localhost:8080/api/metadata/workflow' \ -H 'accept: */*' \ -H 'Content-Type: application/json' \ -d '{ "createTime": 1634021619147, "updateTime": 1630694890267, "name": "workflow_with_python_task_example", "description": "Workflow with Python Task example", "version": 1, "tasks": [ { "name": "python_task_example", "taskReferenceName": "python_task_example_ref_1", "inputParameters": {}, "type": "SIMPLE" } ], "inputParameters": [], "outputParameters": { "workerOutput": "${python_task_example_ref_1.output}" }, "schemaVersion": 2, "restartable": true, "ownerEmail": "example@example.com", "timeoutPolicy": "ALERT_ONLY", "timeoutSeconds": 0 }'
-
Start a new workflow:
$ curl -X 'POST' \ 'http://localhost:8080/api/workflow/workflow_with_python_task_example' \ -H 'accept: text/plain' \ -H 'Content-Type: application/json' \ -d '{}'
You should receive a Workflow ID at the Response body
- Workflow ID example:
8ff0bc06-4413-4c94-b27a-b3210412a914
Now you must be able to see its execution through the UI.
- Example:
http://localhost:5000/execution/8ff0bc06-4413-4c94-b27a-b3210412a914
- Workflow ID example:
-
Run your Python file with the
main
method
Unit Tests
Simple validation
/conductor-python/src$ python3 -m unittest -v
test_execute_task (tst.automator.test_task_runner.TestTaskRunner) ... ok
test_execute_task_with_faulty_execution_worker (tst.automator.test_task_runner.TestTaskRunner) ... ok
test_execute_task_with_invalid_task (tst.automator.test_task_runner.TestTaskRunner) ... ok
----------------------------------------------------------------------
Ran 3 tests in 0.001s
OK
Run with code coverage
/conductor-python/src$ python3 -m coverage run --source=conductor/ -m unittest
Report:
/conductor-python/src$ python3 -m coverage report
Visual coverage results:
/conductor-python/src$ python3 -m coverage html
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file conductor-python-1.0.15.tar.gz
.
File metadata
- Download URL: conductor-python-1.0.15.tar.gz
- Upload date:
- Size: 62.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.10.1 pkginfo/1.8.2 requests/2.27.1 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 58f295aff961171124a94780205c96b3d386b0809a49bd6b57cfca9936de723b |
|
MD5 | 0b77e5f0eef400fc2b6dc729aec1c634 |
|
BLAKE2b-256 | fd067cce869369a5604e2153cfc56a944fb9556dd8ae2d0044ed551be789aca8 |
File details
Details for the file conductor_python-1.0.15-py3-none-any.whl
.
File metadata
- Download URL: conductor_python-1.0.15-py3-none-any.whl
- Upload date:
- Size: 101.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.10.1 pkginfo/1.8.2 requests/2.27.1 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.8.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4c7b966ee85c55f861962f2cbe42550d69110fee7b2118810242bab6208c3d73 |
|
MD5 | 7ac1bda39c33ec21f1d496e5a98089a9 |
|
BLAKE2b-256 | 66af4534d95b026d5bf74a52f11aa29279f0ad6531cd1e2e8a74091e99052a50 |