Compose Python logic into data-flow pipelines — sync or async, run anywhere

lythonic

Write plain Python functions. Wire them with >>. Data flows visibly between nodes — you can see what went in, what came out, what failed. Unlike task schedulers where jobs are opaque units, lythonic tracks the data itself.

Quick Start

uv add lythonic

from lythonic.compose.namespace import Dag

def fetch(url: str) -> dict:
    return {"source": url, "values": [1, 2, 3]}

def double(data: dict) -> dict:
    return {**data, "values": [v * 2 for v in data["values"]]}

dag = Dag()
dag.node(fetch) >> dag.node(double)

# Run it — sync or async, doesn't matter
import asyncio
result = asyncio.run(dag(url="https://example.com"))
print(result.outputs)  # {"double": {"source": "...", "values": [2, 4, 6]}}
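
For readers curious how `>>` can wire plain functions together, here is a minimal stdlib-only sketch of the operator-overloading idea. It is not lythonic's implementation: the `Node` class, its `run` method, and the `trace` dictionary are hypothetical names invented for this illustration, and the sketch only handles single-argument nodes.

```python
from typing import Any, Callable


class Node:
    """Hypothetical wrapper for illustration; not lythonic's node class."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn
        self.downstream: list["Node"] = []

    def __rshift__(self, other: "Node") -> "Node":
        # `a >> b` records an edge a -> b and returns b, so chains read left to right.
        self.downstream.append(other)
        return other

    def run(self, value: Any, trace: dict[str, Any]) -> None:
        # Call this node, record its output under the function name,
        # then pass that output to every downstream node.
        out = self.fn(value)
        trace[self.fn.__name__] = out
        for nxt in self.downstream:
            nxt.run(out, trace)


def fetch(url: str) -> dict:
    return {"source": url, "values": [1, 2, 3]}


def double(data: dict) -> dict:
    return {**data, "values": [v * 2 for v in data["values"]]}


first = Node(fetch)
first >> Node(double)

trace: dict[str, Any] = {}
first.run("https://example.com", trace)
print(trace["double"]["values"])  # [2, 4, 6]
```

Because every node's output is stored in `trace`, both the intermediate and final data stay inspectable, which is the property the description above emphasizes.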

Why lythonic?

Data flow, not task flow. Each node receives typed data from upstream and passes results downstream. The DAG runner wires inputs to outputs by type — fan-out, fan-in, and map-reduce built in. Provenance tracking records what data flowed through each edge.
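
To make "wires inputs to outputs by type" concrete, the following sketch shows one way such matching could work using only the standard library. It is an assumption about the general technique, not a description of lythonic's runner: `match_param` and `scale` are hypothetical names, and the sketch simply requires exactly one downstream parameter whose annotation equals the upstream return annotation.

```python
from typing import Any, Callable, get_type_hints


def match_param(upstream: Callable[..., Any], downstream: Callable[..., Any]) -> str:
    """Return the downstream parameter whose annotation equals the upstream
    return annotation. Hypothetical helper, for illustration only."""
    produced = get_type_hints(upstream)["return"]
    hints = get_type_hints(downstream)
    candidates = [
        name for name, tp in hints.items() if name != "return" and tp == produced
    ]
    if len(candidates) != 1:
        raise TypeError(
            f"expected exactly one parameter of type {produced!r}, found {candidates}"
        )
    return candidates[0]


def fetch(url: str) -> dict:
    return {"source": url, "values": [1, 2, 3]}


def scale(factor: int, data: dict) -> dict:
    return {**data, "values": [v * factor for v in data["values"]]}


# The dict produced by fetch is routed to the `data` parameter, not `factor`.
param = match_param(fetch, scale)
result = scale(factor=3, **{param: fetch("https://example.com")})
print(param, result["values"])  # data [3, 6, 9]
```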

Compose freely. DAGs nest inside DAGs. dag.node(sub_dag) runs a sub-DAG as a single step. dag.map(sub_dag) runs it on each element of a collection, concurrently. Build small, reuse everywhere.
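
The idea of running a sub-pipeline on each element of a collection concurrently can be sketched with plain `asyncio`. This example deliberately avoids lythonic's API, so `sub_pipeline`, `map_step`, and `outer` are hypothetical stand-ins for a nested DAG, a mapped step, and an enclosing DAG.

```python
import asyncio


async def clean(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return abs(x)


async def square(x: int) -> int:
    return x * x


async def sub_pipeline(x: int) -> int:
    # A small pipeline treated as one reusable step.
    return await square(await clean(x))


async def map_step(items: list[int]) -> list[int]:
    # Run the sub-pipeline once per element; gather preserves input order.
    return list(await asyncio.gather(*(sub_pipeline(i) for i in items)))


async def outer(items: list[int]) -> int:
    # The outer pipeline uses the mapped step, then reduces the results.
    return sum(await map_step(items))


mapped = asyncio.run(map_step([-1, 2, -3]))
total = asyncio.run(outer([-1, 2, -3]))
print(mapped, total)  # [1, 4, 9] 14
```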

Run transparently. Use await dag() for a quick test, DagRunner with provenance tracking in production, or lyth start for a long-running engine with cron-triggered pipelines. The same code runs in each execution context.

Sync and async — mixed freely. Write sync functions, async functions, or both in the same DAG. Sync nodes run in a thread executor automatically.
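
As a rough picture of what mixing sync and async nodes involves, the sketch below awaits coroutine functions directly and sends plain functions to a worker thread with `asyncio.to_thread` (Python 3.9+). The dispatcher `call_node` and the `ran_on` list are hypothetical names for this illustration; how lythonic itself dispatches nodes may differ.

```python
import asyncio
import inspect
import threading
from typing import Any, Callable

ran_on: list[str] = []  # thread names seen by the sync node, for demonstration


async def call_node(fn: Callable[..., Any], *args: Any) -> Any:
    # Await coroutine functions directly; push plain functions to a worker
    # thread so they do not block the event loop.
    if inspect.iscoroutinefunction(fn):
        return await fn(*args)
    return await asyncio.to_thread(fn, *args)


def load(n: int) -> list[int]:
    # Sync node: records which thread it ran on.
    ran_on.append(threading.current_thread().name)
    return list(range(n))


async def total(values: list[int]) -> int:
    # Async node.
    await asyncio.sleep(0)
    return sum(values)


async def pipeline(n: int) -> int:
    values = await call_node(load, n)
    return await call_node(total, values)


answer = asyncio.run(pipeline(4))
print(answer)  # 6
```

After the run, `ran_on` holds the name of a worker thread, not the main thread, showing that the sync function executed off the event loop.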

Features

  • DAG composition: >> wiring, callable DAGs, MapNode, CallNode
  • @dag_factory: define reusable DAG templates as decorated functions
  • Triggers: cron-scheduled or push-triggered execution via TriggerManager
  • Provenance: SQLite-backed tracking of runs, node executions, edge traversals
  • Caching: per-callable SQLite cache with probabilistic TTL refresh
  • lyth CLI: start, stop, run, fire, status commands
  • State: Pydantic-based SQLite ORM with schema management and multi-tenant support
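
The caching feature mentions probabilistic TTL refresh. One common reading of that idea is that an entry nearing its TTL is sometimes recomputed early, so that many callers do not all hit an expired entry at once. The sketch below shows that reading with the standard `sqlite3` module. It is not lythonic's cache: the linear probability curve, the `soft` threshold, and the `SqliteCache` class are all assumptions made for illustration.

```python
import json
import random
import sqlite3
from typing import Callable


def refresh_probability(age: float, ttl: float, soft: float = 0.5) -> float:
    """0 until soft * ttl, then rising linearly to 1 at ttl (assumed curve)."""
    start = soft * ttl
    if age <= start:
        return 0.0
    if age >= ttl:
        return 1.0
    return (age - start) / (ttl - start)


class SqliteCache:
    """Hypothetical cache for illustration; values must be JSON-serializable."""

    def __init__(self, ttl: float, rng: Callable[[], float] = random.random) -> None:
        self.ttl = ttl
        self.rng = rng
        self.db = sqlite3.connect(":memory:")
        self.db.execute(
            "CREATE TABLE cache (key TEXT PRIMARY KEY, value TEXT, stored_at REAL)"
        )

    def get(self, key: str, compute: Callable[[], object], now: float) -> object:
        row = self.db.execute(
            "SELECT value, stored_at FROM cache WHERE key = ?", (key,)
        ).fetchone()
        if row is not None:
            value, stored_at = row
            # Serve the cached value unless the random draw falls below
            # the refresh probability for this entry's age.
            if self.rng() >= refresh_probability(now - stored_at, self.ttl):
                return json.loads(value)
        fresh = compute()
        self.db.execute(
            "INSERT OR REPLACE INTO cache (key, value, stored_at) VALUES (?, ?, ?)",
            (key, json.dumps(fresh), now),
        )
        return fresh


calls: list[int] = []


def expensive() -> int:
    calls.append(1)
    return len(calls)


# A fixed "random" draw of 0.5 keeps the demonstration repeatable.
cache = SqliteCache(ttl=100.0, rng=lambda: 0.5)
a = cache.get("k", expensive, now=0.0)   # miss: computed
b = cache.get("k", expensive, now=40.0)  # age 40, probability 0.0: served from cache
c = cache.get("k", expensive, now=90.0)  # age 90, probability 0.8: refreshed early
print(a, b, c)  # 1 1 2
```

Passing `now` and `rng` explicitly is a choice made for testability here; a real cache would more likely read the clock and random source itself.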

Documentation

Full documentation at walnutgeek.github.io/lythonic.
