A lightweight pipeline/workflow engine. Weave data processing nodes into DAG workflows with decorators and the >> operator.

🧶 Dagloom

PyPI version | PyPI Downloads | Python 3.11+ | License

Like a loom weaving threads into fabric, Dagloom weaves data processing nodes into DAG workflows.

A lightweight pipeline/workflow engine for Python. Define nodes with decorators, connect them with the >> operator, visualize and edit in a drag-and-drop Web UI.

Chinese documentation (中文文档)


✨ Why Dagloom?

Problem | Competitors | Dagloom
Overkill installation | Airflow needs PostgreSQL + Redis + Celery + Webserver | pip install dagloom && dagloom serve
Too many concepts | Dagster: Assets, Ops, Jobs, Resources, IO Managers... | Just @node and >>
Code/visual disconnect | Airflow UI is read-only | True bidirectional sync
Can't resume from failure | Re-run the entire pipeline | dagloom resume picks up where it left off
Shell-only nodes | Dagu only supports shell commands | Native Python objects (DataFrames, dicts, classes)

🚀 Quick Start

Installation

pip install dagloom

Your First Pipeline

from dagloom import node, Pipeline

@node
def greet(name: str) -> str:
    """Create a greeting message."""
    return f"Hello, {name}!"

@node
def shout(message: str) -> str:
    """Convert message to uppercase."""
    return message.upper()

@node
def add_emoji(message: str) -> str:
    """Add emoji to the message."""
    return f"🎉 {message} 🎉"

# Build DAG with >> operator
pipeline = greet >> shout >> add_emoji

# Run the pipeline
result = pipeline.run(name="World")
print(result)  # 🎉 HELLO, WORLD! 🎉
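
To make the >> chaining concrete, here is a minimal sketch of how such an operator can be built on Python's __rshift__ hook. SketchNode, SketchPipeline, and sketch_node are hypothetical names invented for this illustration. This is not Dagloom's implementation, and it handles only linear chains.

```python
from typing import Any, Callable


class SketchPipeline:
    """Hypothetical linear pipeline: an ordered list of plain callables."""

    def __init__(self, steps: list[Callable[..., Any]]) -> None:
        self.steps = steps

    def __rshift__(self, other: "SketchNode") -> "SketchPipeline":
        # pipeline >> node returns a new pipeline with the node appended.
        return SketchPipeline(self.steps + [other.fn])

    def run(self, **inputs: Any) -> Any:
        # The first step receives the keyword inputs; every later step
        # receives the previous step's return value as its only argument.
        first, *rest = self.steps
        value = first(**inputs)
        for step in rest:
            value = step(value)
        return value


class SketchNode:
    """Hypothetical wrapper that makes a plain function chainable."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def __rshift__(self, other: "SketchNode") -> SketchPipeline:
        # node >> node starts a two-step pipeline.
        return SketchPipeline([self.fn, other.fn])


def sketch_node(fn: Callable[..., Any]) -> SketchNode:
    return SketchNode(fn)


@sketch_node
def greet(name: str) -> str:
    return f"Hello, {name}!"


@sketch_node
def shout(message: str) -> str:
    return message.upper()


@sketch_node
def exclaim(message: str) -> str:
    return message + "!!"


chain = greet >> shout >> exclaim
print(chain.run(name="World"))  # HELLO, WORLD!!!
```

Because >> is left-associative, greet >> shout is evaluated first and yields a pipeline, which then absorbs exclaim.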

Conditional Branching

Use the | operator to create mutually exclusive branches. The runtime selects which branch to execute based on the upstream output:

from dagloom import node

@node
def classify(text: str) -> dict:
    """Route to different processors."""
    if "urgent" in text:
        return {"branch": "urgent_handler", "text": text}
    return {"branch": "normal_handler", "text": text}

@node
def urgent_handler(data: dict) -> str:
    return f"🚨 URGENT: {data['text']}"

@node
def normal_handler(data: dict) -> str:
    return f"📋 Normal: {data['text']}"

pipeline = classify >> (urgent_handler | normal_handler)
result = pipeline.run(text="urgent: server down!")
# 🚨 URGENT: urgent: server down!
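
The example above suggests that the "branch" key in the upstream output names the node to run. The sketch below illustrates that selection step in plain Python. The routing rule is inferred from the example rather than confirmed, and select_branch is a hypothetical helper, not part of Dagloom.

```python
from typing import Any, Callable


def select_branch(output: dict, candidates: dict[str, Callable[[dict], Any]]) -> Any:
    """Run the single candidate whose name matches output["branch"]."""
    name = output.get("branch")
    if name not in candidates:
        raise KeyError(f"no branch named {name!r}; expected one of {sorted(candidates)}")
    # Only the selected handler is called; the others are skipped entirely.
    return candidates[name](output)


def urgent_handler(data: dict) -> str:
    return f"URGENT: {data['text']}"


def normal_handler(data: dict) -> str:
    return f"Normal: {data['text']}"


handlers = {"urgent_handler": urgent_handler, "normal_handler": normal_handler}
routed = select_branch({"branch": "normal_handler", "text": "weekly report"}, handlers)
print(routed)  # Normal: weekly report
```

Raising on an unknown branch name is a design choice of this sketch; it surfaces routing typos early instead of silently skipping both branches.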

Streaming Nodes (Generator)

Node functions can be generators; yielded values are automatically collected into a list:

from dagloom import node

@node
def stream_data(url: str):
    """Yield data chunks."""
    for i in range(5):
        yield {"chunk": i, "url": url}

@node
def aggregate(chunks: list[dict]) -> int:
    return len(chunks)

pipeline = stream_data >> aggregate
result = pipeline.run(url="https://example.com")
# 5
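
As an illustration of the collection step, a runtime can detect generator functions with the standard library's inspect.isgeneratorfunction and drain them into a list before passing the result downstream. call_and_collect is a hypothetical helper written for this sketch; how Dagloom does this internally is not shown in the README.

```python
import inspect
from typing import Any, Callable


def call_and_collect(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call fn; if fn is a generator function, drain its output into a list."""
    if inspect.isgeneratorfunction(fn):
        return list(fn(*args, **kwargs))
    return fn(*args, **kwargs)


def stream_chunks(url: str):
    # Generator node body: yields one dict per chunk.
    for i in range(5):
        yield {"chunk": i, "url": url}


chunks = call_and_collect(stream_chunks, "https://example.com")
print(len(chunks))  # 5
```

Note that collecting into a list means the downstream node only starts after the generator is exhausted, so this pattern does not reduce peak memory.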

Execution Hooks

Monitor node execution with on_node_start / on_node_end callbacks:

import asyncio
from dagloom import node, AsyncExecutor

@node
def step(x: int) -> int:
    return x + 1

pipeline = step

def my_hook(node_name, ctx):
    print(f"  → {node_name}: {ctx.get_node_info(node_name).status}")

executor = AsyncExecutor(
    pipeline,
    on_node_start=my_hook,
    on_node_end=my_hook,
)
result = asyncio.run(executor.execute(x=1))
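
To show where such callbacks fire, here is a small synchronous sketch that wraps each step with a start and an end hook. run_with_hooks is hypothetical, and its hooks take (node_name, status) rather than the (node_name, ctx) pair used above, purely to keep the example self-contained.

```python
from typing import Any, Callable, Optional

# Simplified hook signature for this sketch: (node_name, status).
Hook = Callable[[str, str], None]


def run_with_hooks(
    steps: list[Callable[[Any], Any]],
    value: Any,
    on_node_start: Optional[Hook] = None,
    on_node_end: Optional[Hook] = None,
) -> Any:
    """Run steps in order, calling the hooks before and after each one."""
    for step in steps:
        name = step.__name__
        if on_node_start:
            on_node_start(name, "running")
        try:
            value = step(value)
        except Exception:
            # The end hook still fires on failure, then the error propagates.
            if on_node_end:
                on_node_end(name, "failed")
            raise
        if on_node_end:
            on_node_end(name, "success")
    return value


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


events: list[tuple[str, str]] = []
hooked_result = run_with_hooks(
    [add_one, double],
    1,
    on_node_start=lambda name, status: events.append((name, status)),
    on_node_end=lambda name, status: events.append((name, status)),
)
print(hooked_result)  # 4
```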

Pipeline Scheduling

Schedule pipelines to run automatically on cron expressions or fixed intervals:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com/data.csv") -> list:
    return [1, 2, 3]

@node
def process(data: list) -> int:
    return sum(data)

# Set schedule via Pipeline constructor
pipeline = Pipeline(name="daily_etl", schedule="0 9 * * *")

# Or use interval shorthand
pipeline = Pipeline(name="frequent_check", schedule="every 30m")

# Or set after construction
pipeline = fetch >> process
pipeline.name = "my_pipeline"
pipeline.schedule = "0 9 * * 1-5"  # Weekdays at 9am

The scheduler runs in-process with dagloom serve. Schedules are persisted to SQLite and auto-restored on restart.
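
The two schedule styles can be told apart with a small parser. The sketch below is an assumption-laden illustration: only "every 30m" appears in this README, so the s/h/d units, parse_interval, and looks_like_cron are all hypothetical and may not match what Dagloom accepts.

```python
import re
from typing import Optional

# Assumed unit suffixes; only "m" is confirmed by the README example.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
_INTERVAL_RE = re.compile(r"^every\s+(\d+)\s*([smhd])$")


def parse_interval(spec: str) -> Optional[int]:
    """Return the interval in seconds, or None if spec is not an interval."""
    match = _INTERVAL_RE.match(spec.strip().lower())
    if match is None:
        return None
    count, unit = int(match.group(1)), match.group(2)
    if count <= 0:
        raise ValueError("interval must be positive")
    return count * _UNIT_SECONDS[unit]


def looks_like_cron(spec: str) -> bool:
    """Cheap structural check: a standard cron expression has five fields."""
    return len(spec.split()) == 5


print(parse_interval("every 30m"))     # 1800
print(parse_interval("0 9 * * *"))     # None
print(looks_like_cron("0 9 * * 1-5"))  # True
```

The cron check here only counts fields; validating ranges and step values is better left to a dedicated scheduling library.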

Notifications (Email / Webhook)

Get notified when pipelines succeed or fail:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com") -> dict:
    return {"data": [1, 2, 3]}

@node
def process(data: dict) -> int:
    return sum(data["data"])

pipeline = fetch >> process
pipeline.name = "daily_etl"
pipeline.notify_on = {
    "failure": ["email://ops@team.com", "webhook://https://hooks.slack.com/xxx?format=slack"],
    "success": ["webhook://https://hooks.slack.com/yyy?format=slack"],
}

Supported channels:

  • Email: email://recipient@example.com (SMTP delivery via aiosmtplib)
  • Slack: webhook://https://hooks.slack.com/...?format=slack (Block Kit formatting)
  • WeChat Work: webhook://https://qyapi.weixin.qq.com/...?format=wechat_work
  • Feishu: webhook://https://open.feishu.cn/...?format=feishu
  • Generic Webhook: webhook://https://your-endpoint.com/hook (plain JSON POST)
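
The channel strings above combine a channel kind, a target, and an optional format query parameter. A rough sketch of how such a string could be split apart is shown below. Channel and parse_channel are hypothetical names, and both the "json" default and the removal of format from the outgoing URL are assumptions, not documented Dagloom behavior.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


@dataclass(frozen=True)
class Channel:
    kind: str                      # "email" or "webhook"
    target: str                    # recipient address or endpoint URL
    payload_format: Optional[str]  # e.g. "slack"; None for email


def parse_channel(uri: str) -> Channel:
    """Split 'kind://target[?format=...]' into its parts."""
    kind, sep, rest = uri.partition("://")
    if not sep or kind not in {"email", "webhook"}:
        raise ValueError(f"unsupported channel URI: {uri!r}")
    if kind == "email":
        return Channel(kind, rest, None)
    parts = urlsplit(rest)
    query = parse_qsl(parts.query, keep_blank_values=True)
    # Assumed default: a webhook without ?format= gets a plain JSON payload.
    fmt = next((value for key, value in query if key == "format"), "json")
    # Assumption: "format" is a routing hint and is not sent to the endpoint.
    remaining = urlencode([(key, value) for key, value in query if key != "format"])
    endpoint = urlunsplit((parts.scheme, parts.netloc, parts.path, remaining, parts.fragment))
    return Channel(kind, endpoint, fmt)


slack = parse_channel("webhook://https://hooks.slack.com/xxx?format=slack")
print(slack.target, slack.payload_format)  # https://hooks.slack.com/xxx slack
```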

Advanced Features

import pandas as pd
from dagloom import node

@node(retry=3, cache=True, timeout=30.0)
def fetch_data(url: str) -> pd.DataFrame:
    """Fetch CSV data with retry and caching."""
    return pd.read_csv(url)

@node(cache=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows with missing values."""
    return df.dropna()

@node
def save(df: pd.DataFrame) -> str:
    """Persist cleaned data to parquet file."""
    path = "output/cleaned.parquet"
    df.to_parquet(path)
    return path

pipeline = fetch_data >> clean >> save
pipeline.run(url="https://example.com/data.csv")

Cache dependency invalidation: When fetch_data produces a different output on re-run, Dagloom automatically invalidates the caches for clean and save so they re-execute with fresh data. No manual cache management needed.
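
One common way to get this behavior is to key each cached result on a hash of the node name and its input, so a changed upstream output simply misses the downstream cache. The sketch below uses that content-hash technique as a stand-in; HashCache and run_chain are hypothetical, and the README does not say whether Dagloom works this way.

```python
import hashlib
import pickle
from typing import Any, Callable


class HashCache:
    """Hypothetical cache keyed on sha256(node name + pickled input)."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}
        self.executions: list[str] = []  # names of nodes that actually ran

    def call(self, fn: Callable[[Any], Any], value: Any) -> Any:
        # Pickle bytes are stable within one process for simple values;
        # a real cache would need a serialization that is stable across runs.
        digest = hashlib.sha256(pickle.dumps((fn.__name__, value))).hexdigest()
        if digest not in self._store:
            self.executions.append(fn.__name__)
            self._store[digest] = fn(value)
        return self._store[digest]


def run_chain(cache: HashCache, steps: list[Callable[[Any], Any]], value: Any) -> Any:
    for step in steps:
        value = cache.call(step, value)
    return value


def clean(rows: list) -> list:
    return [r for r in rows if r is not None]


def total(rows: list) -> int:
    return sum(rows)


cache = HashCache()
first = run_chain(cache, [clean, total], [1, None, 2])   # both nodes run
second = run_chain(cache, [clean, total], [1, None, 2])  # both served from cache
third = run_chain(cache, [clean, total], [1, None, 5])   # upstream changed: both re-run
print(first, second, third)  # 3 3 6
print(cache.executions)      # ['clean', 'total', 'clean', 'total']
```

With this scheme there is no explicit invalidation step: stale entries are never looked up again because their keys no longer match.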

Per-Node Executor Hints

Control the execution strategy per node. CPU-heavy work can run in a separate process, while synchronous nodes run in a thread by default and async nodes are awaited directly on the event loop:

import asyncio
from dagloom import node, AsyncExecutor

@node
def fetch(url: str) -> list:
    """I/O-bound: runs in thread (default)."""
    return [1, 2, 3]

@node(executor="process")
def transform(data: list) -> list:
    """CPU-bound: runs in a separate process."""
    return [x ** 2 for x in data]

@node
async def save(data: list) -> str:
    """Async: awaited directly on the event loop."""
    return f"Saved {len(data)} records"

pipeline = fetch >> transform >> save
executor = AsyncExecutor(pipeline)
# A bare `await` is only valid inside a coroutine; from a script, use asyncio.run().
result = asyncio.run(executor.execute(url="https://example.com"))
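
The three strategies map onto standard asyncio building blocks. The sketch below is a hypothetical dispatcher, not Dagloom's executor: dispatch and its hint parameter are invented for illustration, and the demo exercises only the thread and coroutine paths.

```python
import asyncio
import inspect
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Callable, Optional


async def dispatch(
    fn: Callable[[Any], Any],
    arg: Any,
    hint: str = "thread",
    pool: Optional[ProcessPoolExecutor] = None,
) -> Any:
    """Run fn(arg) using the strategy implied by its type and hint."""
    if inspect.iscoroutinefunction(fn):
        # Async functions are awaited directly on the event loop.
        return await fn(arg)
    if hint == "process":
        if pool is None:
            raise ValueError("hint='process' requires a ProcessPoolExecutor")
        # fn and arg must be picklable to cross the process boundary.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(pool, fn, arg)
    # Default: run blocking code in a worker thread so the loop stays free.
    return await asyncio.to_thread(fn, arg)


def square_all(data: list) -> list:
    return [x ** 2 for x in data]


async def describe(data: list) -> str:
    return f"Saved {len(data)} records"


async def main() -> str:
    squared = await dispatch(square_all, [1, 2, 3])
    return await dispatch(describe, squared)


summary = asyncio.run(main())
print(summary)  # Saved 3 records
```

When trying the process path, create the ProcessPoolExecutor under an `if __name__ == "__main__":` guard, since platforms that spawn worker processes re-import the main module.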

Start the Web UI

dagloom serve
# Open http://localhost:8000 in your browser

🏗️ Architecture

Single Process Architecture
┌─────────────────────────────────────┐
│  CLI / Web UI                       │
├─────────────────────────────────────┤
│  FastAPI (REST API + WebSocket)     │
├─────────────────────────────────────┤
│  Scheduler (APScheduler + asyncio)  │
├─────────────────────────────────────┤
│  Core (@node + Pipeline + DAG)      │
├─────────────────────────────────────┤
│  SQLite (embedded, zero config)     │
└─────────────────────────────────────┘

📦 Project Structure

dagloom/
├── core/       # @node decorator, Pipeline class, DAG validation
├── scheduler/  # Cron/interval scheduler, asyncio executor, caching, checkpoint
├── connectors/ # PostgreSQL, MySQL, S3, HTTP connectors
├── server/     # FastAPI REST API + WebSocket
├── store/      # SQLite storage layer
└── cli/        # Click CLI (serve, run, list, inspect, scheduler)

📖 Documentation

🤝 Contributing

Contributions are welcome! Please feel free to:

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

Apache License 2.0. See LICENSE for details.
