A gRPC Library for Netflix Conductor

Project description

pyconductor-grpc

Python gRPC client for Netflix's Conductor

Installation

pip install pyconductor_grpc

Usage

Example task definition JSON:

[
  {
    "name": "sleep",
    "retryCount": 3,
    "timeoutSeconds": 20,
    "pollTimeoutSeconds": 36,
    "inputKeys": ["duration", "rem"],
    "outputKeys": ["rem"],
    "timeoutPolicy": "TIME_OUT_WF",
    "retryLogic": "FIXED",
    "retryDelaySeconds": 6,
    "responseTimeoutSeconds": 19,
    "concurrentExecLimit": null,
    "rateLimitFrequencyInSeconds": null,
    "rateLimitPerFrequency": null,
    "ownerEmail": "example@test.com"
  }
]

Example workflow definition JSON:

{
  "name": "my_workflow",
  "description": "My Workflow",
  "version": 3,
  "tasks": [
    {
      "name": "sleep",
      "taskReferenceName": "heavy-sleeper",
      "type": "SIMPLE",
      "inputParameters": {
        "duration": 10.0,
        "rem": true
      }
    }
  ],
  "outputParameters": {
    "rem": "${heavySleeper.output.rem}"
  },
  "ownerEmail": "example@test.com"
}
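The `${...}` expressions in outputParameters must reference a task's taskReferenceName. A quick standalone check (this validation helper is our own illustration, not part of the library):

```python
import re

# Condensed version of the workflow definition above.
workflow = {
    'tasks': [{'taskReferenceName': 'heavySleeper'}],
    'outputParameters': {'rem': '${heavySleeper.output.rem}'},
}

ref_names = {t['taskReferenceName'] for t in workflow['tasks']}
for key, expr in workflow['outputParameters'].items():
    # Extract the leading name from expressions like ${name.output.field}.
    ref = re.match(r'\$\{([^.}]+)\.', expr)
    if ref and ref.group(1) not in ref_names:
        print(f'outputParameters[{key!r}] references unknown task {ref.group(1)!r}')
```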

Upload the definitions:

import requests

base_url = 'http://localhost/api'
headers = {'content-type': 'application/json'}

with open('tasks.json') as payload:
    r = requests.post(f'{base_url}/metadata/taskdefs', data=payload, headers=headers)
print(r.status_code, r.text)

with open('workflow.json') as payload:
    r = requests.post(f'{base_url}/metadata/workflow', data=payload, headers=headers)
print(r.status_code, r.text)

TaskWorker usage:

import time
from pyconductor_grpc.grpc_client import TaskService, TaskWorker


# Define task client
task_service = TaskService()
"""
Specify a connection URI or a custom channel object.
If a URI is specified, a channel will automatically be created.
If a channel object is specified, the URI will be ignored.
Default is to connect to localhost:8090

:param connection_uri: URI of Conductor API
:param channel: gRPC channel
"""


# Define work function. Input data dict keys come pre-expanded as function parameters.
def sleep_work_function(duration: float, rem: bool) -> dict:
    time.sleep(duration)
    output_data = {'rem': rem}
    return output_data
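Because the work function is a plain callable, it can be exercised directly, with no worker or Conductor connection. A standalone check (the function is repeated here so the snippet runs on its own):

```python
import time


def sleep_work_function(duration: float, rem: bool) -> dict:
    time.sleep(duration)
    return {'rem': rem}


# Quick local sanity check with a tiny duration.
result = sleep_work_function(0.01, True)
# → {'rem': True}
```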


# Work. This worker will spawn up to 10 work threads.
# Each task is passed to the work function individually.
with TaskWorker(
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()

BatchTaskWorker usage:

import time
import typing
from pyconductor_grpc.grpc_client import BatchTaskWorker, ConductorTask, ConductorTaskResult, TaskService


# Define task client
task_service = TaskService()


# Define work function. Inputs are all tasks in a batch. Outputs are all task results.
def sleep_work_function(tasks: typing.Sequence[ConductorTask]) -> typing.Sequence[ConductorTaskResult]:
    # intentionally sleeping many times in series just to demonstrate batch worker
    task_results = []
    for task in tasks:
        # each task dict obeys JSON equivalent of schema conductor.proto.Task (same keys but in camel case)
        input_data = task['inputData']
        time.sleep(input_data['duration'])
        output_data = {'rem': input_data['rem']}  # this must be a dictionary
        task_result = ConductorTaskResult(
            taskId=task['taskId'],
            workflowInstanceId=task['workflowInstanceId'],
            outputData=output_data,
            status='COMPLETED',
        )
        # task_result = ConductorTaskResult(
        #     taskId=task['taskId'],
        #     workflowInstanceId=task['workflowInstanceId'],
        #     status='FAILED',
        #     reasonForIncompletion='this is why it failed',
        # )
        task_results.append(task_result)
    return task_results
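Since each ConductorTask is accessed like a dictionary, the batch function can also be exercised locally with plain dicts standing in for ConductorTask and ConductorTaskResult (a standalone sketch of the function above; the stand-in dicts are our own assumption):

```python
import time
import typing


def sleep_work_function(tasks: typing.Sequence[dict]) -> typing.Sequence[dict]:
    # Plain-dict version of the batch work function above, runnable without the library.
    task_results = []
    for task in tasks:
        input_data = task['inputData']
        time.sleep(input_data['duration'])
        task_results.append({
            'taskId': task['taskId'],
            'workflowInstanceId': task['workflowInstanceId'],
            'outputData': {'rem': input_data['rem']},
            'status': 'COMPLETED',
        })
    return task_results


batch = [
    {'taskId': 't1', 'workflowInstanceId': 'w1', 'inputData': {'duration': 0, 'rem': True}},
    {'taskId': 't2', 'workflowInstanceId': 'w1', 'inputData': {'duration': 0, 'rem': False}},
]
results = sleep_work_function(batch)
# → two COMPLETED results, in the same order as the input tasks
```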


# Work. This worker will work on up to 10 batches in parallel, with each batch containing up to 1000 tasks.
# Each batch of 1000 will be passed to the work function.
# Within each batch, the worker will make up to 20 concurrent requests to the Conductor API.
with BatchTaskWorker(
    batch_size=1000,
    batch_threads=20,
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()

Kick off a workflow instance and poll until it reaches a terminal state:

import time
from pyconductor_grpc.grpc_client import WorkflowService


workflow_service = WorkflowService()  # same inputs as TaskService
workflow_id = workflow_service.start_workflow('annotation_workflow', input={'variant': {'start': '1', 'end': '3', 'chrom': '5'}})
status = None
while status not in {'COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'}:
    time.sleep(1)
    workflow_output = workflow_service.get_workflow_status(workflow_id)
    status = workflow_output.get('status')

successful = status == 'COMPLETED'
output = workflow_output['output']
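In practice the poll loop should also be bounded by a deadline. A minimal helper sketching this, assuming the same `get_workflow_status` interface as above (the helper itself is our own, not part of the library):

```python
import time

# Conductor's terminal workflow statuses.
TERMINAL_STATUSES = {'COMPLETED', 'FAILED', 'TERMINATED', 'TIMED_OUT'}


def wait_for_workflow(workflow_service, workflow_id, poll_interval=1.0, timeout=300.0):
    """Poll until the workflow reaches a terminal status or the deadline passes."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        workflow_output = workflow_service.get_workflow_status(workflow_id)
        if workflow_output.get('status') in TERMINAL_STATUSES:
            return workflow_output
        time.sleep(poll_interval)
    raise TimeoutError(f'workflow {workflow_id} did not finish within {timeout}s')
```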

Code generation

To rebuild the proto files and the autogenerated Python gRPC SDK, run:

scripts/generate-code.sh

Notes

The generated Python message types validate field types at construction time. For example:

>>> from model.task_result_pb2 import TaskResult
>>> TaskResult(task_id=1)
TypeError: Cannot set conductor.proto.TaskResult.task_id to 1: 1 has type <class 'int'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field TaskResult.task_id
>>> TaskResult(task_id='1')
>>>

Publishing

To build and upload:

pip install --upgrade build twine
python -m build
twine upload dist/*

or

pipx install build
pipx install twine
pyproject-build
twine upload dist/*

Download files

Download the file for your platform.

Source Distribution

pyconductor_grpc-0.2.0.tar.gz (51.5 kB)

Uploaded Source

Built Distribution

pyconductor_grpc-0.2.0-py3-none-any.whl (78.0 kB)

Uploaded Python 3

File details

Details for the file pyconductor_grpc-0.2.0.tar.gz.

File metadata

  • Download URL: pyconductor_grpc-0.2.0.tar.gz
  • Upload date:
  • Size: 51.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/54.1.1 requests-toolbelt/0.9.1 tqdm/4.56.2 CPython/3.9.1

File hashes

Hashes for pyconductor_grpc-0.2.0.tar.gz

  • SHA256: 9eb849e82e84f788f95d466a504df9c396fce1f1c16ed5a59b2166994c689167
  • MD5: 978322625991311059ba075b2509235a
  • BLAKE2b-256: a211c160de46562745c826f5b1044f450e029f7108a9d68b2707819515c9a6a6


File details

Details for the file pyconductor_grpc-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pyconductor_grpc-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 78.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.3.0 pkginfo/1.7.0 requests/2.25.1 setuptools/54.1.1 requests-toolbelt/0.9.1 tqdm/4.56.2 CPython/3.9.1

File hashes

Hashes for pyconductor_grpc-0.2.0-py3-none-any.whl

  • SHA256: c07b6f21f92072b6eaee968786ffafdaa4aa19edc938c39edb3880e4e5ea0342
  • MD5: 458134b8726b50d73ab7b8208c2cf6f7
  • BLAKE2b-256: 6a329f9dc95ec8b725e81088f57061b967623d8886a08998df8802517cadbabb

