A lightweight pipeline/workflow engine. Weave data processing nodes into DAG workflows with decorators and the >> operator.

🧶 Dagloom

Requires Python 3.11+.

Like a loom weaving threads into fabric, Dagloom weaves data processing nodes into DAG workflows.

A lightweight pipeline/workflow engine for Python. Define nodes with decorators, connect them with the >> operator, then visualize and edit the result in a drag-and-drop Web UI.

Chinese documentation


✨ Why Dagloom?

| Problem | Competitors | Dagloom |
| --- | --- | --- |
| Overkill installation | Airflow needs PostgreSQL + Redis + Celery + Webserver | pip install dagloom && dagloom serve |
| Too many concepts | Dagster: Assets, Ops, Jobs, Resources, IO Managers... | Just @node and >> |
| Code/visual disconnect | Airflow UI is read-only | True bidirectional sync |
| Can't resume from failure | Re-run the entire pipeline | dagloom resume picks up where it left off |
| Shell-only nodes | Dagu only supports shell commands | Native Python objects (DataFrames, dicts, classes) |

🚀 Quick Start

Installation

pip install dagloom

Quick Demo

# Run the built-in demo pipeline instantly
dagloom demo --run

# Or start the web server with demo pipeline
dagloom demo

Your First Pipeline

from dagloom import node, Pipeline

@node
def greet(name: str) -> str:
    """Create a greeting message."""
    return f"Hello, {name}!"

@node
def shout(message: str) -> str:
    """Convert message to uppercase."""
    return message.upper()

@node
def add_emoji(message: str) -> str:
    """Add emoji to the message."""
    return f"🎉 {message} 🎉"

# Build DAG with >> operator
pipeline = greet >> shout >> add_emoji

# Run the pipeline
result = pipeline.run(name="World")
print(result)  # 🎉 HELLO, WORLD! 🎉
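
One way a decorator and the >> operator can combine into a linear pipeline is through Python's `__rshift__` hook. The sketch below is a standalone illustration under that assumption, not Dagloom's implementation: `MiniNode`, `MiniPipeline`, and `mini_node` are hypothetical names, and only straight chains are handled.

```python
from typing import Any, Callable


class MiniPipeline:
    """Hypothetical stand-in for a pipeline: an ordered list of steps."""

    def __init__(self, steps: list) -> None:
        self.steps = list(steps)

    def __rshift__(self, other: Any) -> "MiniPipeline":
        # pipeline >> node (or pipeline >> pipeline) appends the new steps.
        extra = other.steps if isinstance(other, MiniPipeline) else [other]
        return MiniPipeline(self.steps + extra)

    def run(self, **inputs: Any) -> Any:
        # Keyword inputs feed the first step; each later step receives
        # the previous step's return value.
        first, *rest = self.steps
        value = first(**inputs)
        for step in rest:
            value = step(value)
        return value


class MiniNode:
    """Hypothetical stand-in for a decorated node: wraps a plain function."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.func(*args, **kwargs)

    def __rshift__(self, other: Any) -> MiniPipeline:
        # node >> node starts a new chain.
        return MiniPipeline([self]) >> other


def mini_node(func: Callable[..., Any]) -> MiniNode:
    return MiniNode(func)


@mini_node
def greet(name: str) -> str:
    return f"Hello, {name}!"


@mini_node
def shout(message: str) -> str:
    return message.upper()


chain = greet >> shout
print(chain.run(name="World"))  # HELLO, WORLD!
```

Because `>>` is left-associative, `a >> b >> c` first builds a two-step chain and then appends `c`, which is why both classes define `__rshift__`.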

Conditional Branching

Use the | operator to create mutually exclusive branches. The runtime selects which branch to execute based on the upstream output; in the example below, the "branch" key returned by classify names the handler to run:

from dagloom import node

@node
def classify(text: str) -> dict:
    """Route to different processors."""
    if "urgent" in text:
        return {"branch": "urgent_handler", "text": text}
    return {"branch": "normal_handler", "text": text}

@node
def urgent_handler(data: dict) -> str:
    return f"🚨 URGENT: {data['text']}"

@node
def normal_handler(data: dict) -> str:
    return f"📋 Normal: {data['text']}"

pipeline = classify >> (urgent_handler | normal_handler)
result = pipeline.run(text="urgent: server down!")
# 🚨 URGENT: urgent: server down!
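
The exact selection rule is not spelled out here. The example suggests that the upstream dict's "branch" key is matched against the handler's function name. Under that assumption, a standalone sketch of the lookup could look like this (`pick_branch` and `route` are hypothetical helpers, not Dagloom functions):

```python
from typing import Any, Callable


def pick_branch(output: dict, branches: list[Callable[[dict], Any]]) -> Callable[[dict], Any]:
    """Return the branch whose function name equals output["branch"]."""
    by_name = {fn.__name__: fn for fn in branches}
    try:
        return by_name[output["branch"]]
    except KeyError:
        raise ValueError(f"no branch matches {output.get('branch')!r}") from None


def classify(text: str) -> dict:
    if "urgent" in text:
        return {"branch": "urgent_handler", "text": text}
    return {"branch": "normal_handler", "text": text}


def urgent_handler(data: dict) -> str:
    return f"URGENT: {data['text']}"


def normal_handler(data: dict) -> str:
    return f"Normal: {data['text']}"


def route(text: str) -> str:
    # Run the classifier, then exactly one of the two handlers.
    output = classify(text)
    handler = pick_branch(output, [urgent_handler, normal_handler])
    return handler(output)


print(route("urgent: server down!"))  # URGENT: urgent: server down!
print(route("weekly report"))         # Normal: weekly report
```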

Streaming Nodes (Generator)

Node functions can be generators; yielded values are automatically collected into a list:

@node
def stream_data(url: str):
    """Yield data chunks."""
    for i in range(5):
        yield {"chunk": i, "url": url}

@node
def aggregate(chunks: list[dict]) -> int:
    return len(chunks)

pipeline = stream_data >> aggregate
result = pipeline.run(url="https://example.com")
# 5
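
To illustrate the collection step on its own, here is a minimal sketch of how a runner could drain a generator into a list before passing it downstream. `call_and_collect` is a hypothetical helper; how Dagloom does this internally is not shown in this document.

```python
import inspect
from typing import Any, Callable


def call_and_collect(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call func; if it returns a generator, drain it into a list."""
    result = func(*args, **kwargs)
    if inspect.isgenerator(result):
        return list(result)
    return result


def stream_data(url: str):
    for i in range(5):
        yield {"chunk": i, "url": url}


def plain(value: int) -> int:
    return value + 1


chunks = call_and_collect(stream_data, "https://example.com")
print(len(chunks))                 # 5
print(call_and_collect(plain, 1))  # 2
```

Note the trade-off: collecting into a list means the downstream node only starts once the generator is exhausted, and all chunks are held in memory at once.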

Execution Hooks

Monitor node execution with on_node_start / on_node_end callbacks:

import asyncio
from dagloom import node, AsyncExecutor

@node
def step(x: int) -> int:
    return x + 1

pipeline = step

def my_hook(node_name, ctx):
    print(f"  → {node_name}: {ctx.get_node_info(node_name).status}")

executor = AsyncExecutor(
    pipeline,
    on_node_start=my_hook,
    on_node_end=my_hook,
)
result = asyncio.run(executor.execute(x=1))
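
To make the timing of the two callbacks concrete, the following standalone sketch wraps a single function call with start and end hooks. It is not Dagloom's executor: `run_with_hooks`, the plain-dict context, and the status strings "running", "success", and "failed" are all assumptions made for illustration.

```python
import asyncio
from typing import Any, Callable, Optional

Hook = Callable[[str, dict], None]


async def run_with_hooks(
    name: str,
    func: Callable[[Any], Any],
    value: Any,
    ctx: dict,
    on_node_start: Optional[Hook] = None,
    on_node_end: Optional[Hook] = None,
) -> Any:
    """Run func(value) in a thread, firing hooks before and after."""
    ctx[name] = "running"
    if on_node_start:
        on_node_start(name, ctx)
    try:
        result = await asyncio.to_thread(func, value)
        ctx[name] = "success"
        return result
    except Exception:
        ctx[name] = "failed"
        raise
    finally:
        # The end hook fires on both success and failure.
        if on_node_end:
            on_node_end(name, ctx)


events: list[tuple[str, str]] = []


def record(node_name: str, ctx: dict) -> None:
    events.append((node_name, ctx[node_name]))


result = asyncio.run(run_with_hooks("step", lambda x: x + 1, 1, {}, record, record))
print(result)  # 2
print(events)  # [('step', 'running'), ('step', 'success')]
```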

Pipeline Scheduling

Schedule pipelines to run automatically on cron expressions or fixed intervals:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com/data.csv") -> list:
    return [1, 2, 3]

@node
def process(data: list) -> int:
    return sum(data)

# Set schedule via Pipeline constructor
pipeline = Pipeline(name="daily_etl", schedule="0 9 * * *")

# Or use interval shorthand
pipeline = Pipeline(name="frequent_check", schedule="every 30m")

# Or set after construction
pipeline = fetch >> process
pipeline.name = "my_pipeline"
pipeline.schedule = "0 9 * * 1-5"  # Weekdays at 9am

The scheduler runs in-process with dagloom serve. Schedules are persisted to SQLite and auto-restored on restart.
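
The two schedule styles can be told apart with a little parsing. The sketch below is an illustration only: `parse_interval` and `split_cron` are hypothetical helpers, and the set of unit letters (s, m, h, d) is an assumption, since only "every 30m" appears above. The cron half relies on the standard five-field layout (minute, hour, day of month, month, day of week).

```python
import re
from datetime import timedelta

# Assumed unit letters; the examples above only show "m".
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
_INTERVAL = re.compile(r"every\s+(\d+)\s*([smhd])", re.IGNORECASE)
_CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def parse_interval(spec: str) -> timedelta:
    """Turn a shorthand such as 'every 30m' into a timedelta."""
    match = _INTERVAL.fullmatch(spec.strip())
    if match is None:
        raise ValueError(f"not an interval shorthand: {spec!r}")
    amount = int(match.group(1))
    if amount == 0:
        raise ValueError("interval must be positive")
    return timedelta(**{_UNITS[match.group(2).lower()]: amount})


def split_cron(expr: str) -> dict[str, str]:
    """Label the five fields of a standard cron expression."""
    parts = expr.split()
    if len(parts) != len(_CRON_FIELDS):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}")
    return dict(zip(_CRON_FIELDS, parts))


print(parse_interval("every 30m"))  # 0:30:00
print(split_cron("0 9 * * 1-5"))
```

Read this way, "0 9 * * 1-5" has minute 0, hour 9, any day of month, any month, and days of week 1 through 5, which matches the "Weekdays at 9am" comment.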

Notifications (Email / Webhook)

Get notified when pipelines succeed or fail:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com") -> dict:
    return {"data": [1, 2, 3]}

@node
def process(data: dict) -> int:
    return sum(data["data"])

pipeline = fetch >> process
pipeline.name = "daily_etl"
pipeline.notify_on = {
    "failure": ["email://ops@team.com", "webhook://https://hooks.slack.com/xxx?format=slack"],
    "success": ["webhook://https://hooks.slack.com/yyy?format=slack"],
}

Supported channels:

  • Email: email://recipient@example.com (SMTP delivery via aiosmtplib)
  • Slack: webhook://https://hooks.slack.com/...?format=slack (Block Kit formatting)
  • WeChat Work: webhook://https://qyapi.weixin.qq.com/...?format=wechat_work
  • Feishu: webhook://https://open.feishu.cn/...?format=feishu
  • Generic Webhook: webhook://https://your-endpoint.com/hook (plain JSON POST)
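
The channel strings above follow a scheme://target pattern, with webhook formats passed as a query parameter. As a rough sketch of how such a string can be taken apart with the standard library (`parse_channel` is a hypothetical helper, and the "json" label for format-less webhooks is an assumption standing in for the plain JSON POST case):

```python
from typing import Optional
from urllib.parse import parse_qs, urlsplit


def parse_channel(uri: str) -> dict[str, Optional[str]]:
    """Split a channel URI into its kind, target, and payload format."""
    scheme, sep, target = uri.partition("://")
    if not sep or not target:
        raise ValueError(f"malformed channel URI: {uri!r}")
    if scheme == "email":
        return {"kind": "email", "target": target, "format": None}
    if scheme == "webhook":
        # The target is itself a full URL; its query string may carry ?format=...
        query = parse_qs(urlsplit(target).query)
        fmt = query.get("format", ["json"])[0]
        return {"kind": "webhook", "target": target, "format": fmt}
    raise ValueError(f"unsupported channel scheme: {scheme!r}")


print(parse_channel("email://ops@team.com"))
print(parse_channel("webhook://https://hooks.slack.com/xxx?format=slack"))
print(parse_channel("webhook://https://your-endpoint.com/hook"))
```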

Advanced Features

import pandas as pd
from dagloom import node

@node(retry=3, cache=True, timeout=30.0)
def fetch_data(url: str) -> pd.DataFrame:
    """Fetch CSV data with retry and caching."""
    return pd.read_csv(url)

@node(cache=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows with missing values."""
    return df.dropna()

@node
def save(df: pd.DataFrame) -> str:
    """Persist cleaned data to parquet file."""
    path = "output/cleaned.parquet"
    df.to_parquet(path)
    return path

pipeline = fetch_data >> clean >> save
pipeline.run(url="https://example.com/data.csv")

Cache dependency invalidation: When fetch_data produces a different output on re-run, Dagloom automatically invalidates the caches for clean and save so they re-execute with fresh data. No manual cache management needed.
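
One common way to get this behavior is to key each node's cache on a hash of its input, so that a changed upstream output simply misses the old entry. Whether Dagloom works this way is not stated here; the sketch below is an illustration under that assumption, and `HashKeyedCache` is a hypothetical class.

```python
import hashlib
import json
from typing import Any, Callable


class HashKeyedCache:
    """Cache node results under a hash of (node name, input value)."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    @staticmethod
    def _key(node_name: str, value: Any) -> str:
        # default=repr lets non-JSON values still contribute to the key.
        payload = json.dumps([node_name, value], sort_keys=True, default=repr)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def run(self, node_name: str, func: Callable[[Any], Any], value: Any) -> tuple[Any, bool]:
        """Return (result, cache_hit)."""
        key = self._key(node_name, value)
        if key in self._store:
            return self._store[key], True
        result = func(value)
        self._store[key] = result
        return result, False


def clean(rows: list) -> list:
    return [row for row in rows if row is not None]


cache = HashKeyedCache()
print(cache.run("clean", clean, [1, None, 2]))  # ([1, 2], False)  first run
print(cache.run("clean", clean, [1, None, 2]))  # ([1, 2], True)   same input, cache hit
print(cache.run("clean", clean, [1, None, 3]))  # ([1, 3], False)  upstream changed, recomputed
```

With this design no explicit invalidation pass is needed: stale entries are never looked up again, at the cost of keeping them until they are evicted some other way.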

Per-Node Executor Hints

Control the execution strategy per node: run CPU-heavy work in separate processes, keep synchronous I/O-bound nodes in a thread (the default), and await async nodes directly on the event loop:

import asyncio

from dagloom import node, AsyncExecutor

@node
def fetch(url: str) -> list:
    """I/O-bound: runs in thread (default)."""
    return [1, 2, 3]

@node(executor="process")
def transform(data: list) -> list:
    """CPU-bound: runs in a separate process."""
    return [x ** 2 for x in data]

@node
async def save(data: list) -> str:
    """Async: awaited directly on the event loop."""
    return f"Saved {len(data)} records"

pipeline = fetch >> transform >> save
executor = AsyncExecutor(pipeline)
# asyncio.run() drives the coroutine from a regular (non-async) script
result = asyncio.run(executor.execute(url="https://example.com"))

Credential Management

Securely store and retrieve secrets with layered resolution (env vars → .env → encrypted DB):

# CLI
export DAGLOOM_MASTER_KEY=$(python -c "from dagloom.security import Encryptor; print(Encryptor.generate_key())")
dagloom secret set API_KEY "sk-abc123"
dagloom secret get API_KEY
dagloom secret list
dagloom secret delete API_KEY

# Python API (these calls use await, so run them inside an async function)
from dagloom.security import Encryptor, SecretStore
from dagloom.store.db import Database

db = Database()
await db.connect()
store = SecretStore(db=db, encryptor=Encryptor())

await store.set("API_KEY", "sk-abc123")
value = await store.get("API_KEY")  # Checks env → .env → encrypted DB
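
The layered lookup order can be sketched independently of the storage backend. In the illustration below, `parse_dotenv` and `resolve_secret` are hypothetical helpers, and a plain dict stands in for the encrypted database; no encryption is performed.

```python
import os
from typing import Mapping, Optional


def parse_dotenv(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    values: dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def resolve_secret(
    name: str,
    dotenv: Mapping[str, str],
    stored: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the first match: environment, then .env, then the store."""
    env = os.environ if environ is None else environ
    for layer in (env, dotenv, stored):
        if name in layer:
            return layer[name]
    return None


dotenv = parse_dotenv('# local overrides\nAPI_KEY="from-dotenv"\nDB_URL=sqlite:///x.db\n')
stored = {"API_KEY": "from-db", "TOKEN": "only-in-db"}

print(resolve_secret("API_KEY", dotenv, stored, environ={"API_KEY": "from-env"}))  # from-env
print(resolve_secret("API_KEY", dotenv, stored, environ={}))                       # from-dotenv
print(resolve_secret("TOKEN", dotenv, stored, environ={}))                         # only-in-db
```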

HTTP Authentication

Protect your Dagloom server with API Key or Basic authentication:

# API Key authentication
dagloom serve --auth-type API_KEY --auth-key sk-your-secret-key

# Basic authentication (username:password)
dagloom serve --auth-type BASIC_AUTH --auth-key admin:mypassword

# No authentication (default)
dagloom serve

# Client: API Key authentication
import httpx

headers = {"Authorization": "Bearer sk-your-secret-key"}
response = httpx.get("http://localhost:8000/api/pipelines", headers=headers)

# Client: Basic authentication
response = httpx.get("http://localhost:8000/api/pipelines", auth=("admin", "mypassword"))
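
For clients that do not use httpx, the same two headers can be built by hand. The sketch below uses only the standard library: the Basic scheme is the usual base64 encoding of "username:password", and the Bearer form mirrors the client example above. `key_matches` is a hypothetical server-side check added for illustration, not Dagloom's middleware.

```python
import base64
import hmac


def bearer_header(api_key: str) -> dict[str, str]:
    return {"Authorization": f"Bearer {api_key}"}


def basic_header(username: str, password: str) -> dict[str, str]:
    # HTTP Basic: base64 of "username:password".
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def key_matches(header_value: str, expected_key: str) -> bool:
    """Constant-time comparison of a presented Bearer key."""
    scheme, _, presented = header_value.partition(" ")
    return scheme == "Bearer" and hmac.compare_digest(
        presented.encode("utf-8"), expected_key.encode("utf-8")
    )


headers = bearer_header("sk-your-secret-key")
print(headers["Authorization"])                                      # Bearer sk-your-secret-key
print(key_matches(headers["Authorization"], "sk-your-secret-key"))   # True
print(basic_header("admin", "mypassword")["Authorization"].split()[0])  # Basic
```

Base64 is an encoding, not encryption, so Basic credentials should only travel over HTTPS.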

Start the Web UI

dagloom serve
# Open http://localhost:8000 in your browser

🔌 Connectors

Dagloom includes built-in connectors for common data sources: PostgreSQL, MySQL, S3/MinIO, HTTP API, MongoDB, Redis, and Kafka.

pip install dagloom[connectors]     # PostgreSQL, MySQL, S3, HTTP
pip install dagloom[mongodb]        # MongoDB (motor)
pip install dagloom[redis]          # Redis (redis-py)
pip install dagloom[kafka]          # Kafka (aiokafka)
pip install dagloom[all-connectors] # All connectors

# Python usage (async with must run inside an async function)
from dagloom.connectors import ConnectionConfig
from dagloom.connectors.mongodb import MongoDBConnector

config = ConnectionConfig(host="localhost", database="mydb")
async with MongoDBConnector(config) as mongo:
    docs = await mongo.execute("find", collection="users", filter={"active": True})
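
The `async with` form implies that a connector opens its connection on entry and releases it on exit. As a rough, self-contained illustration of that lifecycle (`InMemoryConnector` and its `find` method are hypothetical and unrelated to Dagloom's connector classes):

```python
import asyncio
from typing import Any


class InMemoryConnector:
    """Hypothetical connector: 'connects' on enter, 'closes' on exit."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self._rows = rows
        self.connected = False

    async def __aenter__(self) -> "InMemoryConnector":
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Always release the connection; do not swallow exceptions.
        self.connected = False
        return False

    async def find(self, **criteria: Any) -> list[dict[str, Any]]:
        if not self.connected:
            raise RuntimeError("connector is not open")
        return [
            row for row in self._rows
            if all(row.get(key) == value for key, value in criteria.items())
        ]


async def main() -> tuple[list[dict[str, Any]], bool]:
    connector = InMemoryConnector([
        {"name": "ada", "active": True},
        {"name": "bob", "active": False},
    ])
    async with connector as conn:
        docs = await conn.find(active=True)
    return docs, connector.connected


docs, still_open = asyncio.run(main())
print(docs)        # [{'name': 'ada', 'active': True}]
print(still_open)  # False
```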

🏗️ Architecture

Single Process Architecture
┌─────────────────────────────────────┐
│  CLI / Web UI                       │
├─────────────────────────────────────┤
│  FastAPI (REST API + WebSocket)     │
├─────────────────────────────────────┤
│  Scheduler (APScheduler + asyncio)  │
├─────────────────────────────────────┤
│  Core (@node + Pipeline + DAG)      │
├─────────────────────────────────────┤
│  SQLite (embedded, zero config)     │
└─────────────────────────────────────┘

📦 Project Structure

dagloom/
├── core/       # @node decorator, Pipeline class, DAG validation
├── scheduler/  # Cron/interval scheduler, asyncio executor, caching, checkpoint
├── security/   # Encrypted secret store, Fernet encryption, HTTP authentication (API Key + Basic Auth)
├── connectors/ # PostgreSQL, MySQL, S3, HTTP, MongoDB, Redis, Kafka connectors
├── server/     # FastAPI REST API + WebSocket
├── store/      # SQLite storage layer
└── cli/        # Click CLI (serve, run, list, inspect, scheduler, secret)

📖 Documentation

🤝 Contributing

Contributions are welcome! Please feel free to:

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

Apache License 2.0. See LICENSE for details.
