Skip to main content

Orchestra SDK for updating self-hosted Tasks.

Project description

Orchestra Python SDK

PyPI

This is a lightweight SDK that allows Orchestra to interact with Self-hosted applications (Tasks).

The basic premise is that your self-hosted Task can send back status updates and logs to Orchestra once triggered asynchronously by Orchestra. This is done via HTTP requests.

Installation

pip install orchestra-sdk

You initialise the package by creating an instance of the OrchestraSDK class. It requires an API key that will be used to connect with Orchestra - this can be found in your settings page. Orchestra will attempt to automatically set the other environment variables when the Task is triggered:

  • ORCHESTRA_TASK_RUN_ID: The UUID of the Task being executed

If these are not in your environment, you can set them manually after initialising the OrchestraSDK class.

There are also optional configuration values:

  • send_logs: send the contents of a log file to Orchestra, associated with the task (default = False)
  • log_file_path: the path to the log file to send to Orchestra (default = "orchestra.log")
import os

from orchestra_sdk.orchestra import OrchestraSDK

# also: o = OrchestraSDK(api_key="os.environ.get("ORCHESTRA_API_KEY"))
orchestra = OrchestraSDK(api_key="your_api_key")

# If not set in the environment:
orchestra.task_run_id = "your_task_run_id"

Orchestra recommends retrieving the API key from some secret store that you have configured. If that is not possible, you can set the API key as an environment variable and read that value in your code.

Task decorator

The decorator will handle updating the Task in Orchestra automatically. It will send a RUNNING status update when the function is called, and then send a SUCCEEDED or FAILED status update when the function finishes.

@orchestra.run()
def my_function(arg1, arg2=1):
    print("Running complex process")
  1. It will send a RUNNING status update to Orchestra
  2. Your function will then run
  3. If an exception is raised, the decorator will send a FAILED status update to Orchestra
  4. If the function finishes without an error being raised, regardless of the return value, the decorator will send a SUCCEEDED status update to Orchestra
  5. If send_logs is enabled, the contents of the logs will also be sent.

AWS Lambda

If you are using the AWS Lambda Task type from Orchestra, you can use the following helper function to ensure the correct configuration has been applied:

from orchestra_sdk.orchestra import OrchestraSDK

orchestra = OrchestraSDK(api_key="your_api_key")

@orchestra.run()
def function_to_monitor():
    # Your code here
    pass

def handler(event, context):
    orchestra.configure_aws_lambda_event(event)
    function_to_monitor()

This will automatically configure the ORCHESTRA_TASK_RUN_ID environment variable and configure the log file correctly (if send_logs=True).

NOTE: if log_file is set, configure_aws_lambda_event will override the log file path.

Updating Tasks manually

For additional control over when to update the status of the Task, or for sending messages to Orchestra, you can use the update_task method of the OrchestraSDK class.

from orchestra_sdk.enum import TaskRunStatus
from orchestra_sdk.orchestra import OrchestraSDK

orchestra = OrchestraSDK(api_key="your_api_key")

def my_function(arg1, arg2=1):
    print("Start my complex process")
    orchestra.update_task(status=TaskRunStatus.RUNNING, message="Starting process.")

    print("Running complex process")

    fn_result = complex_process()

    if fn_result == 0:
        orchestra.update_task(status=TaskRunStatus.SUCCEEDED)
    else:
        orchestra.update_task(status=TaskRunStatus.FAILED, message="Process failed")
  • If the function fails or throws an exception, Orchestra might not register that the Task has failed, which could have downstream consequences on your pipeline. Consider wrapping your function in a try/except block and calling update_task with status=TaskRunStatus.FAILED in the except block.

Sending logs

To send logs associated to the Task, enable the send_logs flag when initialising the OrchestraSDK class. The logs will be sent to Orchestra when the Task finishes and the decorator is being used.

An example logging configuration is shown below:

import logging
import sys

from orchestra_sdk.orchestra import OrchestraSDK

orchestra = OrchestraSDK(
    api_key="your_api_key",
    send_logs=True,
    log_file="a.log" # for certain environments, this may need to start with "/tmp/"
)

def test_function():
    # Setup logging configuration
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # File handler
    file_handler = logging.FileHandler(orchestra.log_file)
    file_handler.setLevel(logging.INFO)
    file_formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    file_handler.setFormatter(file_formatter)

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    console_handler.setFormatter(console_formatter)

    # Adding handlers to the logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    logger.info("Hello, World!")

Setting outputs

To set outputs associated to the Task, use the set_output method of the OrchestraSDK class.

Output names are case-insensitive and must only contain letters, underscores (_), and hyphens (-). Numbers and spaces are not allowed.

set_output must be called before updating the task status to SUCCEEDED or FAILED.

from orchestra_sdk.enum import TaskRunStatus
from orchestra_sdk.orchestra import OrchestraSDK

orchestra = OrchestraSDK(api_key="your_api_key")

def my_function(arg1, arg2=1):
    print("Start my complex process")
    fn_result = complex_process()
    orchestra.set_output(name='result', value=fn_result)

The function returns true/false for if the output was successfully set. We recommend checking the return value and logging failures to ensure outputs are correctly set.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

orchestra_sdk-0.1.2.tar.gz (12.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

orchestra_sdk-0.1.2-py3-none-any.whl (9.5 kB view details)

Uploaded Python 3

File details

Details for the file orchestra_sdk-0.1.2.tar.gz.

File metadata

  • Download URL: orchestra_sdk-0.1.2.tar.gz
  • Upload date:
  • Size: 12.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for orchestra_sdk-0.1.2.tar.gz
Algorithm Hash digest
SHA256 1eb8dea1a33a04f2fa84f8bd418bf982729d46bcde2005750ef19404e30825eb
MD5 28152ea2fa5e3d6a772998d9d8f05ac7
BLAKE2b-256 b846da9da18f8b38e27688b4fd0218b5dd08eca51c86eb441dac7a3fc8e0452d

See more details on using hashes here.

File details

Details for the file orchestra_sdk-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: orchestra_sdk-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 9.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for orchestra_sdk-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 e98a1d81a1e6410d2b5999188af731d711c210624d8797e65842a8f8abef2ea1
MD5 c22e331301aa60f44d683a586ce6a8d4
BLAKE2b-256 cef82690102ee4b9dc248f9abd056b1a5c600116240e8fc243bb34c677ae652f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page