Orchestra SDK for updating self-hosted Tasks.
Orchestra Python SDK
This is a lightweight SDK that allows Orchestra (getorchestra.io) to interact with self-hosted applications (Tasks).
The basic premise is for your self-hosted Task to send status updates and logs back to Orchestra. This is done via an HTTP request. The Task is started by Orchestra.
Local Development
- Create a Python3 virtual environment:
python3 -m venv venv
- Activate the virtual environment:
source venv/bin/activate
- Run
make install-dev
At this point, you can start making changes to the code. Once completed:
- Run
make test
- Update pyproject.toml with the new version number and any other PyPI updates
- Run
make build
To upload to PyPI, make sure you have API tokens set in your local .pypirc file. It should look like this:
[testpypi]
username = __token__
password = <API_TOKEN>
[pypi]
username = __token__
password = <API_TOKEN>
- Run
make release-test
to confirm it can upload to Test PyPI as expected
- Run
make release
to upload to PyPI
Quickstart guide
Firstly, install the package:
pip install orchestra-sdk
Note: to install the test package from Test PyPI, use the following command:
pip install -i https://test.pypi.org/simple/ orchestra-sdk
To use the package:
from orchestra_sdk.orcProcess import orcProcess

def run_example_process(correlation_id, creds):
    # Creating the instance lets Orchestra know the process has started
    orcProcessInstance = orcProcess(correlation_id, creds)
    print('Starting complicated process')
    try:
        print('Trying something complicated')
        raise Exception
    except Exception as e:
        print('Failed to do something complicated')
        # Report the failure, with the exception text as the message
        orcProcessInstance.sendFailure(message = str(e))
    finally:
        print('Completed')
        orcProcessInstance.sendCompletion(message = 'Completed', custom_data={'I sent this from':'my computer'})

creds = {'apikey':'my_api_key', 'clientid':'my_client_id'}
run_example_process('correlation_id', creds)
Summary
- The function takes a correlation id and a dictionary called 'creds'. The correlation id is a UUID that Orchestra passes to the app when the app is triggered, and it corresponds to a pipeline element / task. The creds object is a dictionary containing an 'apikey' key, which should be stored securely and accessed by your app, e.g. via a local config file or environment variables, but ideally by interacting with a key vault.
- To kick off the process, add the code orcProcessInstance = orcProcess(correlation_id, creds). This lets Orchestra know the process has started.
- Use try/except/finally logic to ensure Orchestra knows when parts of the pipeline element / task fail or succeed. Use the methods sendFailure and sendCompletion to indicate whether stages fail or complete. Note: arbitrary data can be appended to status updates using the "data" parameter. You do not need to include status codes or times in the "data" parameter; Orchestra calculates these by default.
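As a rough illustration of the credentials point above, the creds dictionary could be assembled from environment variables. This is only a sketch: the helper load_creds_from_env and the variable names ORCHESTRA_API_KEY and ORCHESTRA_CLIENT_ID are assumptions made for this example and are not defined by the SDK. The 'apikey' and 'clientid' keys follow the quickstart example.

```python
import os


def load_creds_from_env(environ=os.environ):
    """Build the creds dictionary from environment variables.

    ORCHESTRA_API_KEY and ORCHESTRA_CLIENT_ID are hypothetical names chosen
    for this sketch; use whatever names your deployment defines.
    """
    try:
        return {
            'apikey': environ['ORCHESTRA_API_KEY'],
            'clientid': environ['ORCHESTRA_CLIENT_ID'],
        }
    except KeyError as missing:
        # Fail early with a clear message rather than sending an empty key
        raise RuntimeError(f'Missing environment variable: {missing}') from None
```

The returned dictionary can then be passed as the second argument to orcProcess(correlation_id, creds).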
Note: the try/except/finally structure shown in the quickstart example can require relatively deep nesting, which leads to much lengthier code. It is perfectly fine to use a structure like the one below:
from orchestra_sdk.orcProcess import orcProcess

def run_something_complicated():
    pass

def action_on_app_trigger(correlation_id, creds):
    orcProcessInstance = orcProcess(correlation_id, creds)
    try:
        run_something_complicated()
    except BaseException as e:
        # Any error raised inside run_something_complicated is reported here
        orcProcessInstance.sendFailure(message = str(e), data={'some':'arbitrary stuff'})
    finally:
        print('Completed')
        orcProcessInstance.sendCompletion(message = 'Completed')
This ensures that, no matter what code is executed in run_something_complicated, Orchestra will know what happens to the job.
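If the same wrapper is needed in several places, one possible way to avoid repeating it is a small context manager. The sketch below is not part of the SDK: report_to_orchestra is a hypothetical helper, and process stands for any object exposing sendFailure and sendCompletion, such as an orcProcess instance. It mirrors the structure above, except that it catches Exception rather than BaseException so that generator shutdown is not reported as a failure.

```python
from contextlib import contextmanager


@contextmanager
def report_to_orchestra(process):
    """Report the outcome of the wrapped block (hypothetical helper).

    `process` is assumed to expose sendFailure(message=...) and
    sendCompletion(message=...), as in the examples above.
    """
    try:
        yield process
    except Exception as e:
        # The error is reported and then suppressed, as in the example above
        process.sendFailure(message=str(e))
    finally:
        # Runs whether or not the block raised
        process.sendCompletion(message='Completed')
```

Usage would then look like: with report_to_orchestra(orcProcess(correlation_id, creds)): run_something_complicated()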
Quickstart: FastAPI
For production usage, you should have an App Service that accepts HTTP requests and contains all the logic you need to carry out the pipeline element / task.
Firstly, ensure there is an endpoint that can be hit / triggered by Orchestra for the task you wish to carry out:
from fastapi import Body, FastAPI

from orchestra_sdk.orcProcess import orcProcess
from routers import base
from services.creds_manager import credsManager
from services.data_ingestion import dataIngester

app = FastAPI()
app.include_router(base.router)

@app.post("/kickOffIngestion")
async def root(body: dict = Body()):
    # Fetch the Orchestra credentials from your own credentials service
    creds = credsManager.get_creds()
    # The correlation ID arrives in the JSON body of the request
    orcProcessInstance = orcProcess(body["correlation_id"], creds)
    ingester__ = dataIngester()
    try:
        ingester__.run()
    except BaseException as e:
        orcProcessInstance.sendFailure(message = str(e), data={'some':'arbitrary stuff'})
    finally:
        print('Completed')
        orcProcessInstance.sendCompletion(message = 'Completed')
    return {"message": "Process kicked off successfully"}
There are a few things happening here:
- You have a credentials service that securely fetches your Orchestra API Key; it is imported and handles the generation of the creds object.
- You instantiate orcProcess using the creds object from the previous point and the correlation ID, which is passed to you in the body of the request and read using FastAPI's Body() function.
- You nest the function call ingester__.run() within try / except / finally to ensure Orchestra has the relevant information to know how the task is performing.
- Bonus: the code above is not actually asynchronous, although the function is defined with async def, and will only return the response once ingester__.run() has finished running. It is good API practice for POST-like requests to start the work and return a response immediately.
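One way to return immediately, sketched here with only the standard library, is to hand the work to a background thread and report to Orchestra from there. The helpers run_and_report and kick_off are hypothetical names for this example, and process again stands for any object with sendFailure and sendCompletion, such as an orcProcess instance. In a FastAPI app, the framework's BackgroundTasks feature may be a more natural fit than a raw thread.

```python
import threading


def run_and_report(process, work):
    """Run work() and report the outcome, mirroring the pattern above."""
    try:
        work()
    except Exception as e:
        process.sendFailure(message=str(e))
    finally:
        process.sendCompletion(message='Completed')


def kick_off(process, work):
    """Start the work in a background thread and respond straight away.

    The thread is returned alongside the response so callers (or tests)
    can wait for it if they need to.
    """
    worker = threading.Thread(target=run_and_report, args=(process, work))
    worker.start()
    return {"message": "Process kicked off successfully"}, worker
```

With this shape, the endpoint would return the response while ingester__.run() is still in progress, and Orchestra would learn the outcome from the status updates sent by the worker thread.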