A gRPC Library for Netflix Conductor
pyconductor-grpc
Python gRPC client for Netflix's Conductor
Installation
pip install pyconductor_grpc
Usage
Example task definition JSON:
[
  {
    "name": "sleep",
    "retryCount": 3,
    "timeoutSeconds": 20,
    "pollTimeoutSeconds": 36,
    "inputKeys": ["duration", "rem"],
    "outputKeys": ["rem"],
    "timeoutPolicy": "TIME_OUT_WF",
    "retryLogic": "FIXED",
    "retryDelaySeconds": 6,
    "responseTimeoutSeconds": 19,
    "concurrentExecLimit": null,
    "rateLimitFrequencyInSeconds": null,
    "rateLimitPerFrequency": null,
    "ownerEmail": "example@test.com"
  }
]
Example workflow definition JSON:
{
  "name": "my_workflow",
  "description": "My Workflow",
  "version": 3,
  "tasks": [
    {
      "name": "sleep",
      "taskReferenceName": "heavySleeper",
      "type": "SIMPLE",
      "inputParameters": {
        "duration": 10.0,
        "rem": true
      }
    }
  ],
  "outputParameters": {
    "rem": "${heavySleeper.output.rem}"
  },
  "ownerEmail": "example@test.com"
}
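The `${...}` expression in `outputParameters` has to name a task by its `taskReferenceName`, so a mismatch between the two is easy to introduce. The following is a minimal sketch of a pre-upload check, assuming output parameters are flat string values; `find_unknown_references` is a hypothetical helper and not part of pyconductor_grpc or Conductor.

```python
import re

# Matches expressions such as ${someRef.output.key} and captures
# the reference name before the first dot.
REF_PATTERN = re.compile(r"\$\{([^.}]+)\.")


def find_unknown_references(workflow: dict) -> set:
    """Return reference names used in outputParameters that no task declares."""
    declared = {task["taskReferenceName"] for task in workflow.get("tasks", [])}
    declared.add("workflow")  # ${workflow.input...} refers to the workflow itself
    used = set()
    for value in workflow.get("outputParameters", {}).values():
        if isinstance(value, str):  # nested values are ignored in this sketch
            used.update(REF_PATTERN.findall(value))
    return used - declared


consistent = {
    "tasks": [{"name": "sleep", "taskReferenceName": "heavySleeper"}],
    "outputParameters": {"rem": "${heavySleeper.output.rem}"},
}
mismatched = {
    "tasks": [{"name": "sleep", "taskReferenceName": "heavy-sleeper"}],
    "outputParameters": {"rem": "${heavySleeper.output.rem}"},
}
print(find_unknown_references(consistent))  # set()
print(find_unknown_references(mismatched))  # {'heavySleeper'}
```

To check a definition file, load it with `json.load` and pass the resulting dict to the helper.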
Upload the definitions:
import requests

base_url = 'http://localhost/api'
headers = {'content-type': 'application/json'}

# Upload the task definitions; the file is closed once the request is sent
with open('tasks.json', 'rb') as payload:
    r = requests.post(f'{base_url}/metadata/taskdefs', data=payload, headers=headers)
print(r.status_code, r.text)

# Upload the workflow definition
with open('workflow.json', 'rb') as payload:
    r = requests.post(f'{base_url}/metadata/workflow', data=payload, headers=headers)
print(r.status_code, r.text)
TaskWorker usage:
import time

from pyconductor_grpc.grpc_client import TaskService, TaskWorker

# Define task client.
# Specify a connection URI or a custom channel object.
# If a URI is specified, a channel will automatically be created.
# If a channel object is specified, the URI will be ignored.
# Default is to connect to localhost:8090
#   :param connection_uri: URI of Conductor API
#   :param channel: gRPC channel
task_service = TaskService()

# Define work function. Input data dict keys come pre-expanded as function parameters.
def sleep_work_function(duration: float, rem: bool) -> dict:
    time.sleep(duration)
    output_data = {'rem': rem}
    return output_data

# Work. This worker will spawn up to 10 work threads.
# Each task is passed to the work function individually.
with TaskWorker(
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()
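The statement that input data keys arrive "pre-expanded as function parameters" corresponds to ordinary Python keyword-argument unpacking. The sketch below illustrates that calling convention only; it is not the library's internal code, and `call_work_function` and `echo_rem` are hypothetical names.

```python
def call_work_function(work_function, input_data: dict) -> dict:
    """Call a work function with the input data keys expanded into keyword arguments."""
    return work_function(**input_data)


def echo_rem(duration: float, rem: bool) -> dict:
    # Stand-in for sleep_work_function, without the actual sleep
    return {'rem': rem}


result = call_work_function(echo_rem, {'duration': 10.0, 'rem': True})
print(result)  # {'rem': True}
```

In this sketch, an input dict with a missing or unexpected key raises `TypeError`, so the work function's parameter names should match the task's `inputKeys`.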
BatchTaskWorker usage:
import time
import typing

from pyconductor_grpc.grpc_client import BatchTaskWorker, ConductorTask, ConductorTaskResult, TaskService

# Define task client
task_service = TaskService()

# Define work function. Inputs are all tasks in a batch. Outputs are all task results.
def sleep_work_function(tasks: typing.Sequence[ConductorTask]) -> typing.Sequence[ConductorTaskResult]:
    # intentionally sleeping many times in series just to demonstrate batch worker
    task_results = []
    for task in tasks:
        # each task dict obeys JSON equivalent of schema conductor.proto.Task (same keys but in camel case)
        input_data = task['inputData']
        time.sleep(input_data['duration'])
        output_data = {'rem': input_data['rem']}  # this must be a dictionary
        task_result = ConductorTaskResult(
            taskId=task['taskId'],
            workflowInstanceId=task['workflowInstanceId'],
            outputData=output_data,
            status='COMPLETED',
        )
        # To report a failure instead:
        # task_result = ConductorTaskResult(
        #     taskId=task['taskId'],
        #     workflowInstanceId=task['workflowInstanceId'],
        #     status='FAILED',
        #     reasonForIncompletion='this is why it failed',
        # )
        task_results.append(task_result)
    return task_results

# Work. This worker will work on up to 10 batches in parallel, with each batch containing up to 1000 tasks.
# Each batch is passed to the work function as a whole.
# Within each batch, the worker will make up to 20 concurrent requests to the Conductor API.
with BatchTaskWorker(
    batch_size=1000,
    batch_threads=20,
    task_service=task_service,
    task_type='sleep',
    work_function=sleep_work_function,
    work_concurrency=10,
) as worker:
    worker.work()
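Because a batch work function receives many tasks at once, an exception raised for one task would otherwise abort the whole batch. The sketch below shows one way to report failures per task. It uses plain dicts with the same keys as the `ConductorTaskResult` calls above so that it runs without a Conductor server; `process_batch` and `handle_one` are hypothetical names, not part of pyconductor_grpc.

```python
import typing


def process_batch(
    tasks: typing.Sequence[dict],
    handle_one: typing.Callable[[dict], dict],
) -> typing.List[dict]:
    """Run handle_one on each task's inputData and record the outcome per task."""
    results = []
    for task in tasks:
        result = {
            'taskId': task['taskId'],
            'workflowInstanceId': task['workflowInstanceId'],
        }
        try:
            result['outputData'] = handle_one(task['inputData'])
            result['status'] = 'COMPLETED'
        except Exception as exc:  # broad on purpose: any error fails only this task
            result['status'] = 'FAILED'
            result['reasonForIncompletion'] = str(exc)
        results.append(result)
    return results


tasks = [
    {'taskId': 't1', 'workflowInstanceId': 'w1', 'inputData': {'rem': True}},
    {'taskId': 't2', 'workflowInstanceId': 'w1', 'inputData': {}},  # 'rem' is missing
]
results = process_batch(tasks, lambda input_data: {'rem': input_data['rem']})
print([r['status'] for r in results])  # ['COMPLETED', 'FAILED']
```

Inside a real work function, each dict could presumably be passed on as `ConductorTaskResult(**result)`, since the keys match the keyword arguments used in the example above.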
Kick off a workflow instance and monitor for a done state:
import time

from pyconductor_grpc.grpc_client import WorkflowService

workflow_service = WorkflowService()  # same inputs as TaskService
workflow_id = workflow_service.start_workflow(
    'annotation_workflow',
    input={'variant': {'start': '1', 'end': '3', 'chrom': '5'}},
)

# Poll once per second until a status is reported
status = None
while status is None:
    time.sleep(1)
    workflow_output = workflow_service.get_workflow_status(workflow_id)
    status = workflow_output.get('status')

successful = status == 'COMPLETED'
output = workflow_output['output']
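When monitoring for a done state, it can help to bound the wait with a timeout and to stop only on a terminal status. The following is a minimal sketch under those assumptions. `wait_for_terminal_status` is a hypothetical helper, not part of pyconductor_grpc, and it takes a plain callable so that it can be tried without a server.

```python
import time
import typing

# Conductor workflow statuses that will not change any further
TERMINAL_STATUSES = frozenset({'COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED'})


def wait_for_terminal_status(
    get_status: typing.Callable[[], typing.Optional[str]],
    timeout_seconds: float = 60.0,
    poll_interval_seconds: float = 1.0,
) -> str:
    """Poll get_status until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f'workflow still {status!r} after {timeout_seconds} seconds')
        time.sleep(poll_interval_seconds)


# Simulated status sequence standing in for calls to the Conductor API
fake_statuses = iter(['RUNNING', 'RUNNING', 'COMPLETED'])
final = wait_for_terminal_status(lambda: next(fake_statuses), poll_interval_seconds=0.01)
print(final)  # COMPLETED
```

With the client, `get_status` could be `lambda: workflow_service.get_workflow_status(workflow_id).get('status')`, assuming the call returns a dict as in the example above.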
Code generation
To rebuild the proto files and the autogenerated Python gRPC SDK, run:
scripts/generate-code.sh
Notes
The generated Python message types verify field types when objects are constructed from them. For example:
>>> from model.task_result_pb2 import TaskResult
>>> TaskResult(task_id=1)
TypeError: Cannot set conductor.proto.TaskResult.task_id to 1: 1 has type <class 'int'>, but expected one of: (<class 'bytes'>, <class 'str'>) for field TaskResult.task_id
>>> TaskResult(task_id='1')
>>>
Publishing
To build and upload:
pip install --upgrade build twine
python -m build
twine upload dist/*
or
pipx install build
pipx install twine
pyproject-build
twine upload dist/*