Orchestra pipeline configuration language

orchestra-lang

Parse, validate, and transpile Orchestra pipeline definitions from Python.

orchestra-lang is the Python binding for the Orchestra pipeline DSL — the same engine that powers the Orchestra VS Code extension. It ships the dialect's schema and validators as a native library, so you get full dialect-aware parsing and validation without calling out to a separate process.

About the dialect

Orchestra pipelines are authored in a KSON-based DSL for defining, validating, and shipping data pipelines. A pipeline describes tasks, dependencies, conditions, triggers, and variables across 95+ integrations (Snowflake, dbt, Fivetran, Databricks, and more).

A minimal pipeline looks like this:

version: v1
name: 'task-references'
pipeline:
  producer:
    integration: SNOWFLAKE
    integrationJob: SNOWFLAKE_RUN_QUERY
    parameters:
      set_outputs: true
      statement: 'SELECT 1'
      .
    .
  consumer:
    integration: SNOWFLAKE
    integrationJob: SNOWFLAKE_RUN_QUERY
    condition: "${{ tasks['producer'].status == 'SUCCEEDED' }}"
    parameters:
      statement: "SELECT ${{ tasks['producer'].outputs['count'] }}"
      .
    dependsOn:
      - producer

The dialect validates required fields and types, integration-specific parameters, variable references and expressions, task dependencies, cron syntax, and branching conditions. See the Orchestra documentation for the full language reference.
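As a rough illustration of what the task-dependency checks involve, the sketch below looks for unknown and circular dependencies in a plain mapping of task names to their dependsOn lists. It is not the engine's implementation: check_dependencies and its input shape are hypothetical, chosen only to make the idea concrete.

```python
from typing import Dict, List


def check_dependencies(depends_on: Dict[str, List[str]]) -> List[str]:
    """Return human-readable problems found in a task dependency map."""
    problems: List[str] = []

    # 1. Every dependency must name a task that exists.
    for task, deps in depends_on.items():
        for dep in deps:
            if dep not in depends_on:
                problems.append(f"{task}: unknown dependency '{dep}'")

    # 2. Depth-first search with three colours to find cycles.
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    colour = {task: WHITE for task in depends_on}

    def visit(task: str, path: List[str]) -> None:
        colour[task] = GREY
        for dep in depends_on[task]:
            if dep not in depends_on:
                continue  # already reported as unknown above
            if colour[dep] == GREY:
                # dep is on the current path, so we have closed a loop.
                cycle = path[path.index(dep):] + [dep]
                problems.append("circular dependency: " + " -> ".join(cycle))
            elif colour[dep] == WHITE:
                visit(dep, path + [dep])
        colour[task] = BLACK

    for task in depends_on:
        if colour[task] == WHITE:
            visit(task, [task])
    return problems


# The minimal pipeline above: consumer depends on producer.
print(check_dependencies({"producer": [], "consumer": ["producer"]}))  # → []
```

In practice the bundled validators report these problems through analyze; the sketch only shows the kind of graph check that "task dependencies" refers to.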

Installation

pip install orchestra-lang

API

The module exposes five top-level functions, which return the raw result objects.

analyze(kson: str) -> Analysis

Statically analyze an Orchestra document. The bundled engine runs the Orchestra dialect validators automatically, so analyze catches parse errors, schema violations, expression syntax errors, bad task references, circular dependencies, invalid cron expressions, and so on.

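import orchestra_lang

# Read the pipeline source; the context manager closes the file promptly.
with open("pipeline.orc", encoding="utf-8") as f:
    src = f.read()

# analyze() returns an Analysis; errors() lists both errors and warnings.
analysis = orchestra_lang.analyze(src)

for msg in analysis.errors():
    start = msg.start()
    print(f"[{msg.severity()}] {start.line()}:{start.column()}  {msg.message()}")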

Example output for a pipeline with a broken condition expression and missing required fields:

[MessageSeverity.ERROR] 3:53  Expression syntax error: Expected RPAREN but got ''
[MessageSeverity.WARNING] 0:0  Missing required properties: version, name
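In a CI job, the same messages can gate a build. The sketch below is one possible approach, not part of the library: render_messages and has_errors are hypothetical helpers that rely only on the severity(), message(), and start() accessors used above. They shift the zero-based positions to the one-based form most editors display, and they assume severity values compare equal with ==.

```python
from typing import Iterable, List


def render_messages(messages: Iterable, filename: str) -> List[str]:
    """Format messages as 'file:line:col: [severity] text' with one-based positions."""
    lines = []
    for msg in messages:
        start = msg.start()
        lines.append(
            f"{filename}:{start.line() + 1}:{start.column() + 1}: "
            f"[{msg.severity()}] {msg.message()}"
        )
    return lines


def has_errors(messages: Iterable, error_severity) -> bool:
    """Return True if any message carries the given (error) severity."""
    return any(msg.severity() == error_severity for msg in messages)


# Possible usage with the real library (assumes MessageSeverity is importable
# from orchestra_lang, which this README does not state explicitly):
#
#   msgs = orchestra_lang.analyze(src).errors()
#   print("\n".join(render_messages(msgs, "pipeline.orc")))
#   sys.exit(1 if has_errors(msgs, orchestra_lang.MessageSeverity.ERROR) else 0)
```

Passing the error severity in as an argument keeps the helpers free of any import from the package, so warnings alone never fail the build.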

The Analysis object

Analysis exposes three views of the analyzed document:

  • errors() -> list[Message] — every error and warning produced by the parser, schema validator, and dialect validators. Each Message carries a severity() (MessageSeverity.ERROR or MessageSeverity.WARNING), a message() string, and start() / end() Positions whose line() and column() methods return zero-based offsets. An empty list means the document is valid.

  • tokens() -> list[Token] — the full lexed token stream, useful for syntax highlighting and editor tooling. Each Token has token_type() (a TokenType enum), text(), and start() / end() positions.

  • kson_value() -> KsonValue | None — the parsed document as a typed value tree, or None if parsing failed fatally. Call type() to get a KsonValueType discriminator, or isinstance check against KsonValue.KsonObject, KsonValue.KsonArray, KsonValue.KsonString, KsonValue.KsonNumber, KsonValue.KsonBoolean, KsonValue.KsonNull, or KsonValue.KsonEmbed to walk the tree.

from orchestra_lang import analyze, KsonValue

analysis = analyze(src)
root = analysis.kson_value()
if isinstance(root, KsonValue.KsonObject):
    # ... walk the object
    ...

to_json(kson: str, options: TranspileOptions.Json) -> Result

Transpile Orchestra source to JSON. Returns a Result; check it with isinstance against Result.Success / Result.Failure:

from orchestra_lang import to_json, Result, TranspileOptions

result = to_json(src, TranspileOptions.Json(retain_embed_tags=False))
if isinstance(result, Result.Success):
    print(result.output())
else:
    for err in result.errors():
        print(err.message())
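Once a pipeline is available as JSON, the standard json module can inspect it. The sketch below assumes the transpiled output mirrors the document structure (a top-level pipeline object keyed by task name); the JSON string is a hand-written stand-in rather than real to_json output, and dependency_edges is a hypothetical helper.

```python
import json

# Hand-written stand-in for what result.output() might contain for the
# minimal pipeline; the real output shape is an assumption here.
json_text = """
{
  "version": "v1",
  "name": "task-references",
  "pipeline": {
    "producer": {"integration": "SNOWFLAKE", "integrationJob": "SNOWFLAKE_RUN_QUERY"},
    "consumer": {
      "integration": "SNOWFLAKE",
      "integrationJob": "SNOWFLAKE_RUN_QUERY",
      "dependsOn": ["producer"]
    }
  }
}
"""


def dependency_edges(document: dict) -> list[tuple[str, str]]:
    """Return (task, dependency) pairs from a pipeline-shaped dict."""
    edges = []
    for task_name, task in document.get("pipeline", {}).items():
        for dep in task.get("dependsOn", []):
            edges.append((task_name, dep))
    return edges


document = json.loads(json_text)
print(dependency_edges(document))  # → [('consumer', 'producer')]
```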

to_yaml(kson: str, options: TranspileOptions.Yaml) -> Result

Same shape as to_json, but emits YAML and preserves comments.

from orchestra_lang import to_yaml, TranspileOptions

result = to_yaml(src, TranspileOptions.Yaml(retain_embed_tags=False))

format_source(kson: str, format_options: FormatOptions) -> str

Pretty-print Orchestra source with the given formatting options. Named format_source rather than format so that from orchestra_lang import * does not shadow the format builtin.

from orchestra_lang import format_source, FormatOptions, FormattingStyle, IndentType

formatted = format_source(
    src,
    FormatOptions(
        indent_type=IndentType.Spaces(2),
        formatting_style=FormattingStyle.PLAIN,
        embed_block_rules=[],
    ),
)
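A common use for a formatter is a "check" mode that reports whether a file is already formatted. The sketch below shows one way to build that with difflib from the standard library. format_diff is a hypothetical helper that takes the formatter as a callable, and strip_trailing is a stand-in so the example runs without the package; it does not reflect how format_source actually formats.

```python
import difflib
from typing import Callable


def format_diff(source: str, formatter: Callable[[str], str], filename: str) -> str:
    """Return a unified diff between source and its formatted form ('' if identical)."""
    formatted = formatter(source)
    diff = difflib.unified_diff(
        source.splitlines(keepends=True),
        formatted.splitlines(keepends=True),
        fromfile=filename,
        tofile=f"{filename} (formatted)",
    )
    return "".join(diff)


# Stand-in formatter so the sketch runs without the library; it only strips
# trailing whitespace and is NOT how format_source behaves.
def strip_trailing(source: str) -> str:
    return "".join(line.rstrip() + "\n" for line in source.splitlines())


print(format_diff("name: 'demo'   \n", strip_trailing, "pipeline.orc") != "")  # → True
```

With the real library, the callable could be something like lambda s: format_source(s, options), where options is a FormatOptions value built as shown above.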

parse_schema(schema_kson: str) -> SchemaResult

Parse a KSON JSON-Schema document and, on success, return a reusable SchemaValidator.

from orchestra_lang import parse_schema, SchemaResult

result = parse_schema(open("my.schema.kson").read())
if isinstance(result, SchemaResult.Success):
    validator = result.schema_validator()
    messages = validator.validate(src, "document.orc")

Download files

Source Distributions

No source distribution files available for this release.

Built Distributions

orchestra_lang-0.0.1-cp310-abi3-manylinux_2_34_x86_64.whl (20.2 MB)

Uploaded: CPython 3.10+, manylinux (glibc 2.34+), x86-64

orchestra_lang-0.0.1-cp310-abi3-manylinux_2_34_aarch64.whl (20.4 MB)

Uploaded: CPython 3.10+, manylinux (glibc 2.34+), ARM64

orchestra_lang-0.0.1-cp310-abi3-macosx_11_0_arm64.whl (18.8 MB)

Uploaded: CPython 3.10+, macOS 11.0+, ARM64

File details

Details for the file orchestra_lang-0.0.1-cp310-abi3-manylinux_2_34_x86_64.whl.

File hashes

Hashes for orchestra_lang-0.0.1-cp310-abi3-manylinux_2_34_x86_64.whl
Algorithm Hash digest
SHA256 e85cb1421127a1a7eaf51b74ff82597b62f2449edf5a8501f0b8785bdda406b5
MD5 2865daec72fc2347c82e72009d1deb97
BLAKE2b-256 6042d55353c7a9bf0fbc51dba21308f2fe4a4ffa1eb945051323267a1335459e

File details

Details for the file orchestra_lang-0.0.1-cp310-abi3-manylinux_2_34_aarch64.whl.

File hashes

Hashes for orchestra_lang-0.0.1-cp310-abi3-manylinux_2_34_aarch64.whl
Algorithm Hash digest
SHA256 aa5e30ee67f37567b5956911bc67ca167304cf5806520fb230a032b3db75f447
MD5 e9e7bb607768c1c15067dca09f606cdb
BLAKE2b-256 0d4de90f0e96dfa244a3121be47d141251c27d8ce284f7a3a7d1b98c3688faf1

File details

Details for the file orchestra_lang-0.0.1-cp310-abi3-macosx_11_0_arm64.whl.

File hashes

Hashes for orchestra_lang-0.0.1-cp310-abi3-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 849476392591887ba266c3f059c9ca1715b65a2925c553e792beb784092307ad
MD5 ae2537d34308a7ecce00ba6c7b36af9c
BLAKE2b-256 05863a13c18b4d4c405bebffa753fcfaa9ebae13d6202f4ecb3f9f7c80ab4426
