Orchestra SDK for updating self-hosted Tasks.
Project description
Orchestra Python SDK
This is a lightweight SDK that allows Orchestra to interact with Self-hosted applications (Tasks).
The basic premise is that your self-hosted Task can send back status updates and logs to Orchestra once triggered asynchronously by Orchestra. This is done via HTTP requests.
Installation
pip install orchestra-sdk
You initialise the package by creating an instance of the OrchestraSDK class. It requires an API key that will be used to connect with Orchestra - this can be found in your settings page. Orchestra will attempt to automatically set the other environment variables when the Task is triggered:
ORCHESTRA_TASK_RUN_ID: The UUID of the Task being executed
If these are not in your environment, you can set them manually after initialising the OrchestraSDK class.
There are also optional configuration values:
send_logs: send the contents of a log file to Orchestra, associated with the task (default = False)log_file_path: the path to the log file to send to Orchestra (default = "orchestra.log")- if your task is running in AWS Lambda, the log file must start with
/tmp/to be accessible - alternatively, use the
configure_aws_lambda_eventmethod
- if your task is running in AWS Lambda, the log file must start with
import os
from orchestra_sdk.orchestra import OrchestraSDK
# also: o = OrchestraSDK(api_key="os.environ.get("ORCHESTRA_API_KEY"))
orchestra = OrchestraSDK(api_key="your_api_key")
# If not set in the environment:
orchestra.task_run_id = "your_task_run_id"
Orchestra recommends retrieving the API key from some secret store that you have configured. If that is not possible, you can set the API key as an environment variable and read that value in your code.
Task decorator
The decorator will handle updating the Task in Orchestra automatically. It will send a RUNNING status update when the function is called, and then send a SUCCEEDED or FAILED status update when the function finishes.
@orchestra.run()
def my_function(arg1, arg2=1):
print("Running complex process")
- It will send a
RUNNINGstatus update to Orchestra - Your function will then run
- If an exception is raised, the decorator will send a
FAILEDstatus update to Orchestra - If the function finishes without an error being raised, regardless of the return value, the decorator will send a
SUCCEEDEDstatus update to Orchestra - If
send_logsis enabled, the contents of the logs will also be sent.
AWS Lambda
If you are using the AWS Lambda Task type from Orchestra, you can use the following helper function to ensure the correct configuration has been applied:
from orchestra_sdk.orchestra import OrchestraSDK
orchestra = OrchestraSDK(api_key="your_api_key")
@orchestra.run()
def function_to_monitor():
# Your code here
pass
def handler(event, context):
orchestra.configure_aws_lambda_event(event)
function_to_monitor()
This will automatically configure the ORCHESTRA_TASK_RUN_ID environment variable and configure the log file correctly (if send_logs=True).
NOTE: if log_file is set, configure_aws_lambda_event will override the log file path.
Updating Tasks manually
For additional control over when to update the status of the Task, or for sending messages to Orchestra, you can use the update_task method of the OrchestraSDK class.
from orchestra_sdk.enum import TaskRunStatus
from orchestra_sdk.orchestra import OrchestraSDK
orchestra = OrchestraSDK(api_key="your_api_key")
def my_function(arg1, arg2=1):
print("Start my complex process")
orchestra.update_task(status=TaskRunStatus.RUNNING, message="Starting process.")
print("Running complex process")
fn_result = complex_process()
if fn_result == 0:
orchestra.update_task(status=TaskRunStatus.SUCCEEDED)
else:
orchestra.update_task(status=TaskRunStatus.FAILED, message="Process failed")
- If the function fails or throws an exception, Orchestra might not register that the Task has failed, which could have downstream consequences on your pipeline. Consider wrapping your function in a try/except block and calling
update_taskwithstatus=TaskRunStatus.FAILEDin the except block.
Sending logs
To send logs associated to the Task, enable the send_logs flag when initialising the OrchestraSDK class. The logs will be sent to Orchestra when the Task finishes and the decorator is being used.
An example logging configuration is shown below:
import logging
import sys
from orchestra_sdk.orchestra import OrchestraSDK
orchestra = OrchestraSDK(
api_key="your_api_key",
send_logs=True,
log_file="a.log" # for certain environments, this may need to start with "/tmp/"
)
def test_function():
# Setup logging configuration
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# File handler
file_handler = logging.FileHandler(orchestra.log_file)
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler.setFormatter(file_formatter)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
console_handler.setFormatter(console_formatter)
# Adding handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.info("Hello, World!")
Setting outputs
To set outputs associated to the Task, use the set_output method of the OrchestraSDK class.
Output names are case-insensitive and must only contain letters, underscores (_), and hyphens (-). Numbers and spaces are not allowed.
set_output must be called before updating the task status to SUCCEEDED or FAILED.
from orchestra_sdk.enum import TaskRunStatus
from orchestra_sdk.orchestra import OrchestraSDK
orchestra = OrchestraSDK(api_key="your_api_key")
def my_function(arg1, arg2=1):
print("Start my complex process")
fn_result = complex_process()
orchestra.set_output(name='result', value=fn_result)
The function returns true/false for if the output was successfully set. We recommend checking the return value and logging failures to ensure outputs are correctly set.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file orchestra_sdk-0.1.2.tar.gz.
File metadata
- Download URL: orchestra_sdk-0.1.2.tar.gz
- Upload date:
- Size: 12.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1eb8dea1a33a04f2fa84f8bd418bf982729d46bcde2005750ef19404e30825eb
|
|
| MD5 |
28152ea2fa5e3d6a772998d9d8f05ac7
|
|
| BLAKE2b-256 |
b846da9da18f8b38e27688b4fd0218b5dd08eca51c86eb441dac7a3fc8e0452d
|
File details
Details for the file orchestra_sdk-0.1.2-py3-none-any.whl.
File metadata
- Download URL: orchestra_sdk-0.1.2-py3-none-any.whl
- Upload date:
- Size: 9.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e98a1d81a1e6410d2b5999188af731d711c210624d8797e65842a8f8abef2ea1
|
|
| MD5 |
c22e331301aa60f44d683a586ce6a8d4
|
|
| BLAKE2b-256 |
cef82690102ee4b9dc248f9abd056b1a5c600116240e8fc243bb34c677ae652f
|