Skip to main content

Author dagron DAGs in Python and drive the dagron-api control plane (runs, workflows, schedules).

Project description

dagron Python SDK

Author dagron workflows in Python and drive the whole dagron control plane — trigger runs, manage workflows and schedules, redrive dead letters, wire up GitOps — without writing REST calls by hand.

  • Zero dependencies. Standard library only (json + urllib).
  • Two layers: Dag (a validating spec builder) and Client (a typed wrapper over the authenticated dagron-api gateway).
  • Full coverage. v0.2 covers 100% of the current dagron-api HTTP surface — see ROADMAP.md for the coverage matrix and what's next.

Install

pip install -e .        # from sdks/python

Author a DAG

from dagron import Dag

dag = Dag("etl")
extract = dag.task("extract", image="alpine", command=["echo", "hi"])
dag.task("load", image="alpine", command=["true"], depends_on=[extract])

print(dag.to_json())    # valid dagron input (YAML is a JSON superset)

task() maps onto the engine's full TaskSpecimage, command, depends_on, plus input, max_attempts, retry_delay_secs, timeout_secs, env, resources, service_account, and workflow_ref (chain another saved workflow). to_spec()/to_json() validate the graph client-side (unique names, known deps, leaf-xor-chain, acyclic), mirroring the server so a bad DAG fails fast.

Drive the control plane

import os
from dagron import Client

api = Client("http://localhost:8080")
api.login("admin@example.com", os.environ["DAGRON_PASSWORD"])   # stores the token
# ...or skip login: Client("http://localhost:8080", token=os.environ["DAGRON_TOKEN"])

# Trigger an ad-hoc run and wait for it to finish.
run_id = api.submit_run(dag)
run = api.wait_for_run(run_id)
print(run["status"])                       # succeeded | failed | cancelled

# Save it as a reusable workflow and schedule it nightly.
wf = api.create_workflow(dag, description="nightly ETL")
api.create_schedule(wf["id"], "0 0 0 * * *")

# Observe.
for ev in api.stream_run(run_id):          # live Server-Sent Events
    print(ev["event"], ev["data"])

Every method maps one dagron-api endpoint to one call and returns the server's JSON. Non-2xx responses raise DagronError(status, message):

from dagron import DagronError

try:
    api.submit_run({"name": "x", "tasks": []})
except DagronError as e:
    print(e.status, e.message)             # e.g. 400 "DAG 'x' contains a cycle"

What Client covers

login · logout · me · create_user · submit_run · list_runs · get_run · get_run_graph · get_task_logs · cancel_run · rerun_run · resubmit_run · retry_task · stream_run · wait_for_run · list_workflows · get_workflow · create_workflow · update_workflow · delete_workflow · run_workflow · sync_workflow_to_git · list_schedules · create_schedule · update_schedule · delete_schedule · backfill_schedule · list_dead_letters · redrive_dead_letter · discard_dead_letter · list_git_repos · connect_git_repo · sync_git_repo · disconnect_git_repo · metrics · healthz

Test

python -m unittest          # from sdks/python

The suite validates the builder and runs Client against a threaded in-process fake gateway (real sockets), so request construction is exercised end-to-end.

Release

Publishing dagron-sdk to PyPI (the pip repository) is documented step-by-step in RELEASING.md — version bump, python -m build, a TestPyPI dry-run, the twine upload, tagging, and the move to OIDC Trusted Publishing.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagron_sdk-0.2.0.tar.gz (12.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagron_sdk-0.2.0-py3-none-any.whl (11.5 kB view details)

Uploaded Python 3

File details

Details for the file dagron_sdk-0.2.0.tar.gz.

File metadata

  • Download URL: dagron_sdk-0.2.0.tar.gz
  • Upload date:
  • Size: 12.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.7

File hashes

Hashes for dagron_sdk-0.2.0.tar.gz
Algorithm Hash digest
SHA256 6f38b2ba4dd1cbfd3261ee21a9f3beb93be4b0c86454de63504a41e3d5f3f264
MD5 cb5f087ce00d523569925449f9181e53
BLAKE2b-256 135616ee4c76240e92acb6b25abf1e47090f9a1baef88bcf2eda1b88a165c7a4

See more details on using hashes here.

File details

Details for the file dagron_sdk-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: dagron_sdk-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 11.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.7

File hashes

Hashes for dagron_sdk-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 55dc03004ed1cb8a5978eb741cb200f34f5a3beb22f9ea7b1e246be76856784a
MD5 2522688d9eb33ba355469aea03b8be2e
BLAKE2b-256 2191f3d90deb1e66d8f7c00ed3b68a5fc8553b9866f8fde7ab184ed28bcd86c3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page