Netflix Conductor Python SDK
Conductor Python
Software Development Kit for Netflix Conductor, written in and providing support for Python.
Quick Guide
If you already have Conductor set up and running, you can get started with these steps:
- Install Conductor Python SDK from pypi:
$ python3 -m pip install conductor-python
- Run Conductor Python SDK:
$ python3 -m conductor.client
Custom worker
To create a custom Python worker, define a class that inherits from WorkerInterface and pass an instance of it to a TaskHandler object.
Here is an example:
- Worker:
  - Details:
    - Name: simple_python_worker
    - Input: None
    - Output: [hostname, cpu_cores]
    - Polling Interval: 3s

    from conductor.client.worker.worker_interface import WorkerInterface
    import multiprocessing
    import socket


    class SimplePythonWorker(WorkerInterface):
        def get_task_definition_name(self):
            return 'simple_python_worker'

        def execute(self, task_result):
            task_result.add_output_data('hostname', socket.gethostname())
            task_result.add_output_data('cpu_cores', multiprocessing.cpu_count())
            return task_result

        def get_polling_interval(self):
            return 3

- Main:
  - Details:
    - Single worker within TaskHandler

    from conductor.client.automator.task_handler import TaskHandler
    from conductor.client.worker.simple_python_worker import SimplePythonWorker


    def main():
        workers = [SimplePythonWorker()] * 1
        with TaskHandler(workers) as parallel_task_handler:
            parallel_task_handler.start()


    if __name__ == '__main__':
        main()
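Because execute only calls add_output_data on the object it receives, the worker logic can be tried out without a running Conductor server. The following is a minimal sketch under that assumption. FakeTaskResult and SimplePythonWorkerLogic are hypothetical names that are not part of the SDK; the worker is written as a plain class so the snippet runs without conductor-python installed, whereas a real worker would inherit from WorkerInterface as in the example above.

```python
import multiprocessing
import socket


class FakeTaskResult:
    """Hypothetical stand-in for the SDK's task result; it only records output data."""

    def __init__(self):
        self.output_data = {}

    def add_output_data(self, key, value):
        self.output_data[key] = value


class SimplePythonWorkerLogic:
    """Same three methods as SimplePythonWorker, minus the WorkerInterface base class."""

    def get_task_definition_name(self):
        return 'simple_python_worker'

    def execute(self, task_result):
        task_result.add_output_data('hostname', socket.gethostname())
        task_result.add_output_data('cpu_cores', multiprocessing.cpu_count())
        return task_result

    def get_polling_interval(self):
        return 3


def run_once():
    """Run the worker logic a single time and return the collected output data."""
    worker = SimplePythonWorkerLogic()
    result = worker.execute(FakeTaskResult())
    return result.output_data


if __name__ == '__main__':
    print(run_once())
```

Calling run_once() returns a dictionary with the hostname and cpu_cores keys, which are the same two outputs listed in the worker details.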
Full installation guide
Set up Conductor
- Clone Netflix Conductor repository: https://github.com/Netflix/conductor
$ git clone https://github.com/Netflix/conductor.git
- Start Conductor server by running this command at the repo root folder (/conductor):
    $ ./gradlew bootRun
- Start Conductor UI by running these commands at the UI folder (/conductor/ui):
    $ yarn install
    $ yarn run start
You should be able to access:
- Conductor API:
- Conductor UI:
Run
Create new task
You need to define a Task within Conductor that your Python worker is capable of running.
Make a POST request to the /metadata/taskdefs endpoint of your Conductor server.
- URL example:
http://localhost:8080/api/metadata/taskdefs
- Task Definition example:
    [
      {
        "name": "simple_python_worker",
        "description": "Simple Python Worker",
        "retryCount": 3,
        "retryLogic": "FIXED",
        "retryDelaySeconds": 10,
        "timeoutSeconds": 300,
        "timeoutPolicy": "TIME_OUT_WF",
        "responseTimeoutSeconds": 180,
        "ownerEmail": "example@example.com"
      }
    ]
- Command example:
    $ curl -X 'POST' \
        'http://localhost:8080/api/metadata/taskdefs' \
        -H 'accept: */*' \
        -H 'Content-Type: application/json' \
        -d '[
          {
            "name": "simple_python_worker",
            "description": "Simple Python Worker",
            "retryCount": 3,
            "retryLogic": "FIXED",
            "retryDelaySeconds": 10,
            "timeoutSeconds": 300,
            "timeoutPolicy": "TIME_OUT_WF",
            "responseTimeoutSeconds": 180,
            "ownerEmail": "example@example.com"
          }
        ]'
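The same request can be made from Python. This is a rough sketch using only the standard library, not an SDK feature: build_taskdef_request and register_task_definition are hypothetical helper names, and the base URL is taken from the URL example above.

```python
import json
import urllib.request

# Task definition copied from the example above.
TASK_DEFINITION = [
    {
        "name": "simple_python_worker",
        "description": "Simple Python Worker",
        "retryCount": 3,
        "retryLogic": "FIXED",
        "retryDelaySeconds": 10,
        "timeoutSeconds": 300,
        "timeoutPolicy": "TIME_OUT_WF",
        "responseTimeoutSeconds": 180,
        "ownerEmail": "example@example.com",
    }
]


def build_taskdef_request(base_url="http://localhost:8080/api"):
    """Build (but do not send) the POST request for /metadata/taskdefs."""
    return urllib.request.Request(
        url=f"{base_url}/metadata/taskdefs",
        data=json.dumps(TASK_DEFINITION).encode("utf-8"),
        headers={"accept": "*/*", "Content-Type": "application/json"},
        method="POST",
    )


def register_task_definition(base_url="http://localhost:8080/api"):
    """Send the request; needs a running Conductor server at base_url."""
    with urllib.request.urlopen(build_taskdef_request(base_url)) as response:
        return response.status
```

Call register_task_definition() once the server is up. Keeping request construction separate from sending makes it possible to inspect the URL and payload without any network access.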
Create new workflow
You need to define a Workflow within Conductor that contains the Task you just defined.
Make a POST request to the /metadata/workflow endpoint of your Conductor server.
- URL example:
http://localhost:8080/api/metadata/workflow
- Workflow Definition example:
    {
      "createTime": 1634021619147,
      "updateTime": 1630694890267,
      "name": "simple_workflow_with_python_worker",
      "description": "Simple Workflow with Python Worker",
      "version": 1,
      "tasks": [
        {
          "name": "simple_python_worker",
          "taskReferenceName": "simple_python_worker_ref_1",
          "inputParameters": {},
          "type": "SIMPLE"
        }
      ],
      "inputParameters": [],
      "outputParameters": {
        "workerOutput": "${simple_python_worker_ref_1.output}"
      },
      "schemaVersion": 2,
      "restartable": true,
      "ownerEmail": "example@example.com",
      "timeoutPolicy": "ALERT_ONLY",
      "timeoutSeconds": 0
    }
- Command example:
    $ curl -X 'POST' \
        'http://localhost:8080/api/metadata/workflow' \
        -H 'accept: */*' \
        -H 'Content-Type: application/json' \
        -d '{
          "createTime": 1634021619147,
          "updateTime": 1630694890267,
          "name": "simple_workflow_with_python_worker",
          "description": "Simple Workflow with Python Worker",
          "version": 1,
          "tasks": [
            {
              "name": "simple_python_worker",
              "taskReferenceName": "simple_python_worker_ref_1",
              "inputParameters": {},
              "type": "SIMPLE"
            }
          ],
          "inputParameters": [],
          "outputParameters": {
            "workerOutput": "${simple_python_worker_ref_1.output}"
          },
          "schemaVersion": 2,
          "restartable": true,
          "ownerEmail": "example@example.com",
          "timeoutPolicy": "ALERT_ONLY",
          "timeoutSeconds": 0
        }'
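In the definition above, the output expression refers to the task by its taskReferenceName, so the two strings need to stay in sync. One way to avoid a mismatch is to generate the definition from a single task name. The sketch below is illustrative only: build_workflow_definition is a hypothetical helper, not an SDK function, and the "_ref_1" suffix simply mirrors the example.

```python
import json


def build_workflow_definition(task_name, workflow_name, owner_email):
    """Return a one-task workflow definition shaped like the example above."""
    task_ref = f"{task_name}_ref_1"
    return {
        # Timestamps copied verbatim from the example definition.
        "createTime": 1634021619147,
        "updateTime": 1630694890267,
        "name": workflow_name,
        "description": "Simple Workflow with Python Worker",
        "version": 1,
        "tasks": [
            {
                "name": task_name,
                "taskReferenceName": task_ref,
                "inputParameters": {},
                "type": "SIMPLE",
            }
        ],
        "inputParameters": [],
        # String concatenation keeps the literal ${...} expression intact.
        "outputParameters": {"workerOutput": "${" + task_ref + ".output}"},
        "schemaVersion": 2,
        "restartable": True,
        "ownerEmail": owner_email,
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0,
    }


if __name__ == "__main__":
    definition = build_workflow_definition(
        "simple_python_worker",
        "simple_workflow_with_python_worker",
        "example@example.com",
    )
    print(json.dumps(definition, indent=2))
```

The printed JSON can be used as the body of the POST request to /metadata/workflow.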
Start new workflow
Now that you have defined a Task and a Workflow within Conductor, you should be able to run the Workflow.
Make a POST request to the /workflow/{name} endpoint of your Conductor server.
- URL example:
http://localhost:8080/api/workflow/simple_workflow_with_python_worker
- Priority should be empty
- Request body should be empty, like:
{}
- Command example:
    $ curl -X 'POST' \
        'http://localhost:8080/api/workflow/simple_workflow_with_python_worker' \
        -H 'accept: text/plain' \
        -H 'Content-Type: application/json' \
        -d '{}'
- Create a bunch of workflows:
    $ export CREATE_WORKFLOW_SHORTCUT="curl -X 'POST' \
        'http://localhost:8080/api/workflow/simple_workflow_with_python_worker' \
        -H 'accept: text/plain' \
        -H 'Content-Type: application/json' \
        -d '{}' \
        -s"
    $ for idx in {1..100}; do \
        echo "Creating workflow ${idx}"; \
        workflow_id=$(eval "${CREATE_WORKFLOW_SHORTCUT}"); \
        echo "workflow_id=${workflow_id}"; \
      done
  - Expected output example:
      Creating workflow 1
      workflow_id=6dd2c86b-5ce6-487a-9a65-632139da1345
      Creating workflow 2
      workflow_id=b0ddfdcf-0c4a-4fd3-892c-97fc38c46d63
      ...
- Expected output example:
  You should receive a Workflow ID in the response body.
  - Workflow ID example:
    8ff0bc06-4413-4c94-b27a-b3210412a914
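The shell loop above can also be written in Python. This is a minimal sketch using only the standard library: start_workflow and start_many are hypothetical helper names, the opener parameter exists purely so the function can be tried without a server, and the response body is assumed to be the plain-text Workflow ID described above.

```python
import urllib.request


def start_workflow(name, base_url="http://localhost:8080/api",
                   opener=urllib.request.urlopen):
    """POST an empty JSON body to /workflow/{name} and return the response text."""
    request = urllib.request.Request(
        url=f"{base_url}/workflow/{name}",
        data=b"{}",
        headers={"accept": "text/plain", "Content-Type": "application/json"},
        method="POST",
    )
    with opener(request) as response:
        # The body is expected to contain only the Workflow ID.
        return response.read().decode("utf-8").strip()


def start_many(name, count, **kwargs):
    """Start `count` workflows one after another and return their IDs in order."""
    return [start_workflow(name, **kwargs) for _ in range(count)]
```

With a server running, start_many("simple_workflow_with_python_worker", 100) would mirror the loop above and return the list of Workflow IDs.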
See workflow execution
You should now be able to see the workflow execution in the UI.
- URL:
  - prefix: http://localhost:5001/execution
  - suffix: ${workflow_id}
- Example:
  http://localhost:5001/execution/8ff0bc06-4413-4c94-b27a-b3210412a914
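The prefix and suffix combine by simple concatenation. A small sketch, where execution_url is a hypothetical helper name and the UI address is taken from the example above:

```python
def execution_url(workflow_id, ui_base="http://localhost:5001"):
    """Join the UI prefix and a Workflow ID into the execution page URL."""
    return f"{ui_base.rstrip('/')}/execution/{workflow_id}"


if __name__ == "__main__":
    print(execution_url("8ff0bc06-4413-4c94-b27a-b3210412a914"))
```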