A gRPC library for Netflix Conductor

pyconductor-grpc

Python gRPC client for Netflix's Conductor

Installation

pip install pyconductor_grpc

Usage

Example task definition JSON:

[
  {
    "name": "sleep",
    "retryCount": 3,
    "timeoutSeconds": 20,
    "pollTimeoutSeconds": 36,
    "inputKeys": ["duration", "rem"],
    "outputKeys": ["rem"],
    "timeoutPolicy": "TIME_OUT_WF",
    "retryLogic": "FIXED",
    "retryDelaySeconds": 6,
    "responseTimeoutSeconds": 19,
    "concurrentExecLimit": null,
    "rateLimitFrequencyInSeconds": null,
    "rateLimitPerFrequency": null,
    "ownerEmail": "example@test.com"
  }
]
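
The timing fields in a task definition constrain each other. The sketch below is a local pre-upload check, assuming Conductor rejects a task definition whose responseTimeoutSeconds is not lower than its timeoutSeconds when a timeout is set. check_task_def is a hypothetical helper for illustration and is not part of pyconductor_grpc.

```python
import json

# Trimmed copy of the example task definition above
TASK_DEFS_JSON = """
[
  {
    "name": "sleep",
    "retryCount": 3,
    "timeoutSeconds": 20,
    "responseTimeoutSeconds": 19,
    "inputKeys": ["duration", "rem"],
    "outputKeys": ["rem"]
  }
]
"""


def check_task_def(task_def: dict) -> list:
    """Return a list of human-readable problems found in one task definition."""
    problems = []
    if not task_def.get("name"):
        problems.append("missing name")
    timeout = task_def.get("timeoutSeconds", 0)
    response_timeout = task_def.get("responseTimeoutSeconds", 0)
    # Assumed rule: when an overall timeout is set, the response timeout must be shorter
    if timeout > 0 and response_timeout >= timeout:
        problems.append(
            f"responseTimeoutSeconds ({response_timeout}) must be less than "
            f"timeoutSeconds ({timeout})"
        )
    return problems


task_defs = json.loads(TASK_DEFS_JSON)
report = {task_def["name"]: check_task_def(task_def) for task_def in task_defs}
print(report)  # {'sleep': []}
```

An empty list means the definition passed these checks; it does not guarantee that the server will accept it.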

Example workflow definition JSON:

{
  "name": "my_workflow",
  "description": "My Workflow",
  "version": 3,
  "tasks": [
    {
      "name": "sleep",
      "taskReferenceName": "heavySleeper",
      "type": "SIMPLE",
      "inputParameters": {
        "duration": 10.0,
        "rem": true
      }
    }
  ],
  "outputParameters": {
    "rem": "${heavySleeper.output.rem}"
  },
  "ownerEmail": "example@test.com"
}
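
The first segment of a ${...} expression names a task by its taskReferenceName, so a typo there leaves the output parameter unresolved. A minimal sketch of a local check for that follows. unknown_references is a hypothetical helper, and it only inspects top-level string values in outputParameters.

```python
import re

# Trimmed workflow definition, shaped like the example above
workflow_def = {
    "name": "my_workflow",
    "tasks": [
        {"name": "sleep", "taskReferenceName": "heavySleeper", "type": "SIMPLE"}
    ],
    "outputParameters": {"rem": "${heavySleeper.output.rem}"},
}

# Captures the first dotted segment of every ${...} expression
REF_PATTERN = re.compile(r"\$\{([^.}]+)\.")


def unknown_references(workflow_def: dict) -> set:
    """Return reference names used in outputParameters that match no task."""
    known = {task["taskReferenceName"] for task in workflow_def["tasks"]}
    known.add("workflow")  # ${workflow.input...} refers to the workflow itself
    used = set()
    for value in workflow_def.get("outputParameters", {}).values():
        if isinstance(value, str):
            used.update(REF_PATTERN.findall(value))
    return used - known


print(unknown_references(workflow_def))  # set()
```

If the task were registered under a different reference name, for example "heavy-sleeper", the function would return {'heavySleeper'}.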

Upload the definitions:

import requests

base_url = 'http://localhost/api'
headers = {'content-type': 'application/json'}

# Register the task definitions (tasks.json holds the JSON array shown above)
with open('tasks.json', 'rb') as payload:
    r = requests.post(f'{base_url}/metadata/taskdefs', data=payload, headers=headers)
print(r.status_code, r.text)

# Register the workflow definition
with open('workflow.json', 'rb') as payload:
    r = requests.post(f'{base_url}/metadata/workflow', data=payload, headers=headers)
print(r.status_code, r.text)

TaskWorker usage:

import time
from pyconductor_grpc.grpc_client import TaskService, TaskWorker


# Define task client.
# TaskService accepts a connection URI or a custom channel object:
#   connection_uri: URI of the Conductor API
#   channel: gRPC channel
# If a URI is specified, a channel is created automatically.
# If a channel object is specified, the URI is ignored.
# The default is to connect to localhost:8090.
task_service = TaskService()


# Define work function. Input data dict keys come pre-expanded as function parameters.
def sleep_work_function(duration: float, rem: bool) -> dict:
    time.sleep(duration)
    output_data = {'rem': rem}
    return output_data


# Work. This worker will spawn up to 10 work threads
# Each task is passed to the work function individually.
with TaskWorker(
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()
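
A work function can be tried locally before it is connected to a worker. The sketch below assumes the worker calls the function roughly as work_function(**input_data), which is an inference from the "pre-expanded" comment above rather than a confirmed detail of the implementation. The input values are made up for illustration.

```python
import time


def sleep_work_function(duration: float, rem: bool) -> dict:
    time.sleep(duration)
    return {'rem': rem}


# Hypothetical input data, shaped like the inputParameters of the example workflow
input_data = {'duration': 0.01, 'rem': True}

# Keyword expansion: each key of the input dict becomes a named argument
output_data = sleep_work_function(**input_data)
print(output_data)  # {'rem': True}
```

Under that assumption, an input key with no matching parameter would raise a TypeError, so the function signature should cover every key listed in the task's inputKeys.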

BatchTaskWorker usage:

import time
import typing
from pyconductor_grpc.grpc_client import BatchTaskWorker, ConductorTask, ConductorTaskResult, TaskService


# Define task client
task_service = TaskService()


# Define work function. Inputs are all tasks in a batch. Outputs are all task results.
def sleep_work_function(tasks: typing.Sequence[ConductorTask]) -> typing.Sequence[ConductorTaskResult]:
    # intentionally sleeping many times in series just to demonstrate batch worker
    task_results = []
    for task in tasks:
        # each task dict obeys JSON equivalent of schema conductor.proto.Task (same keys but in camel case)
        input_data = task['inputData']
        time.sleep(input_data['duration'])
        output_data = {'rem': input_data['rem']}  # this must be a dictionary
        task_result = ConductorTaskResult(
            taskId=task['taskId'],
            workflowInstanceId=task['workflowInstanceId'],
            outputData=output_data,
            status='COMPLETED',
        )
        # task_result = ConductorTaskResult(
        #     taskId=task['taskId'],
        #     workflowInstanceId=task['workflowInstanceId'],
        #     status='FAILED',
        #     reasonForIncompletion='this is why it failed',
        # )
        task_results.append(task_result)
    return task_results


# Work. This worker will work on up to 10 batches in parallel, with each batch containing up to 1000 tasks.
# Each batch of 1000 will be passed to the work function.
# Within each batch, the worker will make up to 20 concurrent requests to the Conductor API.
with BatchTaskWorker(
    batch_size=1000,
    batch_threads=20,
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()
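
Because a batch work function returns one result per task, a single bad task does not have to fail the whole batch. The sketch below is a local dry run of that idea using plain dicts as stand-ins for ConductorTask and ConductorTaskResult. The stand-in types, the function name, and the fake batch are assumptions for illustration, and the sleep is omitted to keep the run instant.

```python
import typing

# Stand-ins for this sketch only; the real ConductorTask and ConductorTaskResult
# types come from pyconductor_grpc.grpc_client
Task = typing.Dict[str, typing.Any]
TaskResult = typing.Dict[str, typing.Any]


def safe_batch_work_function(tasks: typing.Sequence[Task]) -> typing.List[TaskResult]:
    results = []
    for task in tasks:
        base = {
            'taskId': task['taskId'],
            'workflowInstanceId': task['workflowInstanceId'],
        }
        try:
            input_data = task['inputData']
            output_data = {'rem': input_data['rem']}
            results.append({**base, 'outputData': output_data, 'status': 'COMPLETED'})
        except Exception as exc:
            # Report this task as failed and keep processing the rest of the batch
            results.append({**base, 'status': 'FAILED', 'reasonForIncompletion': repr(exc)})
    return results


fake_batch = [
    {'taskId': 't1', 'workflowInstanceId': 'w1', 'inputData': {'duration': 0, 'rem': True}},
    {'taskId': 't2', 'workflowInstanceId': 'w1', 'inputData': {'duration': 0}},  # no 'rem' key
]
results = safe_batch_work_function(fake_batch)
print([result['status'] for result in results])  # ['COMPLETED', 'FAILED']
```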

Kick off a workflow instance and monitor for a done state:

import time
from pyconductor_grpc.grpc_client import WorkflowService


workflow_service = WorkflowService()  # same inputs as TaskService
workflow_id = workflow_service.start_workflow('annotation_workflow', input={'variant': {'start': '1', 'end': '3', 'chrom': '5'}})
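# Poll until the workflow reaches one of Conductor's terminal states
terminal_states = {'COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED'}
status = None
while status not in terminal_states:
    time.sleep(1)
    workflow_output = workflow_service.get_workflow_status(workflow_id)
    status = workflow_output.get('status')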

successful = status == 'COMPLETED'
output = workflow_output['output']
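
A polling loop with no upper bound can hang if a workflow never finishes. The sketch below shows one way to bound the wait. wait_for_workflow is a hypothetical helper, not part of pyconductor_grpc. It takes the status lookup as a callable so it can be exercised without a running Conductor server, and it assumes the lookup returns a dict with a 'status' key, as in the example above.

```python
import time
import typing

# Terminal workflow states in Conductor; RUNNING and PAUSED are not terminal
TERMINAL_STATES = frozenset({'COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED'})


def wait_for_workflow(
    get_status: typing.Callable[[], dict],
    timeout_seconds: float = 60.0,
    poll_interval_seconds: float = 1.0,
) -> dict:
    """Poll get_status until a terminal state is seen or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        workflow_output = get_status()
        if workflow_output.get('status') in TERMINAL_STATES:
            return workflow_output
        if time.monotonic() >= deadline:
            raise TimeoutError('workflow did not reach a terminal state in time')
        time.sleep(poll_interval_seconds)


# Dry run against a fake status source instead of a live server
fake_responses = iter([
    {'status': 'RUNNING'},
    {'status': 'RUNNING'},
    {'status': 'COMPLETED', 'output': {'rem': True}},
])
final = wait_for_workflow(lambda: next(fake_responses), timeout_seconds=5, poll_interval_seconds=0.01)
print(final['status'], final['output'])  # COMPLETED {'rem': True}
```

With the real client, the callable would be lambda: workflow_service.get_workflow_status(workflow_id).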

Code generation

To rebuild the proto files and the autogenerated Python gRPC SDK, run:

scripts/generate-code.sh

Notes

The generated Python message classes check field types when an object is constructed. For example:

>>> from model.task_result_pb2 import TaskResult
>>> TaskResult(task_id=1)
TypeError: Cannot set conductor.proto.TaskResult.task_id to 1: 1 has type <class 'int'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field TaskResult.task_id
>>> TaskResult(task_id='1')
>>>

Publishing

To build and upload:

pip install --upgrade build twine
python -m build
twine upload dist/*

or

pipx install build
pipx install twine
pyproject-build
twine upload dist/*
