Skip to main content

A lightweight pipeline/workflow engine. Weave data processing nodes into DAG workflows with decorators and the >> operator.

Project description

๐Ÿงถ Dagloom

CI codecov PyPI version PyPI Downloads Python 3.11+ License

Like a loom weaving threads into fabric, Dagloom weaves data processing nodes into DAG workflows.

A lightweight pipeline/workflow engine for Python. Define nodes with decorators, connect them with the >> operator, visualize and edit in a drag-and-drop Web UI.

ไธญๆ–‡ๆ–‡ๆกฃ


โœจ Why Dagloom?

Problem Competitors Dagloom
Overkill installation Airflow needs PostgreSQL + Redis + Celery + Webserver pip install dagloom && dagloom serve
Too many concepts Dagster: Assets, Ops, Jobs, Resources, IO Managers... Just @node and >>
Code/visual disconnect Airflow UI is read-only True bidirectional sync
Can't resume from failure Re-run the entire pipeline dagloom resume picks up where it left off
Shell-only nodes Dagu only supports shell commands Native Python objects (DataFrames, dicts, classes)

๐Ÿš€ Quick Start

Installation

pip install dagloom

Quick Demo

# Run the built-in demo pipeline instantly
dagloom demo --run

# Or start the web server with demo pipeline
dagloom demo

Your First Pipeline

from dagloom import node, Pipeline

@node
def greet(name: str) -> str:
    """Create a greeting message."""
    return f"Hello, {name}!"

@node
def shout(message: str) -> str:
    """Convert message to uppercase."""
    return message.upper()

@node
def add_emoji(message: str) -> str:
    """Add emoji to the message."""
    return f"๐ŸŽ‰ {message} ๐ŸŽ‰"

# Build DAG with >> operator
pipeline = greet >> shout >> add_emoji

# Run the pipeline
result = pipeline.run(name="World")
print(result)  # ๐ŸŽ‰ HELLO, WORLD! ๐ŸŽ‰

Conditional Branching

Use the | operator to create mutually exclusive branches โ€” the runtime selects which branch to execute based on the upstream output:

from dagloom import node

@node
def classify(text: str) -> dict:
    """Route to different processors."""
    if "urgent" in text:
        return {"branch": "urgent_handler", "text": text}
    return {"branch": "normal_handler", "text": text}

@node
def urgent_handler(data: dict) -> str:
    return f"๐Ÿšจ URGENT: {data['text']}"

@node
def normal_handler(data: dict) -> str:
    return f"๐Ÿ“‹ Normal: {data['text']}"

pipeline = classify >> (urgent_handler | normal_handler)
result = pipeline.run(text="urgent: server down!")
# ๐Ÿšจ URGENT: urgent: server down!

Streaming Nodes (Generator)

Node functions can be generators โ€” yielded values are automatically collected into a list:

@node
def stream_data(url: str):
    """Yield data chunks."""
    for i in range(5):
        yield {"chunk": i, "url": url}

@node
def aggregate(chunks: list[dict]) -> int:
    return len(chunks)

pipeline = stream_data >> aggregate
result = pipeline.run(url="https://example.com")
# 5

Execution Hooks

Monitor node execution with on_node_start / on_node_end callbacks:

import asyncio
from dagloom import node, AsyncExecutor

@node
def step(x: int) -> int:
    return x + 1

pipeline = step

def my_hook(node_name, ctx):
    print(f"  โ†’ {node_name}: {ctx.get_node_info(node_name).status}")

executor = AsyncExecutor(
    pipeline,
    on_node_start=my_hook,
    on_node_end=my_hook,
)
result = asyncio.run(executor.execute(x=1))

Pipeline Scheduling

Schedule pipelines to run automatically on cron expressions or fixed intervals:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com/data.csv") -> list:
    return [1, 2, 3]

@node
def process(data: list) -> int:
    return sum(data)

# Set schedule via Pipeline constructor
pipeline = Pipeline(name="daily_etl", schedule="0 9 * * *")

# Or use interval shorthand
pipeline = Pipeline(name="frequent_check", schedule="every 30m")

# Or set after construction
pipeline = fetch >> process
pipeline.name = "my_pipeline"
pipeline.schedule = "0 9 * * 1-5"  # Weekdays at 9am

The scheduler runs in-process with dagloom serve โ€” schedules are persisted to SQLite and auto-restored on restart.

Notifications (Email / Webhook)

Get notified when pipelines succeed or fail:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com") -> dict:
    return {"data": [1, 2, 3]}

@node
def process(data: dict) -> int:
    return sum(data["data"])

pipeline = fetch >> process
pipeline.name = "daily_etl"
pipeline.notify_on = {
    "failure": ["email://ops@team.com", "webhook://https://hooks.slack.com/xxx?format=slack"],
    "success": ["webhook://https://hooks.slack.com/yyy?format=slack"],
}

Supported channels:

  • Email: email://recipient@example.com โ€” SMTP delivery via aiosmtplib
  • Slack: webhook://https://hooks.slack.com/...?format=slack โ€” Block Kit formatting
  • WeChat Work: webhook://https://qyapi.weixin.qq.com/...?format=wechat_work
  • Feishu: webhook://https://open.feishu.cn/...?format=feishu
  • Generic Webhook: webhook://https://your-endpoint.com/hook โ€” plain JSON POST

Advanced Features

@node(retry=3, cache=True, timeout=30.0)
def fetch_data(url: str) -> pd.DataFrame:
    """Fetch CSV data with retry and caching."""
    return pd.read_csv(url)

@node(cache=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows with missing values."""
    return df.dropna()

@node
def save(df: pd.DataFrame) -> str:
    """Persist cleaned data to parquet file."""
    path = "output/cleaned.parquet"
    df.to_parquet(path)
    return path

pipeline = fetch_data >> clean >> save
pipeline.run(url="https://example.com/data.csv")

Cache dependency invalidation: When fetch_data produces a different output on re-run, Dagloom automatically invalidates the caches for clean and save so they re-execute with fresh data. No manual cache management needed.

Per-Node Executor Hints

Control execution strategy per node โ€” run CPU-heavy work in separate processes while keeping I/O-bound nodes in the event loop:

from dagloom import node, AsyncExecutor

@node
def fetch(url: str) -> list:
    """I/O-bound: runs in thread (default)."""
    return [1, 2, 3]

@node(executor="process")
def transform(data: list) -> list:
    """CPU-bound: runs in a separate process."""
    return [x ** 2 for x in data]

@node
async def save(data: list) -> str:
    """Async: awaited directly on the event loop."""
    return f"Saved {len(data)} records"

pipeline = fetch >> transform >> save
executor = AsyncExecutor(pipeline)
result = await executor.execute(url="https://example.com")

Credential Management

Securely store and retrieve secrets with layered resolution (env vars โ†’ .env โ†’ encrypted DB):

# CLI
export DAGLOOM_MASTER_KEY=$(python -c "from dagloom.security import Encryptor; print(Encryptor.generate_key())")
dagloom secret set API_KEY "sk-abc123"
dagloom secret get API_KEY
dagloom secret list
dagloom secret delete API_KEY
# Python API
from dagloom.security import Encryptor, SecretStore
from dagloom.store.db import Database

db = Database()
await db.connect()
store = SecretStore(db=db, encryptor=Encryptor())

await store.set("API_KEY", "sk-abc123")
value = await store.get("API_KEY")  # Checks env โ†’ .env โ†’ encrypted DB

HTTP Authentication

Protect your Dagloom server with API Key or Basic authentication:

# API Key authentication
dagloom serve --auth-type API_KEY --auth-key sk-your-secret-key

# Basic authentication (username:password)
dagloom serve --auth-type BASIC_AUTH --auth-key admin:mypassword

# No authentication (default)
dagloom serve
# Client: API Key authentication
import httpx

headers = {"Authorization": "Bearer sk-your-secret-key"}
response = httpx.get("http://localhost:8000/api/pipelines", headers=headers)

# Client: Basic authentication
response = httpx.get("http://localhost:8000/api/pipelines", auth=("admin", "mypassword"))

Web UI

dagloom serve
# Open http://localhost:8000 in your browser

The Web UI provides:

  • DAG Editor โ€” drag-and-drop pipeline visualization with ReactFlow
  • Pipeline List โ€” sidebar listing all registered pipelines
  • Metrics Dashboard โ€” per-node execution stats with bar charts (success/failure rate, p50/p95 latency)
  • Version History โ€” timeline of pipeline snapshots with diff support
  • Execution Log โ€” real-time log viewer via WebSocket
  • Node Inspector โ€” click any node to view config (retry, cache, timeout)

For frontend development:

cd web
npm install
npm run dev    # Vite dev server with HMR, proxies /api to backend

Tech stack: React 18 + TypeScript + Vite + Tailwind CSS + ReactFlow + Recharts

๐Ÿ”– Pipeline Versioning

Every DAG change is automatically versioned with a SHA-256 hash. Compare versions to see exactly what changed:

# List version history
versions = await db.list_pipeline_versions("my_pipeline")

# Diff two versions
# GET /api/versions/{hash_a}/diff/{hash_b}
# Returns: added/removed nodes, edges, and unified code diff

REST API:

  • GET /api/pipelines/{id}/versions โ€” list version history
  • GET /api/versions/{hash} โ€” get a specific version snapshot
  • GET /api/versions/{hash_a}/diff/{hash_b} โ€” structured diff between versions

๐Ÿ“Š Observability

Track node execution metrics (wall time, success/failure rate, retries) across pipeline runs:

from dagloom import AsyncExecutor
from dagloom.store.db import Database

db = Database()
await db.connect()

executor = AsyncExecutor(pipeline, metrics_db=db)
result = await executor.execute(url="https://example.com")

# Query aggregate stats per node
stats = await db.get_node_stats("my_pipeline")
# [{"node_id": "fetch", "total_runs": 50, "avg_ms": 120.5, "p95_ms": 350.2, ...}]

# Execution history with per-node detail
history = await db.get_execution_history("my_pipeline", limit=10)

REST API:

  • GET /api/metrics/{pipeline_id} โ€” per-node stats (runs, failure rate, p50/p95 latency)
  • GET /api/history/{pipeline_id}?limit=20 โ€” execution history with node metrics

๐Ÿ”Œ Connectors

Dagloom includes built-in connectors for common data sources:

Available connectors: PostgreSQL, MySQL, S3/MinIO, HTTP API, MongoDB, Redis, Kafka

pip install dagloom[connectors]     # PostgreSQL, MySQL, S3, HTTP
pip install dagloom[mongodb]        # MongoDB (motor)
pip install dagloom[redis]          # Redis (redis-py)
pip install dagloom[kafka]          # Kafka (aiokafka)
pip install dagloom[all-connectors] # All connectors
from dagloom.connectors import ConnectionConfig
from dagloom.connectors.mongodb import MongoDBConnector

config = ConnectionConfig(host="localhost", database="mydb")
async with MongoDBConnector(config) as mongo:
    docs = await mongo.execute("find", collection="users", filter={"active": True})

๐Ÿ—๏ธ Architecture

Single Process Architecture
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚  CLI / Web UI                       โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  FastAPI (REST API + WebSocket)     โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  Scheduler (APScheduler + asyncio)  โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  Core (@node + Pipeline + DAG)      โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  SQLite (embedded, zero config)     โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿ“ฆ Project Structure

dagloom/
โ”œโ”€โ”€ core/       # @node decorator, Pipeline class, DAG validation
โ”œโ”€โ”€ scheduler/  # Cron/interval scheduler, asyncio executor, caching, checkpoint
โ”œโ”€โ”€ security/   # Encrypted secret store, Fernet encryption, HTTP authentication (API Key + Basic Auth)
โ”œโ”€โ”€ connectors/ # PostgreSQL, MySQL, S3, HTTP, MongoDB, Redis, Kafka connectors
โ”œโ”€โ”€ server/     # FastAPI REST API + WebSocket
โ”œโ”€โ”€ store/      # SQLite storage layer
โ””โ”€โ”€ cli/        # Click CLI (serve, run, list, inspect, scheduler, secret)

๐Ÿ“– Documentation

๐Ÿค Contributing

Contributions are welcome! Please feel free to:

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

๐Ÿ“„ License

Apache License 2.0 โ€” see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagloom-1.0.0.tar.gz (241.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagloom-1.0.0-py3-none-any.whl (100.8 kB view details)

Uploaded Python 3

File details

Details for the file dagloom-1.0.0.tar.gz.

File metadata

  • Download URL: dagloom-1.0.0.tar.gz
  • Upload date:
  • Size: 241.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dagloom-1.0.0.tar.gz
Algorithm Hash digest
SHA256 c6ec2c5d770b08e3652afd797d0e62bb73117d0d9b5358857e16c3ee4ea696eb
MD5 d7b69072febdbaccdee3c80a295582b6
BLAKE2b-256 a64c6dd36898be0a2dbc541505d309b7107f88d543ae8e789bb72c55702b0970

See more details on using hashes here.

Provenance

The following attestation bundles were made for dagloom-1.0.0.tar.gz:

Publisher: publish.yml on lucientong/dagloom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dagloom-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: dagloom-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 100.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dagloom-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c234a0f972b6acb5483f0aae48daed1b8ee91a4ce2ae132266e6bfc4fb7cad12
MD5 df5f8b5a15718d6d621bf06a83c58a1f
BLAKE2b-256 603b989756daa3a5404b8394e4b6c3a7a75e0591294c9b17b854f5776fda9737

See more details on using hashes here.

Provenance

The following attestation bundles were made for dagloom-1.0.0-py3-none-any.whl:

Publisher: publish.yml on lucientong/dagloom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page