Skip to main content

A lightweight pipeline/workflow engine. Weave data processing nodes into DAG workflows with decorators and the >> operator.

Project description

๐Ÿงถ Dagloom

CI codecov PyPI version PyPI Downloads Python 3.11+ License

Like a loom weaving threads into fabric, Dagloom weaves data processing nodes into DAG workflows.

A lightweight pipeline/workflow engine for Python. Define nodes with decorators, connect them with the >> operator, visualize and edit in a drag-and-drop Web UI.

ไธญๆ–‡ๆ–‡ๆกฃ


โœจ Why Dagloom?

Problem Competitors Dagloom
Overkill installation Airflow needs PostgreSQL + Redis + Celery + Webserver pip install dagloom && dagloom serve
Too many concepts Dagster: Assets, Ops, Jobs, Resources, IO Managers... Just @node and >>
Code/visual disconnect Airflow UI is read-only True bidirectional sync
Can't resume from failure Re-run the entire pipeline dagloom resume picks up where it left off
Shell-only nodes Dagu only supports shell commands Native Python objects (DataFrames, dicts, classes)

๐Ÿš€ Quick Start

Installation

pip install dagloom

Quick Demo

# Run the built-in demo pipeline instantly
dagloom demo --run

# Or start the web server with demo pipeline
dagloom demo

Your First Pipeline

from dagloom import node, Pipeline

@node
def greet(name: str) -> str:
    """Create a greeting message."""
    return f"Hello, {name}!"

@node
def shout(message: str) -> str:
    """Convert message to uppercase."""
    return message.upper()

@node
def add_emoji(message: str) -> str:
    """Add emoji to the message."""
    return f"๐ŸŽ‰ {message} ๐ŸŽ‰"

# Build DAG with >> operator
pipeline = greet >> shout >> add_emoji

# Run the pipeline
result = pipeline.run(name="World")
print(result)  # ๐ŸŽ‰ HELLO, WORLD! ๐ŸŽ‰

Conditional Branching

Use the | operator to create mutually exclusive branches โ€” the runtime selects which branch to execute based on the upstream output:

from dagloom import node

@node
def classify(text: str) -> dict:
    """Route to different processors."""
    if "urgent" in text:
        return {"branch": "urgent_handler", "text": text}
    return {"branch": "normal_handler", "text": text}

@node
def urgent_handler(data: dict) -> str:
    return f"๐Ÿšจ URGENT: {data['text']}"

@node
def normal_handler(data: dict) -> str:
    return f"๐Ÿ“‹ Normal: {data['text']}"

pipeline = classify >> (urgent_handler | normal_handler)
result = pipeline.run(text="urgent: server down!")
# ๐Ÿšจ URGENT: urgent: server down!

Streaming Nodes (Generator)

Node functions can be generators โ€” yielded values are automatically collected into a list:

@node
def stream_data(url: str):
    """Yield data chunks."""
    for i in range(5):
        yield {"chunk": i, "url": url}

@node
def aggregate(chunks: list[dict]) -> int:
    return len(chunks)

pipeline = stream_data >> aggregate
result = pipeline.run(url="https://example.com")
# 5

Execution Hooks

Monitor node execution with on_node_start / on_node_end callbacks:

import asyncio
from dagloom import node, AsyncExecutor

@node
def step(x: int) -> int:
    return x + 1

pipeline = step

def my_hook(node_name, ctx):
    print(f"  โ†’ {node_name}: {ctx.get_node_info(node_name).status}")

executor = AsyncExecutor(
    pipeline,
    on_node_start=my_hook,
    on_node_end=my_hook,
)
result = asyncio.run(executor.execute(x=1))

Pipeline Scheduling

Schedule pipelines to run automatically on cron expressions or fixed intervals:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com/data.csv") -> list:
    return [1, 2, 3]

@node
def process(data: list) -> int:
    return sum(data)

# Set schedule via Pipeline constructor
pipeline = Pipeline(name="daily_etl", schedule="0 9 * * *")

# Or use interval shorthand
pipeline = Pipeline(name="frequent_check", schedule="every 30m")

# Or set after construction
pipeline = fetch >> process
pipeline.name = "my_pipeline"
pipeline.schedule = "0 9 * * 1-5"  # Weekdays at 9am

The scheduler runs in-process with dagloom serve โ€” schedules are persisted to SQLite and auto-restored on restart.

Notifications (Email / Webhook)

Get notified when pipelines succeed or fail:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com") -> dict:
    return {"data": [1, 2, 3]}

@node
def process(data: dict) -> int:
    return sum(data["data"])

pipeline = fetch >> process
pipeline.name = "daily_etl"
pipeline.notify_on = {
    "failure": ["email://ops@team.com", "webhook://https://hooks.slack.com/xxx?format=slack"],
    "success": ["webhook://https://hooks.slack.com/yyy?format=slack"],
}

Supported channels:

  • Email: email://recipient@example.com โ€” SMTP delivery via aiosmtplib
  • Slack: webhook://https://hooks.slack.com/...?format=slack โ€” Block Kit formatting
  • WeChat Work: webhook://https://qyapi.weixin.qq.com/...?format=wechat_work
  • Feishu: webhook://https://open.feishu.cn/...?format=feishu
  • Generic Webhook: webhook://https://your-endpoint.com/hook โ€” plain JSON POST

Advanced Features

@node(retry=3, cache=True, timeout=30.0)
def fetch_data(url: str) -> pd.DataFrame:
    """Fetch CSV data with retry and caching."""
    return pd.read_csv(url)

@node(cache=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows with missing values."""
    return df.dropna()

@node
def save(df: pd.DataFrame) -> str:
    """Persist cleaned data to parquet file."""
    path = "output/cleaned.parquet"
    df.to_parquet(path)
    return path

pipeline = fetch_data >> clean >> save
pipeline.run(url="https://example.com/data.csv")

Cache dependency invalidation: When fetch_data produces a different output on re-run, Dagloom automatically invalidates the caches for clean and save so they re-execute with fresh data. No manual cache management needed.

Per-Node Executor Hints

Control execution strategy per node โ€” run CPU-heavy work in separate processes while keeping I/O-bound nodes in the event loop:

from dagloom import node, AsyncExecutor

@node
def fetch(url: str) -> list:
    """I/O-bound: runs in thread (default)."""
    return [1, 2, 3]

@node(executor="process")
def transform(data: list) -> list:
    """CPU-bound: runs in a separate process."""
    return [x ** 2 for x in data]

@node
async def save(data: list) -> str:
    """Async: awaited directly on the event loop."""
    return f"Saved {len(data)} records"

pipeline = fetch >> transform >> save
executor = AsyncExecutor(pipeline)
result = await executor.execute(url="https://example.com")

Credential Management

Securely store and retrieve secrets with layered resolution (env vars โ†’ .env โ†’ encrypted DB):

# CLI
export DAGLOOM_MASTER_KEY=$(python -c "from dagloom.security import Encryptor; print(Encryptor.generate_key())")
dagloom secret set API_KEY "sk-abc123"
dagloom secret get API_KEY
dagloom secret list
dagloom secret delete API_KEY
# Python API
from dagloom.security import Encryptor, SecretStore
from dagloom.store.db import Database

db = Database()
await db.connect()
store = SecretStore(db=db, encryptor=Encryptor())

await store.set("API_KEY", "sk-abc123")
value = await store.get("API_KEY")  # Checks env โ†’ .env โ†’ encrypted DB

HTTP Authentication

Protect your Dagloom server with API Key or Basic authentication:

# API Key authentication
dagloom serve --auth-type API_KEY --auth-key sk-your-secret-key

# Basic authentication (username:password)
dagloom serve --auth-type BASIC_AUTH --auth-key admin:mypassword

# No authentication (default)
dagloom serve
# Client: API Key authentication
import httpx

headers = {"Authorization": "Bearer sk-your-secret-key"}
response = httpx.get("http://localhost:8000/api/pipelines", headers=headers)

# Client: Basic authentication
response = httpx.get("http://localhost:8000/api/pipelines", auth=("admin", "mypassword"))

Start the Web UI

dagloom serve
# Open http://localhost:8000 in your browser

๐Ÿ”– Pipeline Versioning

Every DAG change is automatically versioned with a SHA-256 hash. Compare versions to see exactly what changed:

# List version history
versions = await db.list_pipeline_versions("my_pipeline")

# Diff two versions
# GET /api/versions/{hash_a}/diff/{hash_b}
# Returns: added/removed nodes, edges, and unified code diff

REST API:

  • GET /api/pipelines/{id}/versions โ€” list version history
  • GET /api/versions/{hash} โ€” get a specific version snapshot
  • GET /api/versions/{hash_a}/diff/{hash_b} โ€” structured diff between versions

๐Ÿ“Š Observability

Track node execution metrics (wall time, success/failure rate, retries) across pipeline runs:

from dagloom import AsyncExecutor
from dagloom.store.db import Database

db = Database()
await db.connect()

executor = AsyncExecutor(pipeline, metrics_db=db)
result = await executor.execute(url="https://example.com")

# Query aggregate stats per node
stats = await db.get_node_stats("my_pipeline")
# [{"node_id": "fetch", "total_runs": 50, "avg_ms": 120.5, "p95_ms": 350.2, ...}]

# Execution history with per-node detail
history = await db.get_execution_history("my_pipeline", limit=10)

REST API:

  • GET /api/metrics/{pipeline_id} โ€” per-node stats (runs, failure rate, p50/p95 latency)
  • GET /api/history/{pipeline_id}?limit=20 โ€” execution history with node metrics

๐Ÿ”Œ Connectors

Dagloom includes built-in connectors for common data sources:

Available connectors: PostgreSQL, MySQL, S3/MinIO, HTTP API, MongoDB, Redis, Kafka

pip install dagloom[connectors]     # PostgreSQL, MySQL, S3, HTTP
pip install dagloom[mongodb]        # MongoDB (motor)
pip install dagloom[redis]          # Redis (redis-py)
pip install dagloom[kafka]          # Kafka (aiokafka)
pip install dagloom[all-connectors] # All connectors
from dagloom.connectors import ConnectionConfig
from dagloom.connectors.mongodb import MongoDBConnector

config = ConnectionConfig(host="localhost", database="mydb")
async with MongoDBConnector(config) as mongo:
    docs = await mongo.execute("find", collection="users", filter={"active": True})

๐Ÿ—๏ธ Architecture

Single Process Architecture
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚  CLI / Web UI                       โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  FastAPI (REST API + WebSocket)     โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  Scheduler (APScheduler + asyncio)  โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  Core (@node + Pipeline + DAG)      โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  SQLite (embedded, zero config)     โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿ“ฆ Project Structure

dagloom/
โ”œโ”€โ”€ core/       # @node decorator, Pipeline class, DAG validation
โ”œโ”€โ”€ scheduler/  # Cron/interval scheduler, asyncio executor, caching, checkpoint
โ”œโ”€โ”€ security/   # Encrypted secret store, Fernet encryption, HTTP authentication (API Key + Basic Auth)
โ”œโ”€โ”€ connectors/ # PostgreSQL, MySQL, S3, HTTP, MongoDB, Redis, Kafka connectors
โ”œโ”€โ”€ server/     # FastAPI REST API + WebSocket
โ”œโ”€โ”€ store/      # SQLite storage layer
โ””โ”€โ”€ cli/        # Click CLI (serve, run, list, inspect, scheduler, secret)

๐Ÿ“– Documentation

๐Ÿค Contributing

Contributions are welcome! Please feel free to:

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

๐Ÿ“„ License

Apache License 2.0 โ€” see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagloom-0.14.0.tar.gz (229.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagloom-0.14.0-py3-none-any.whl (100.5 kB view details)

Uploaded Python 3

File details

Details for the file dagloom-0.14.0.tar.gz.

File metadata

  • Download URL: dagloom-0.14.0.tar.gz
  • Upload date:
  • Size: 229.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dagloom-0.14.0.tar.gz
Algorithm Hash digest
SHA256 3f397dcf9662bb4a93561db420edd9ab95769cfde2f2238045e020e3cd72c2f1
MD5 cc17aeef217f0e9ae79ab15b7863ed0e
BLAKE2b-256 b7d7c030f8b4dfb12cfa7b9266ce2c3253e73f415006eedada6711d7a4def955

See more details on using hashes here.

Provenance

The following attestation bundles were made for dagloom-0.14.0.tar.gz:

Publisher: publish.yml on lucientong/dagloom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dagloom-0.14.0-py3-none-any.whl.

File metadata

  • Download URL: dagloom-0.14.0-py3-none-any.whl
  • Upload date:
  • Size: 100.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for dagloom-0.14.0-py3-none-any.whl
Algorithm Hash digest
SHA256 3db8d8da3ee6a154cf0df77f93bdedb0f164f5bb67d8ce9144f2e222a633bee3
MD5 9bef2a59ce2eda0378de9824d23d684b
BLAKE2b-256 9f15be42b0abb003e20e4c4193aab43663453a2db238ed30770d5e2970e049ae

See more details on using hashes here.

Provenance

The following attestation bundles were made for dagloom-0.14.0-py3-none-any.whl:

Publisher: publish.yml on lucientong/dagloom

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page