Netflix Conductor Python SDK

Conductor Python

A software development kit (SDK) for Netflix Conductor, written in and for Python.

Quick Guide

If you already have Conductor set up and running, you can get started quickly with these steps:

  1. Install the Conductor Python SDK from PyPI:
    $ python3 -m pip install conductor-python
    
  2. Run the Conductor Python SDK:
    $ python3 -m conductor.client
    

Custom worker

To create and run a custom worker:

  • Create a custom Python worker by inheriting from WorkerInterface and implementing the execute method. Example:
    from conductor.client.http.models.task_result import TaskResult
    from conductor.client.worker.worker_interface import WorkerInterface
    
    
    class SimplePythonWorker(WorkerInterface):
        def __init__(self):
            super().__init__('simple_python_worker')
    
        def execute(self, task):
            task_result = TaskResult(
                task_id=task.task_id,
                workflow_instance_id=task.workflow_instance_id,
                worker_id=self.get_task_definition_name()
            )
            self.__execute(task_result)
            task_result.status = 'COMPLETED'
            return task_result
    
        def __execute(self, task_result):
            task_result.add_output_data('key', 'value')
    
  • Run the worker by creating a main method that instantiates a TaskHandler object with your implemented workers. Example:
    from conductor.client.automator.task_handler import TaskHandler
    from conductor.client.configuration.configuration import Configuration
    from conductor.client.worker.sample.simple_python_worker import SimplePythonWorker
    import logging
    
    logger = logging.getLogger(
        Configuration.get_logging_formatted_name(
            __name__
        )
    )
    
    
    def main():
        configuration = Configuration(
            debug=True
        )
        configuration.apply_logging_config()
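        # Build three separate worker instances rather than one instance repeated three times
        workers = [SimplePythonWorker() for _ in range(3)]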
        logger.debug(f'Created workers: {workers}')
        with TaskHandler(configuration, workers) as task_handler:
            logger.debug(f'Created task_handler: {task_handler}')
            task_handler.start()
    
    
    if __name__ == '__main__':
        main()
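
The `__execute` helper in the worker example above writes a fixed key/value pair. As a minimal sketch of a worker that derives its output from the task's input instead, the logic can live in a plain function that the worker calls. The names `compute_output` and `fill_task_result` are hypothetical, and the sketch assumes the task's input is available as a dictionary (for example through an attribute such as `task.input_data`; check the SDK's Task model before relying on that). Only `add_output_data` comes from the example above.

```python
from typing import Any, Dict, Optional


def compute_output(input_data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Pure business logic with no Conductor imports (hypothetical helper)."""
    # Treat a missing input dictionary as empty.
    name = (input_data or {}).get('name', 'world')
    return {'greeting': f'Hello, {name}!'}


def fill_task_result(task_result: Any, output: Dict[str, Any]) -> None:
    """Copy each key/value pair into the result through add_output_data."""
    for key, value in output.items():
        task_result.add_output_data(key, value)
```

Inside `execute`, this could be called as `fill_task_result(task_result, compute_output(task.input_data))`, under the same assumption about `input_data`.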
    

Full installation guide

Set up Conductor

  1. Clone the Netflix Conductor repository (https://github.com/Netflix/conductor):
    $ git clone https://github.com/Netflix/conductor.git
    
  2. Start the Conductor server by running this command from the repository root folder (/conductor):
    $ ./gradlew bootRun
    
  3. Start the Conductor UI by running these commands from the UI folder (/conductor/ui):
    $ yarn install
    $ yarn run start
    

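You should now be able to access the Conductor server at http://localhost:8080 and the Conductor UI at http://localhost:5001, the addresses used in the examples in this guide.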

Run

Create new task

You need to define a Task within Conductor that your Python Worker is capable of running.

Make a POST request to the /metadata/taskdefs endpoint of your Conductor server.

  • URL example: http://localhost:8080/api/metadata/taskdefs
  • Task Definition example:
    [
      {
        "name": "simple_python_worker",
        "description": "Simple Python Worker",
        "retryCount": 3,
        "retryLogic": "FIXED",
        "retryDelaySeconds": 10,
        "timeoutSeconds": 300,
        "timeoutPolicy": "TIME_OUT_WF",
        "responseTimeoutSeconds": 180,
        "ownerEmail": "example@example.com"
      }
    ]
    
  • Command example:
    $ curl -X 'POST' \
        'http://localhost:8080/api/metadata/taskdefs' \
        -H 'accept: */*' \
        -H 'Content-Type: application/json' \
        -d '[
        {
          "name": "simple_python_worker",
          "description": "Simple Python Worker",
          "retryCount": 3,
          "retryLogic": "FIXED",
          "retryDelaySeconds": 10,
          "timeoutSeconds": 300,
          "timeoutPolicy": "TIME_OUT_WF",
          "responseTimeoutSeconds": 180,
          "ownerEmail": "example@example.com"
        }
      ]'
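
The same request can be sent from Python. The following is a minimal sketch using only the standard library; the base URL is the one from the URL example above, and the helper names `build_taskdef_request` and `register_task_defs` are hypothetical, not part of the SDK.

```python
import json
import urllib.request

# Assumed server address, taken from the URL example above.
CONDUCTOR_API = 'http://localhost:8080/api'

TASK_DEFS = [
    {
        'name': 'simple_python_worker',
        'description': 'Simple Python Worker',
        'retryCount': 3,
        'retryLogic': 'FIXED',
        'retryDelaySeconds': 10,
        'timeoutSeconds': 300,
        'timeoutPolicy': 'TIME_OUT_WF',
        'responseTimeoutSeconds': 180,
        'ownerEmail': 'example@example.com',
    }
]


def build_taskdef_request(task_defs, base_url=CONDUCTOR_API):
    """Build (but do not send) the POST request for /metadata/taskdefs."""
    return urllib.request.Request(
        url=f'{base_url}/metadata/taskdefs',
        data=json.dumps(task_defs).encode('utf-8'),
        headers={'Content-Type': 'application/json', 'Accept': '*/*'},
        method='POST',
    )


def register_task_defs(task_defs, base_url=CONDUCTOR_API):
    """Send the request and return the HTTP status code."""
    request = build_taskdef_request(task_defs, base_url)
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `register_task_defs(TASK_DEFS)` requires a running Conductor server; building the request separately makes it possible to inspect the payload first.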
    

Create new workflow

You need to define a Workflow within Conductor that contains the Task you have just defined.

Make a POST request to the /metadata/workflow endpoint of your Conductor server.

  • URL example: http://localhost:8080/api/metadata/workflow
  • Workflow Definition example:
    {
      "createTime": 1634021619147,
      "updateTime": 1630694890267,
      "name": "simple_workflow_with_python_worker",
      "description": "Simple Workflow with Python Worker",
      "version": 1,
      "tasks": [
        {
          "name": "simple_python_worker",
          "taskReferenceName": "simple_python_worker_ref_1",
          "inputParameters": {},
          "type": "SIMPLE"
        }
      ],
      "inputParameters": [],
      "outputParameters": {
        "workerOutput": "${simple_python_worker_ref_1.output}"
      },
      "schemaVersion": 2,
      "restartable": true,
      "ownerEmail": "example@example.com",
      "timeoutPolicy": "ALERT_ONLY",
      "timeoutSeconds": 0
    }
    
  • Command example:
    $ curl -X 'POST' \
        'http://localhost:8080/api/metadata/workflow' \
        -H 'accept: */*' \
        -H 'Content-Type: application/json' \
        -d '{
        "createTime": 1634021619147,
        "updateTime": 1630694890267,
        "name": "simple_workflow_with_python_worker",
        "description": "Simple Workflow with Python Worker",
        "version": 1,
        "tasks": [
          {
            "name": "simple_python_worker",
            "taskReferenceName": "simple_python_worker_ref_1",
            "inputParameters": {},
            "type": "SIMPLE"
          }
        ],
        "inputParameters": [],
        "outputParameters": {
          "workerOutput": "${simple_python_worker_ref_1.output}"
        },
        "schemaVersion": 2,
        "restartable": true,
        "ownerEmail": "example@example.com",
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0
      }'
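
In the definition above, the `outputParameters` expression `${simple_python_worker_ref_1.output}` must use the same `taskReferenceName` that the task declares. As a rough sketch, a small check like the following can catch a mismatch before the definition is posted. The function name `undefined_task_references` is hypothetical, and the check only looks at `outputParameters`; it treats `workflow` as a built-in root, since Conductor expressions such as `${workflow.input...}` refer to the workflow itself.

```python
import re

WORKFLOW_DEF = {
    'name': 'simple_workflow_with_python_worker',
    'version': 1,
    'tasks': [
        {
            'name': 'simple_python_worker',
            'taskReferenceName': 'simple_python_worker_ref_1',
            'inputParameters': {},
            'type': 'SIMPLE',
        }
    ],
    'outputParameters': {
        'workerOutput': '${simple_python_worker_ref_1.output}',
    },
}

# Matches the name before the first dot inside ${...}
REFERENCE_PATTERN = re.compile(r'\$\{([^.}]+)\.')


def undefined_task_references(workflow_def):
    """Return referenced names that no task in the workflow declares."""
    defined = {task['taskReferenceName'] for task in workflow_def.get('tasks', [])}
    used = set()
    for expression in workflow_def.get('outputParameters', {}).values():
        used.update(REFERENCE_PATTERN.findall(str(expression)))
    # 'workflow' refers to the workflow itself, not to a task.
    return sorted(used - defined - {'workflow'})
```

An empty list means every reference in `outputParameters` points at a declared task.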
    

Start new workflow

Now that you have defined a Task and a Workflow within Conductor, you should be able to run it.

Make a POST request to the /workflow/{name} endpoint of your Conductor server.

  • URL example: http://localhost:8080/api/workflow/simple_workflow_with_python_worker
    • Priority should be empty
    • Request body should be empty, like: {}
  • Command example:
    $ curl -X 'POST' \
        'http://localhost:8080/api/workflow/simple_workflow_with_python_worker' \
        -H 'accept: text/plain' \
        -H 'Content-Type: application/json' \
        -d '{}'
    
  • Create many workflows in a loop:
    $ export CREATE_WORKFLOW_SHORTCUT="curl -X 'POST' \
        'http://localhost:8080/api/workflow/simple_workflow_with_python_worker' \
        -H 'accept: text/plain' \
        -H 'Content-Type: application/json' \
        -d '{}' \
        -s"
    
    $ for idx in {1..100}; do \
        echo "Creating workflow ${idx}"; \
        workflow_id=$(eval "${CREATE_WORKFLOW_SHORTCUT}"); \
        echo "workflow_id=${workflow_id}"; \
      done
    
    • Expected output example:
      Creating workflow 1
      workflow_id=6dd2c86b-5ce6-487a-9a65-632139da1345
      Creating workflow 2
      workflow_id=b0ddfdcf-0c4a-4fd3-892c-97fc38c46d63
      ...
      

You should receive a Workflow ID in the response body.

  • Workflow ID example: 8ff0bc06-4413-4c94-b27a-b3210412a914
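
The shell loop above can also be written in Python. This is a minimal sketch using only the standard library, assuming the same server address and workflow name as the curl commands; the helper names (`build_start_request`, `send`, `start_workflows`) are hypothetical. The sender is passed in as a parameter so the loop can be exercised without a server.

```python
import urllib.request

# Assumed values, taken from the curl commands above.
CONDUCTOR_API = 'http://localhost:8080/api'
WORKFLOW_NAME = 'simple_workflow_with_python_worker'


def build_start_request(workflow_name=WORKFLOW_NAME, base_url=CONDUCTOR_API):
    """Build the POST request that starts one workflow with an empty input."""
    return urllib.request.Request(
        url=f'{base_url}/workflow/{workflow_name}',
        data=b'{}',
        headers={'Content-Type': 'application/json', 'Accept': 'text/plain'},
        method='POST',
    )


def send(request):
    """Send the request and return the response body (the workflow ID)."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode('utf-8').strip()


def start_workflows(count, send_request=send):
    """Start `count` workflows and return the list of workflow IDs."""
    workflow_ids = []
    for idx in range(1, count + 1):
        print(f'Creating workflow {idx}')
        workflow_id = send_request(build_start_request())
        print(f'workflow_id={workflow_id}')
        workflow_ids.append(workflow_id)
    return workflow_ids
```

With a running server, `start_workflows(100)` mirrors the shell loop.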

See workflow execution

You should now be able to see its execution through the UI.

  • URL:
    • prefix: http://localhost:5001/execution
    • suffix: ${workflow_id}
  • Example: http://localhost:5001/execution/8ff0bc06-4413-4c94-b27a-b3210412a914
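
As a small sketch, the prefix and suffix above can be joined in Python. The function name `execution_url` is hypothetical, and the UUID check assumes workflow IDs always have the UUID form shown in this guide.

```python
import uuid

# Assumed UI address, taken from the prefix shown above.
UI_EXECUTION_PREFIX = 'http://localhost:5001/execution'


def execution_url(workflow_id: str, prefix: str = UI_EXECUTION_PREFIX) -> str:
    """Join the UI prefix and the workflow ID into the execution page URL."""
    workflow_id = workflow_id.strip()
    # Raises ValueError if the ID is not a well-formed UUID.
    uuid.UUID(workflow_id)
    return f'{prefix}/{workflow_id}'
```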

Unit Tests

Simple validation

/conductor-python/src$ python3 -m unittest -v
test_execute_task (tst.automator.test_task_runner.TestTaskRunner) ... ok
test_execute_task_with_faulty_execution_worker (tst.automator.test_task_runner.TestTaskRunner) ... ok
test_execute_task_with_invalid_task (tst.automator.test_task_runner.TestTaskRunner) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK
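
The command above discovers and runs `unittest.TestCase` classes. As a rough sketch of what such a test can look like for worker logic, without a running Conductor server, the example below checks a helper against a mock task result. `fill_output` is a hypothetical stand-in that mirrors the `__execute` helper from the custom worker example; it is not part of the SDK or of its test suite.

```python
import unittest
from unittest.mock import Mock


def fill_output(task_result):
    """Hypothetical stand-in mirroring the worker's __execute helper."""
    task_result.add_output_data('key', 'value')


class TestFillOutput(unittest.TestCase):
    def test_adds_expected_output(self):
        # A Mock records calls, so no real TaskResult is needed.
        task_result = Mock()
        fill_output(task_result)
        task_result.add_output_data.assert_called_once_with('key', 'value')
```

Saved in a file whose name matches `test*.py` inside a package, a class like this would be picked up by `python3 -m unittest -v`.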

Run with code coverage

/conductor-python/src$ python3 -m coverage run --source=conductor/ -m unittest

Report:

/conductor-python/src$ python3 -m coverage report

Visual coverage results:

/conductor-python/src$ python3 -m coverage html
