# 🧶 Dagloom
Like a loom weaving threads into fabric, Dagloom weaves data processing nodes into DAG workflows.
A lightweight pipeline/workflow engine for Python. Define nodes with decorators, connect them with the `>>` operator, and visualize and edit them in a drag-and-drop web UI.
## ✨ Why Dagloom?
| Problem | Competitors | Dagloom |
|---|---|---|
| Heavyweight installation | Airflow needs PostgreSQL + Redis + Celery + a webserver | `pip install dagloom && dagloom serve` |
| Too many concepts | Dagster: Assets, Ops, Jobs, Resources, IO Managers... | Just `@node` and `>>` |
| Code/visual disconnect | Airflow's UI is read-only | True bidirectional sync |
| Can't resume from failure | Re-run the entire pipeline | `dagloom resume` picks up where it left off |
| Shell-only nodes | Dagu only supports shell commands | Native Python objects (DataFrames, dicts, classes) |
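The resume behavior in the table can be pictured with a tiny sketch of the general checkpoint idea (not Dagloom's actual implementation, which persists to SQLite): results of completed nodes are saved after each step, so a rerun only executes the nodes that never finished.

```python
import json
import os
import tempfile

def run_with_checkpoint(steps, state_path):
    """Run named steps in order, skipping any already checkpointed."""
    done = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            done = json.load(f)
    for name, fn in steps:
        if name in done:
            continue  # resume: this node already succeeded
        done[name] = fn()
        with open(state_path, "w") as f:
            json.dump(done, f)  # checkpoint after every node
    return done

path = os.path.join(tempfile.mkdtemp(), "state.json")
calls = []
steps = [("extract", lambda: calls.append("extract") or [1, 2]),
         ("load", lambda: calls.append("load") or "ok")]
run_with_checkpoint(steps, path)
run_with_checkpoint(steps, path)  # second run finds the checkpoint and skips both
print(calls)  # each node executed exactly once
```

A real engine would also record failures and partial state, but the skip-if-done loop is the core of any resume feature.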
## 🚀 Quick Start
### Installation

```bash
pip install dagloom
```
### Your First Pipeline

```python
from dagloom import node, Pipeline

@node
def greet(name: str) -> str:
    """Create a greeting message."""
    return f"Hello, {name}!"

@node
def shout(message: str) -> str:
    """Convert the message to uppercase."""
    return message.upper()

@node
def add_emoji(message: str) -> str:
    """Add emoji to the message."""
    return f"🎉 {message} 🎉"

# Build a DAG with the >> operator
pipeline = greet >> shout >> add_emoji

# Run the pipeline
result = pipeline.run(name="World")
print(result)  # 🎉 HELLO, WORLD! 🎉
```
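Chaining like this usually comes down to operator overloading. Here is a minimal, hypothetical sketch of the mechanism (the class names `Node` and `MiniPipeline` are illustrative, not Dagloom's real internals): the decorator wraps the function in an object whose `__rshift__` builds a pipeline, and `run` threads each node's output into the next node.

```python
class Node:
    """Wraps a plain function so it can participate in >> chains."""
    def __init__(self, fn):
        self.fn = fn

    def __rshift__(self, other):
        return MiniPipeline([self]) >> other

class MiniPipeline:
    def __init__(self, nodes):
        self.nodes = nodes

    def __rshift__(self, other):
        return MiniPipeline(self.nodes + [other])

    def run(self, **kwargs):
        result = self.nodes[0].fn(**kwargs)  # first node takes keyword args
        for nxt in self.nodes[1:]:
            result = nxt.fn(result)          # each output feeds the next node
        return result

def node(fn):
    return Node(fn)

@node
def greet(name):
    return f"Hello, {name}!"

@node
def shout(message):
    return message.upper()

demo = greet >> shout
print(demo.run(name="World"))  # HELLO, WORLD!
```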
### Conditional Branching

Use the `|` operator to create mutually exclusive branches; the runtime selects which branch to execute based on the upstream output:
```python
from dagloom import node

@node
def classify(text: str) -> dict:
    """Route to different processors."""
    if "urgent" in text:
        return {"branch": "urgent_handler", "text": text}
    return {"branch": "normal_handler", "text": text}

@node
def urgent_handler(data: dict) -> str:
    return f"🚨 URGENT: {data['text']}"

@node
def normal_handler(data: dict) -> str:
    return f"📋 Normal: {data['text']}"

pipeline = classify >> (urgent_handler | normal_handler)
result = pipeline.run(text="urgent: server down!")
# 🚨 URGENT: urgent: server down!
```
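The dispatch itself can be pictured as a dictionary lookup on the `"branch"` key. This is a sketch of the general technique, assuming (from the example above) that the upstream node names the branch it wants; the helper `select_branch` is hypothetical:

```python
def select_branch(data, branches):
    """Dispatch on the 'branch' key (assumed convention from the example)."""
    chosen = branches[data["branch"]]
    return chosen(data)

def urgent_handler(data):
    return f"URGENT: {data['text']}"

def normal_handler(data):
    return f"Normal: {data['text']}"

handlers = {"urgent_handler": urgent_handler,
            "normal_handler": normal_handler}

routed = {"branch": "urgent_handler", "text": "server down!"}
print(select_branch(routed, handlers))  # URGENT: server down!
```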
### Streaming Nodes (Generators)

Node functions can be generators; yielded values are automatically collected into a list:
```python
from dagloom import node

@node
def stream_data(url: str):
    """Yield data chunks."""
    for i in range(5):
        yield {"chunk": i, "url": url}

@node
def aggregate(chunks: list[dict]) -> int:
    return len(chunks)

pipeline = stream_data >> aggregate
result = pipeline.run(url="https://example.com")
# 5
```
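The collection step amounts to a small wrapper around the node call. This sketches the general technique rather than Dagloom's executor: if invoking the function returns a generator, drain it into a list before handing it downstream.

```python
import inspect

def call_node(fn, **kwargs):
    """Call a node function, materializing generator output into a list."""
    result = fn(**kwargs)
    if inspect.isgenerator(result):
        return list(result)  # drain all yielded chunks
    return result

def stream_data(url):
    for i in range(3):
        yield {"chunk": i, "url": url}

chunks = call_node(stream_data, url="https://example.com")
print(len(chunks))  # 3
```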
### Execution Hooks

Monitor node execution with `on_node_start` / `on_node_end` callbacks:

```python
import asyncio

from dagloom import node, AsyncExecutor

@node
def step(x: int) -> int:
    return x + 1

pipeline = step

def my_hook(node_name, ctx):
    print(f"  → {node_name}: {ctx.get_node_info(node_name).status}")

executor = AsyncExecutor(
    pipeline,
    on_node_start=my_hook,
    on_node_end=my_hook,
)
result = asyncio.run(executor.execute(x=1))
```
### Pipeline Scheduling

Schedule pipelines to run automatically on cron expressions or fixed intervals:

```python
from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com/data.csv") -> list:
    return [1, 2, 3]

@node
def process(data: list) -> int:
    return sum(data)

# Set a schedule via the Pipeline constructor
pipeline = Pipeline(name="daily_etl", schedule="0 9 * * *")

# Or use the interval shorthand
pipeline = Pipeline(name="frequent_check", schedule="every 30m")

# Or set it after construction
pipeline = fetch >> process
pipeline.name = "my_pipeline"
pipeline.schedule = "0 9 * * 1-5"  # Weekdays at 9am
```
The scheduler runs in-process with `dagloom serve`; schedules are persisted to SQLite and auto-restored on restart.
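The `every 30m` shorthand suggests a simple grammar. A hypothetical parser for that format (the exact set of supported units is an assumption here, not Dagloom's documented grammar):

```python
import re

UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def parse_interval(spec):
    """Turn 'every <n><unit>' shorthand into a number of seconds."""
    match = re.fullmatch(r"every (\d+)([smhd])", spec.strip())
    if match is None:
        raise ValueError(f"not an interval shorthand: {spec!r}")
    count, unit = int(match.group(1)), match.group(2)
    return count * UNIT_SECONDS[unit]

print(parse_interval("every 30m"))  # 1800
```

Anything that fails this pattern (such as a cron expression) would fall through to the cron parser instead.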
### Advanced Features

```python
import pandas as pd

from dagloom import node

@node(retry=3, cache=True, timeout=30.0)
def fetch_data(url: str) -> pd.DataFrame:
    """Fetch CSV data with retry and caching."""
    return pd.read_csv(url)

@node(cache=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows with missing values."""
    return df.dropna()

@node
def save(df: pd.DataFrame) -> str:
    """Persist the cleaned data to a Parquet file."""
    path = "output/cleaned.parquet"
    df.to_parquet(path)
    return path

pipeline = fetch_data >> clean >> save
pipeline.run(url="https://example.com/data.csv")
```
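Retry and caching as decorator parameters can be sketched as a small wrapping decorator. This is a simplified stand-in for `@node(retry=..., cache=...)`, not Dagloom's implementation; it ignores `timeout` and keys the cache on positional arguments only:

```python
import functools

def with_retry_and_cache(retry=1, cache=False):
    """Simplified stand-in for @node(retry=..., cache=...)."""
    def decorate(fn):
        memo = {}

        @functools.wraps(fn)
        def wrapper(*args):
            if cache and args in memo:
                return memo[args]  # cache hit: skip re-execution
            last_error = None
            for _ in range(retry):
                try:
                    result = fn(*args)
                    if cache:
                        memo[args] = result
                    return result
                except Exception as exc:  # sketch only: catch-all retry
                    last_error = exc
            raise last_error  # all attempts failed
        return wrapper
    return decorate

attempts = {"n": 0}

@with_retry_and_cache(retry=3, cache=True)
def flaky(x):
    attempts["n"] += 1
    if attempts["n"] < 2:
        raise ConnectionError("transient failure")
    return x * 2

print(flaky(21))  # 42, succeeds on the second attempt
print(flaky(21))  # 42, served from cache with no new attempts
```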
### Start the Web UI

```bash
dagloom serve
# Open http://localhost:8000 in your browser
```
## 🏗️ Architecture

Single-process architecture:

```
┌──────────────────────────────────────┐
│  CLI / Web UI                        │
├──────────────────────────────────────┤
│  FastAPI (REST API + WebSocket)      │
├──────────────────────────────────────┤
│  Scheduler (APScheduler + asyncio)   │
├──────────────────────────────────────┤
│  Core (@node + Pipeline + DAG)       │
├──────────────────────────────────────┤
│  SQLite (embedded, zero config)      │
└──────────────────────────────────────┘
```
## 📦 Project Structure

```
dagloom/
├── core/        # @node decorator, Pipeline class, DAG validation
├── scheduler/   # Cron/interval scheduler, asyncio executor, caching, checkpoint
├── connectors/  # PostgreSQL, MySQL, S3, HTTP connectors
├── server/      # FastAPI REST API + WebSocket
├── store/       # SQLite storage layer
└── cli/         # Click CLI (serve, run, list, inspect, scheduler)
```
## 📚 Documentation
## 🤝 Contributing

Contributions are welcome! Please feel free to:

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## 📄 License

Apache License 2.0. See LICENSE for details.