A lightweight pipeline/workflow engine. Weave data processing nodes into DAG workflows with decorators and the >> operator.

🧶 Dagloom

PyPI version PyPI Downloads Python 3.11+ License

Like a loom weaving threads into fabric, Dagloom weaves data processing nodes into DAG workflows.

A lightweight pipeline/workflow engine for Python. Define nodes with decorators, connect them with the >> operator, then visualize and edit the result in a drag-and-drop Web UI.

中文文档 (Chinese documentation)


✨ Why Dagloom?

| Problem | Competitors | Dagloom |
|---|---|---|
| Overkill installation | Airflow needs PostgreSQL + Redis + Celery + Webserver | pip install dagloom && dagloom serve |
| Too many concepts | Dagster: Assets, Ops, Jobs, Resources, IO Managers... | Just @node and >> |
| Code/visual disconnect | Airflow UI is read-only | True bidirectional sync |
| Can't resume from failure | Re-run the entire pipeline | dagloom resume picks up where it left off |
| Shell-only nodes | Dagu only supports shell commands | Native Python objects (DataFrames, dicts, classes) |

🚀 Quick Start

Installation

pip install dagloom

Your First Pipeline

from dagloom import node, Pipeline

@node
def greet(name: str) -> str:
    """Create a greeting message."""
    return f"Hello, {name}!"

@node
def shout(message: str) -> str:
    """Convert message to uppercase."""
    return message.upper()

@node
def add_emoji(message: str) -> str:
    """Add emoji to the message."""
    return f"🎉 {message} 🎉"

# Build DAG with >> operator
pipeline = greet >> shout >> add_emoji

# Run the pipeline
result = pipeline.run(name="World")
print(result)  # 🎉 HELLO, WORLD! 🎉
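To make the >> chaining less magical, here is a minimal sketch of how such an operator can be built with Python's `__rshift__` hook. It is not Dagloom's implementation: `ToyNode`, `ToyPipeline`, and `toy_node` are hypothetical names, and the sketch only handles a linear chain rather than a full DAG.

```python
from typing import Any, Callable


class ToyNode:
    """Wraps a function so it can be chained with >> (illustrative only)."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def __rshift__(self, other: "ToyNode") -> "ToyPipeline":
        # node_a >> node_b starts a two-step pipeline.
        return ToyPipeline([self, other])


class ToyPipeline:
    """An ordered chain of nodes; a real engine would store a graph instead."""

    def __init__(self, nodes: list) -> None:
        self.nodes = nodes

    def __rshift__(self, other: ToyNode) -> "ToyPipeline":
        # pipeline >> node appends one more step.
        return ToyPipeline(self.nodes + [other])

    def run(self, **kwargs: Any) -> Any:
        # The first node receives the keyword arguments; every later
        # node receives the previous node's return value.
        value = self.nodes[0].fn(**kwargs)
        for step in self.nodes[1:]:
            value = step.fn(value)
        return value


def toy_node(fn: Callable[..., Any]) -> ToyNode:
    """Hypothetical stand-in for a @node-style decorator."""
    return ToyNode(fn)


@toy_node
def toy_greet(name: str) -> str:
    return f"Hello, {name}!"


@toy_node
def toy_shout(message: str) -> str:
    return message.upper()


@toy_node
def toy_wrap(message: str) -> str:
    return f"[{message}]"


toy_pipeline = toy_greet >> toy_shout >> toy_wrap
toy_result = toy_pipeline.run(name="World")
print(toy_result)  # [HELLO, WORLD!]
```

Because >> is left-associative, `a >> b >> c` evaluates as `(a >> b) >> c`, which is why both the node and the pipeline class define `__rshift__`.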

Conditional Branching

Use the | operator to create mutually exclusive branches. The runtime selects which branch to execute based on the upstream output; in the example below, classify returns a dict whose "branch" key names the handler to run:

from dagloom import node

@node
def classify(text: str) -> dict:
    """Route to different processors."""
    if "urgent" in text:
        return {"branch": "urgent_handler", "text": text}
    return {"branch": "normal_handler", "text": text}

@node
def urgent_handler(data: dict) -> str:
    return f"🚨 URGENT: {data['text']}"

@node
def normal_handler(data: dict) -> str:
    return f"📋 Normal: {data['text']}"

pipeline = classify >> (urgent_handler | normal_handler)
result = pipeline.run(text="urgent: server down!")
# 🚨 URGENT: urgent: server down!
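The routing idea can be sketched in plain Python, assuming the branch is chosen by matching the "branch" value against handler names (an assumption drawn from the example above, not a confirmed description of Dagloom's runtime). `select_branch` is a hypothetical helper.

```python
from typing import Any, Callable, Dict


def select_branch(output: dict, branches: Dict[str, Callable[[dict], Any]]) -> Any:
    """Run only the branch whose name matches output['branch']."""
    name = output["branch"]
    if name not in branches:
        raise KeyError(f"no branch named {name!r}")
    return branches[name](output)


def classify_text(text: str) -> dict:
    # Tag the payload with the name of the handler that should process it.
    branch = "urgent_handler" if "urgent" in text else "normal_handler"
    return {"branch": branch, "text": text}


def handle_urgent(data: dict) -> str:
    return f"URGENT: {data['text']}"


def handle_normal(data: dict) -> str:
    return f"Normal: {data['text']}"


handlers = {"urgent_handler": handle_urgent, "normal_handler": handle_normal}
routed = select_branch(classify_text("urgent: server down!"), handlers)
print(routed)  # URGENT: urgent: server down!
```

Only the selected handler is called, so the other branch never runs and produces no side effects.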

Streaming Nodes (Generator)

Node functions can be generators; yielded values are automatically collected into a list:

from dagloom import node

@node
def stream_data(url: str):
    """Yield data chunks."""
    for i in range(5):
        yield {"chunk": i, "url": url}

@node
def aggregate(chunks: list[dict]) -> int:
    return len(chunks)

pipeline = stream_data >> aggregate
result = pipeline.run(url="https://example.com")
# 5
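One way an engine could implement this collection step is to check whether a node call returned a generator and, if so, drain it into a list before handing it downstream. The sketch below uses only the standard library; `call_and_collect` is a hypothetical helper, not part of Dagloom.

```python
import inspect
from typing import Any, Callable


def call_and_collect(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call fn; if it returns a generator, exhaust it into a list."""
    result = fn(*args, **kwargs)
    if inspect.isgenerator(result):
        return list(result)
    return result


def stream_chunks(url: str):
    # A generator function: calling it returns a generator object.
    for i in range(5):
        yield {"chunk": i, "url": url}


def count_chunks(chunks: list) -> int:
    return len(chunks)


collected = call_and_collect(stream_chunks, "https://example.com")
chunk_count = call_and_collect(count_chunks, collected)
print(chunk_count)  # 5
```

Collecting into a list means the downstream node sees all chunks at once, so memory use grows with the number of yielded items.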

Execution Hooks

Monitor node execution with on_node_start / on_node_end callbacks:

import asyncio
from dagloom import node, AsyncExecutor

@node
def step(x: int) -> int:
    return x + 1

pipeline = step

def my_hook(node_name, ctx):
    print(f"  → {node_name}: {ctx.get_node_info(node_name).status}")

executor = AsyncExecutor(
    pipeline,
    on_node_start=my_hook,
    on_node_end=my_hook,
)
result = asyncio.run(executor.execute(x=1))
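For intuition, the hook pattern can be sketched without Dagloom as a small async runner that calls a callback before and after each step. Everything here is hypothetical: `run_with_hooks`, the plain-dict context, and the "running"/"success" status strings are illustrative stand-ins, not Dagloom's actual context object or status values.

```python
import asyncio
from typing import Any, Callable, List, Optional, Tuple

Hook = Callable[[str, dict], None]


async def run_with_hooks(
    steps: List[Tuple[str, Callable[[Any], Any]]],
    value: Any,
    on_node_start: Optional[Hook] = None,
    on_node_end: Optional[Hook] = None,
) -> Any:
    """Run steps in order, invoking the hooks around each one."""
    ctx: dict = {}  # maps step name -> status string
    for name, fn in steps:
        ctx[name] = "running"
        if on_node_start is not None:
            on_node_start(name, ctx)
        value = fn(value)
        ctx[name] = "success"
        if on_node_end is not None:
            on_node_end(name, ctx)
    return value


events: list = []


def record_event(node_name: str, ctx: dict) -> None:
    # Store the status seen at the moment the hook fires.
    events.append((node_name, ctx[node_name]))


hook_result = asyncio.run(
    run_with_hooks([("step", lambda x: x + 1)], 1, record_event, record_event)
)
print(hook_result)  # 2
print(events)  # [('step', 'running'), ('step', 'success')]
```

Passing the same callable for both hooks, as the example above does, works because the hook reads the current status from the context rather than assuming which phase it was called in.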

Pipeline Scheduling

Schedule pipelines to run automatically on cron expressions or fixed intervals:

from dagloom import node, Pipeline

@node
def fetch(url: str = "https://example.com/data.csv") -> list:
    return [1, 2, 3]

@node
def process(data: list) -> int:
    return sum(data)

# Set schedule via Pipeline constructor
pipeline = Pipeline(name="daily_etl", schedule="0 9 * * *")

# Or use interval shorthand
pipeline = Pipeline(name="frequent_check", schedule="every 30m")

# Or set after construction
pipeline = fetch >> process
pipeline.name = "my_pipeline"
pipeline.schedule = "0 9 * * 1-5"  # Weekdays at 9am

The scheduler runs in-process with dagloom serve. Schedules are persisted to SQLite and restored automatically on restart.
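A schedule string can therefore be either a cron expression or an interval shorthand. The sketch below shows one way to tell them apart and turn the shorthand into a `timedelta`. The `parse_interval` helper and the set of unit suffixes (s, m, h, d) are assumptions; only "every 30m" appears in the examples above.

```python
import re
from datetime import timedelta
from typing import Optional

# Assumed unit suffixes; the source only demonstrates "m" (minutes).
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_interval(schedule: str) -> Optional[timedelta]:
    """Return a timedelta for 'every <N><unit>', or None for anything else."""
    match = re.fullmatch(r"every\s+(\d+)([smhd])", schedule.strip())
    if match is None:
        return None  # not interval shorthand; the caller may treat it as cron
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


print(parse_interval("every 30m"))     # 0:30:00
print(parse_interval("0 9 * * 1-5"))   # None
```

For reference, the five cron fields are minute, hour, day of month, month, and day of week, so "0 9 * * 1-5" means 09:00 on Monday through Friday.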

Advanced Features

import pandas as pd
from dagloom import node

@node(retry=3, cache=True, timeout=30.0)
def fetch_data(url: str) -> pd.DataFrame:
    """Fetch CSV data with retry and caching."""
    return pd.read_csv(url)

@node(cache=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Remove rows with missing values."""
    return df.dropna()

@node
def save(df: pd.DataFrame) -> str:
    """Persist cleaned data to parquet file."""
    path = "output/cleaned.parquet"
    df.to_parquet(path)
    return path

pipeline = fetch_data >> clean >> save
pipeline.run(url="https://example.com/data.csv")
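The retry and cache options can be pictured as a wrapper around the node function. The following is a minimal sketch under stated assumptions, not Dagloom's implementation: `with_retry_and_cache` is a hypothetical decorator, `retry=3` is assumed to mean up to three extra attempts after the first failure, the cache is an in-memory dict keyed by positional arguments, and timeout handling is left out.

```python
import functools
import time
from typing import Any, Callable


def with_retry_and_cache(retry: int = 0, cache: bool = False, delay: float = 0.0):
    """Retry a failing call up to `retry` extra times; optionally memoize results."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        results: dict = {}

        @functools.wraps(fn)
        def wrapper(*args: Any) -> Any:
            if cache and args in results:
                return results[args]  # cache hit: skip the call entirely
            for attempt in range(retry + 1):
                try:
                    value = fn(*args)
                    break
                except Exception:
                    if attempt == retry:
                        raise  # out of attempts: surface the last error
                    time.sleep(delay)
            if cache:
                results[args] = value
            return value

        return wrapper

    return decorator


calls = {"count": 0}


@with_retry_and_cache(retry=3, cache=True)
def flaky_fetch(url: str) -> str:
    # Fails twice, then succeeds, to exercise the retry path.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return f"data from {url}"


first = flaky_fetch("https://example.com/data.csv")
second = flaky_fetch("https://example.com/data.csv")
print(first)           # data from https://example.com/data.csv
print(calls["count"])  # 3
```

The second call returns the cached value without invoking the function again, which is why the call count stays at three.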

Start the Web UI

dagloom serve
# Open http://localhost:8000 in your browser

🏗️ Architecture

Single Process Architecture
┌─────────────────────────────────────┐
│  CLI / Web UI                       │
├─────────────────────────────────────┤
│  FastAPI (REST API + WebSocket)     │
├─────────────────────────────────────┤
│  Scheduler (APScheduler + asyncio)  │
├─────────────────────────────────────┤
│  Core (@node + Pipeline + DAG)      │
├─────────────────────────────────────┤
│  SQLite (embedded, zero config)     │
└─────────────────────────────────────┘

📦 Project Structure

dagloom/
├── core/       # @node decorator, Pipeline class, DAG validation
├── scheduler/  # Cron/interval scheduler, asyncio executor, caching, checkpoint
├── connectors/ # PostgreSQL, MySQL, S3, HTTP connectors
├── server/     # FastAPI REST API + WebSocket
├── store/      # SQLite storage layer
└── cli/        # Click CLI (serve, run, list, inspect, scheduler)

📖 Documentation

🤝 Contributing

Contributions are welcome! Please feel free to:

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

Apache License 2.0; see LICENSE for details.
