A gRPC Library for Netflix Conductor
pyconductor-grpc
Python gRPC client for Netflix's Conductor
Installation
pip install pyconductor_grpc
Usage
Example task definition JSON:
[
    {
        "name": "sleep",
        "retryCount": 3,
        "timeoutSeconds": 20,
        "pollTimeoutSeconds": 36,
        "inputKeys": ["duration", "rem"],
        "outputKeys": ["rem"],
        "timeoutPolicy": "TIME_OUT_WF",
        "retryLogic": "FIXED",
        "retryDelaySeconds": 6,
        "responseTimeoutSeconds": 19,
        "concurrentExecLimit": null,
        "rateLimitFrequencyInSeconds": null,
        "rateLimitPerFrequency": null,
        "ownerEmail": "example@test.com"
    }
]
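As a rough sanity check on the timeout and retry settings above, the worst-case wall-clock time for the task can be estimated. This sketch assumes retryCount counts retries after the first attempt and that FIXED retry logic waits retryDelaySeconds between attempts:

```python
# Rough worst-case wall-clock bound for the "sleep" task definition above.
# Assumption: retryCount counts retries after the first attempt, and FIXED
# retry logic waits retryDelaySeconds between attempts.
def worst_case_seconds(retry_count: int, timeout_seconds: int, retry_delay_seconds: int) -> int:
    attempts = retry_count + 1  # the initial attempt plus each retry
    return attempts * timeout_seconds + retry_count * retry_delay_seconds

print(worst_case_seconds(3, 20, 6))  # 98
```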
Example workflow definition JSON:
{
    "name": "my_workflow",
    "description": "My Workflow",
    "version": 3,
    "tasks": [
        {
            "name": "sleep",
            "taskReferenceName": "heavySleeper",
            "type": "SIMPLE",
            "inputParameters": {
                "duration": 10.0,
                "rem": true
            }
        }
    ],
    "outputParameters": {
        "rem": "${heavySleeper.output.rem}"
    },
    "ownerEmail": "example@test.com"
}
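The outputParameters expression pulls a key out of a task's output by task reference name. A minimal sketch of how such an expression can be resolved against collected task outputs (Conductor's real expression engine is far richer than this regex):

```python
import re

# Hedged sketch of resolving an expression like "${heavySleeper.output.rem}"
# against a dict of task outputs keyed by task reference name. This is an
# illustration only, not Conductor's actual resolution logic.
def resolve(expression: str, task_outputs: dict) -> object:
    match = re.fullmatch(r"\$\{(\w+)\.output\.(\w+)\}", expression)
    if match is None:
        return expression  # not a recognised expression; pass through as a literal
    task_ref, key = match.groups()
    return task_outputs[task_ref]["output"][key]

outputs = {"heavySleeper": {"output": {"rem": True}}}
print(resolve("${heavySleeper.output.rem}", outputs))  # True
```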
Upload the definitions:
import requests

base_url = 'http://localhost/api'
headers = {'content-type': 'application/json'}

with open('tasks.json') as payload:
    r = requests.post(f'{base_url}/metadata/taskdefs', data=payload, headers=headers)
print(r.status_code, r.text)

with open('workflow.json') as payload:
    r = requests.post(f'{base_url}/metadata/workflow', data=payload, headers=headers)
print(r.status_code, r.text)
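Before uploading, it can be worth checking locally that the definition files parse as JSON, so a syntax error surfaces immediately instead of as an HTTP error from the server:

```python
import json

# Local sanity check: does a definition string parse as JSON?
def validate_json(text: str) -> bool:
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False

print(validate_json('[{"name": "sleep"}]'))   # True
print(validate_json('[{"name": "sleep",}]'))  # False (trailing comma)
```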
TaskWorker usage:
import time

from pyconductor_grpc.grpc_client import TaskService, TaskWorker

# Create the task client.
# TaskService accepts either a connection URI or a custom channel object.
# If a URI is given, a channel is created automatically; if a channel object
# is given, the URI is ignored. The default is to connect to localhost:8090.
#   :param connection_uri: URI of Conductor API
#   :param channel: gRPC channel
task_service = TaskService()

# Define the work function. Input-data dict keys are expanded into function parameters.
def sleep_work_function(duration: float, rem: bool) -> dict:
    time.sleep(duration)
    return {'rem': rem}

# Work. This worker spawns up to 10 work threads;
# each task is passed to the work function individually.
with TaskWorker(
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()
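Because the work function is plain Python, it can be unit-tested without a worker or a running Conductor server:

```python
import time

# The same work function as above, exercised directly.
def sleep_work_function(duration: float, rem: bool) -> dict:
    time.sleep(duration)
    return {'rem': rem}

# Calling it with a zero duration makes the test instantaneous.
print(sleep_work_function(0, True))  # {'rem': True}
```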
BatchTaskWorker usage:
import time
import typing

from pyconductor_grpc.grpc_client import BatchTaskWorker, ConductorTask, ConductorTaskResult, TaskService

# Create the task client.
task_service = TaskService()

# Define the work function. Input is a batch of tasks; output is the batch of task results.
def sleep_work_function(tasks: typing.Sequence[ConductorTask]) -> typing.Sequence[ConductorTaskResult]:
    # Sleeping many times in series is intentional here, just to demonstrate the batch worker.
    task_results = []
    for task in tasks:
        # Each task dict follows the JSON equivalent of the conductor.proto.Task
        # schema (same keys, but in camel case).
        input_data = task['inputData']
        time.sleep(input_data['duration'])
        output_data = {'rem': input_data['rem']}  # this must be a dictionary
        task_result = ConductorTaskResult(
            taskId=task['taskId'],
            workflowInstanceId=task['workflowInstanceId'],
            outputData=output_data,
            status='COMPLETED',
        )
        # A failed task would be reported like this instead:
        # task_result = ConductorTaskResult(
        #     taskId=task['taskId'],
        #     workflowInstanceId=task['workflowInstanceId'],
        #     status='FAILED',
        #     reasonForIncompletion='this is why it failed',
        # )
        task_results.append(task_result)
    return task_results

# Work. This worker works on up to 10 batches in parallel, with each batch
# containing up to 1000 tasks. Each batch is passed to the work function.
# Within each batch, the worker makes up to 20 concurrent requests to the Conductor API.
with BatchTaskWorker(
    batch_size=1000,
    batch_threads=20,
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()
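The batch work function's logic can likewise be exercised without the library by substituting plain dicts for ConductorTask and ConductorTaskResult (an assumption made purely for illustration; the real worker passes the library's own types):

```python
import time

# Same logic as the batch work function above, but with plain dicts standing
# in for ConductorTask/ConductorTaskResult so it runs standalone.
def sleep_work_function(tasks):
    task_results = []
    for task in tasks:
        input_data = task['inputData']
        time.sleep(input_data['duration'])
        task_results.append({
            'taskId': task['taskId'],
            'workflowInstanceId': task['workflowInstanceId'],
            'outputData': {'rem': input_data['rem']},
            'status': 'COMPLETED',
        })
    return task_results

# Zero-duration fake tasks keep the demonstration instantaneous.
fake_tasks = [
    {'taskId': 't1', 'workflowInstanceId': 'w1', 'inputData': {'duration': 0, 'rem': True}},
    {'taskId': 't2', 'workflowInstanceId': 'w1', 'inputData': {'duration': 0, 'rem': False}},
]
results = sleep_work_function(fake_tasks)
print([r['status'] for r in results])  # ['COMPLETED', 'COMPLETED']
```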
Kick off a workflow instance and poll until it reaches a terminal state:
import time

from pyconductor_grpc.grpc_client import WorkflowService

workflow_service = WorkflowService()  # same constructor arguments as TaskService

workflow_id = workflow_service.start_workflow('annotation_workflow', input={'variant': {'start': '1', 'end': '3', 'chrom': '5'}})

status = None
terminal_statuses = {'COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED'}
while status not in terminal_statuses:
    time.sleep(1)
    workflow_output = workflow_service.get_workflow_status(workflow_id)
    status = workflow_output.get('status')
successful = status == 'COMPLETED'
output = workflow_output['output']
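Production code usually wants a timeout around this kind of polling so a stuck workflow cannot block the caller forever. A sketch, using a hypothetical stub in place of WorkflowService (the stub and its response shape are assumptions for illustration):

```python
import itertools
import time

TERMINAL_STATUSES = {'COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED'}

# Hypothetical stand-in for WorkflowService: reports RUNNING twice,
# then COMPLETED on every subsequent poll.
class StubWorkflowService:
    def __init__(self):
        self._statuses = itertools.chain(['RUNNING', 'RUNNING'], itertools.repeat('COMPLETED'))

    def get_workflow_status(self, workflow_id):
        return {'status': next(self._statuses), 'output': {'rem': True}}

def wait_for_workflow(service, workflow_id, timeout=30.0, interval=0.01):
    """Poll until the workflow reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        workflow = service.get_workflow_status(workflow_id)
        if workflow['status'] in TERMINAL_STATUSES:
            return workflow
        time.sleep(interval)
    raise TimeoutError(f'workflow {workflow_id} not finished after {timeout}s')

result = wait_for_workflow(StubWorkflowService(), 'abc123')
print(result['status'])  # COMPLETED
```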
Code generation
To rebuild the proto files and the autogenerated Python gRPC SDK:
scripts/generate-code.sh
Notes
The generated Python message types validate field types at construction time. For example:
>>> from model.task_result_pb2 import TaskResult
>>> TaskResult(task_id=1)
TypeError: Cannot set conductor.proto.TaskResult.task_id to 1: 1 has type <class 'int'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field TaskResult.task_id
>>> TaskResult(task_id='1')
>>>
Publishing
To build and upload:
pip install --upgrade build twine
python -m build
twine upload dist/*
or, using pipx:
pipx install build
pipx install twine
pyproject-build
twine upload dist/*