PypeLine - Python pipelines for the Real World

______ __   ________  _____  _     _____  _   _  _____
| ___ \\ \ / /| ___ \|  ___|| |   |_   _|| \ | ||  ___|
| |_/ / \ V / | |_/ /| |__  | |     | |  |  \| || |__
|  __/   \ /  |  __/ |  __| | |     | |  | . ` ||  __|
| |      | |  | |    | |___ | |_____| |_ | |\  || |___
\_|      \_/  \_|    \____/ \_____/\___/ \_| \_/\____/

PypeLine is a Python library for building and running distributed data pipelines. It provides:

  • DAG Pipelines — define tasks as a directed acyclic graph; PypeLine handles execution order and parallelism
  • Scheduled Tasks — run recurring jobs via cron schedule
  • Flask API — built-in REST API with OpenAPI/Swagger documentation for triggering and managing pipelines
  • Job Runner — one-shot worker for running a single pipeline task (e.g., in a Kubernetes Job)

PypeLine uses Dramatiq for task queuing, RabbitMQ (or Redis) as the message broker, and Redis for result storage.


Requirements

  • Python 3.8+
  • RabbitMQ (default broker) or Redis (as broker)
  • Redis (for result storage — always required)

Docker Compose is the easiest way to run RabbitMQ and Redis locally. See Run the Services below.


Installation

Install with the extras you need:

# Full install (Flask API + web server + pipeline workers)
pip install scalable-pypeline[flask,web,workers]

# Workers only (no Flask API)
pip install scalable-pypeline[workers]

Available extras:

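| Extra   | Installs                                           |
|---------|----------------------------------------------------|
| flask   | Flask, flask-smorest (OpenAPI)                     |
| web     | gunicorn, gevent (production web server)           |
| workers | Dramatiq, APScheduler, networkx (pipeline execution) |
| dev     | black                                              |
| test    | pytest, tox, coverage                              |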

Project Structure

PypeLine expects your application to be a Python package with a pypeline.yaml file inside it:

my_app/
├── my_app/
│   ├── __init__.py          # must define __version__
│   ├── pypeline.yaml        # PypeLine configuration
│   ├── pipeline.py          # your pipeline task functions
│   └── scheduled_tasks.py   # your scheduled task functions
├── app.py                   # Flask application factory
├── extensions.py            # Dramatiq extension
└── my_app.env               # environment variables

1. Define Your Tasks

Tasks are plain Python functions. Pipeline tasks receive an event argument containing the pipeline's input data. Scheduled tasks take no arguments.

my_app/pipeline.py

def step_a(event):
    # event contains the data passed when the pipeline was triggered
    data = event.get("my_input")
    print(f"Step A processing: {data}")


def step_b(event):
    print("Step B running (depends on A)")


def step_c(event):
    print("Step C running (depends on A, runs in parallel with B)")

my_app/scheduled_tasks.py

def hourly_report():
    print("Running hourly report...")
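Because tasks are plain functions, they can be exercised without a broker or a worker. The following is a minimal sketch: step_a is repeated inline so the snippet runs on its own, run_locally is a hypothetical helper (not part of PypeLine), and the sample event payload is an assumption.

```python
import contextlib
import io


def step_a(event):
    # Same body as my_app/pipeline.py, repeated so this snippet runs standalone
    data = event.get("my_input")
    print(f"Step A processing: {data}")


def run_locally(task, event):
    """Call a task function directly and return whatever it printed."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        task(event)
    return buffer.getvalue()


# Hypothetical payload; each pipeline decides which keys its event carries
output = run_locally(step_a, {"my_input": "hello"})
```

Calling handlers this way is handy in unit tests, since no RabbitMQ or Redis connection is needed.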

2. Configure pypeline.yaml

Place pypeline.yaml inside your Python package directory (next to __init__.py).

There are two pipeline schema versions. Use schemaVersion: 1 for simple pipelines and schemaVersion: 2 when you need multiple handlers per task node or typed pipeline settings.

Schema Version 1

Each task node maps to a single handler function. This is the simplest option and covers most use cases.

serviceConfig:
    - name: pipeline-worker
      registeredTasks:
          - handler: my_app.pipeline.step_a
          - handler: my_app.pipeline.step_b
          - handler: my_app.pipeline.step_c
          - handler: my_app.scheduled_tasks.hourly_report

pipelines:
    my-pipeline:
        name: My Pipeline
        description: An example pipeline with parallel steps
        schemaVersion: 1
        config:
            # DAG adjacency: step_a runs first, then step_b and step_c in parallel
            dagAdjacency:
                step_a:
                    - step_b
                    - step_c
            metadata:
                maxRetry: 3
                retryBackoff: 60        # seconds before first retry
                retryBackoffMax: 300    # max backoff seconds
                retryJitter: true
                maxTtl: 3600           # task time-to-live in seconds
                queue: pipeline-queue
            taskDefinitions:
                step_a:
                    handler: my_app.pipeline.step_a
                step_b:
                    handler: my_app.pipeline.step_b
                    queue: step-b-queue  # optional per-task queue override
                step_c:
                    handler: my_app.pipeline.step_c

scheduledTasks:
    hourly-report:
        name: Hourly Report
        enabled: true
        schemaVersion: 1
        config:
            task: my_app.scheduled_tasks.hourly_report
            queue: pipeline-queue
            schedule:
                minute: '0'
                hour: '*'
                dayOfWeek: '*'
                dayOfMonth: '*'
                monthOfYear: '*'
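To make the dagAdjacency semantics concrete, the sketch below groups tasks into levels: everything in one level can run in parallel once the previous level has finished. It uses only the standard library and is not PypeLine's own scheduler; execution_levels is a hypothetical helper name.

```python
from collections import defaultdict


def execution_levels(dag_adjacency):
    """Group tasks into levels; tasks in the same level do not depend on each other."""
    # Count incoming edges for every task that appears as a key or as a child
    indegree = defaultdict(int)
    for parent, children in dag_adjacency.items():
        indegree.setdefault(parent, 0)
        for child in children:
            indegree[child] += 1

    levels = []
    ready = sorted(task for task, count in indegree.items() if count == 0)
    while ready:
        levels.append(ready)
        next_ready = []
        for task in ready:
            for child in dag_adjacency.get(task, []):
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Any task never released still has an unmet dependency, which implies a cycle
    if sum(len(level) for level in levels) != len(indegree):
        raise ValueError("dagAdjacency contains a cycle")
    return levels


levels = execution_levels({"step_a": ["step_b", "step_c"]})
```

For the example pipeline this yields step_a alone in the first level, followed by step_b and step_c together in the second.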

Schema Version 2

Version 2 adds two capabilities:

  1. Multiple handlers per task node (a handlers list instead of a handler string). When triggering a pipeline, you can specify which handler index to use for each node. This enables scenario-based execution, e.g. running an alternate algorithm for a given task without changing the pipeline structure.
  2. Typed settings schema: define and validate the input parameters your pipeline accepts.

pipelines:
    my-pipeline-v2:
        name: My Pipeline V2
        description: Pipeline with swappable task handlers and validated settings
        schemaVersion: 2
        config:
            dagAdjacency:
                step_a:
                    - step_b
                    - step_c
            metadata:
                maxRetry: 3
                maxTtl: 3600
                queue: pipeline-queue
            taskDefinitions:
                step_a:
                    # Index 0: default handler; index 1: alternate implementation
                    handlers:
                        - my_app.pipeline.step_a_default
                        - my_app.pipeline.step_a_alternate
                step_b:
                    handlers:
                        - my_app.pipeline.step_b
                step_c:
                    handlers:
                        - my_app.pipeline.step_c

            # Optional: define typed, validated input settings for this pipeline
            settings:
                required:
                    - threshold
                properties:
                    threshold:
                        dataType: float
                        inputType: text
                        label: Decision Threshold
                        minimum: 0.0
                        maximum: 1.0
                    mode:
                        dataType: string
                        inputType: dropdown
                        label: Processing Mode
                        options:
                            - label: Fast
                              value: fast
                            - label: Accurate
                              value: accurate
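The sketch below illustrates what validating input against such a settings block could look like. It is not PypeLine's validator: validate_settings is a hypothetical function, it only understands the fields used in the example (required, dataType, minimum, maximum, options), and rejecting undeclared keys is an assumption.

```python
def validate_settings(schema, values):
    """Return a list of problems; an empty list means the input passed."""
    problems = []
    properties = schema.get("properties", {})

    for name in schema.get("required", []):
        if name not in values:
            problems.append(f"{name}: required setting is missing")

    for name, value in values.items():
        spec = properties.get(name)
        if spec is None:
            problems.append(f"{name}: not declared in settings.properties")
            continue
        data_type = spec.get("dataType")
        if data_type == "float":
            # bool is a subclass of int, so reject it explicitly
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                problems.append(f"{name}: expected a number")
                continue
            if "minimum" in spec and value < spec["minimum"]:
                problems.append(f"{name}: below minimum {spec['minimum']}")
            if "maximum" in spec and value > spec["maximum"]:
                problems.append(f"{name}: above maximum {spec['maximum']}")
        elif data_type == "string":
            if not isinstance(value, str):
                problems.append(f"{name}: expected a string")
                continue
            allowed = [option["value"] for option in spec.get("options", [])]
            if allowed and value not in allowed:
                problems.append(f"{name}: must be one of {allowed}")
    return problems


# The settings block from the example pipeline, expressed as a Python dict
SCHEMA = {
    "required": ["threshold"],
    "properties": {
        "threshold": {"dataType": "float", "minimum": 0.0, "maximum": 1.0},
        "mode": {
            "dataType": "string",
            "options": [
                {"label": "Fast", "value": "fast"},
                {"label": "Accurate", "value": "accurate"},
            ],
        },
    },
}

problems = validate_settings(SCHEMA, {"threshold": 1.5, "mode": "fast"})
```

Returning a list rather than raising on the first error lets a caller report every problem at once.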

Every handler referenced in taskDefinitions, including alternates, must still be listed under serviceConfig.registeredTasks:

serviceConfig:
    - name: pipeline-worker
      registeredTasks:
          - handler: my_app.pipeline.step_a_default
          - handler: my_app.pipeline.step_a_alternate
          - handler: my_app.pipeline.step_b
          - handler: my_app.pipeline.step_c
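Handlers are referenced by dotted path, and schema version 2 picks one by index. The sketch below shows one way such a lookup could work using importlib. PypeLine's own loader is not shown in this document, so resolve_handler and select_handler are hypothetical names, and standard-library functions stand in for my_app.pipeline.* so the snippet runs anywhere.

```python
import importlib


def resolve_handler(dotted_path):
    """Import 'package.module.function' and return the function object."""
    module_path, _, attribute = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.function', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attribute)


def select_handler(task_definition, index=0):
    """Pick a handler from a v1 'handler' string or a v2 'handlers' list."""
    if "handlers" in task_definition:
        return resolve_handler(task_definition["handlers"][index])
    return resolve_handler(task_definition["handler"])


# Stand-ins for a task node with a default and an alternate implementation
definition = {"handlers": ["math.floor", "math.ceil"]}
default_handler = select_handler(definition)
alternate_handler = select_handler(definition, index=1)
```

Index 0 plays the role of the default handler here, mirroring the comment in the YAML example.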

Environment variable interpolation — use ${VAR_NAME} or ${VAR_NAME:default} anywhere in pypeline.yaml:

metadata:
    queue: ${TASK_QUEUE:pipeline-queue}
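The substitution rule can be sketched with a regular expression. This is an illustration of the ${VAR_NAME} and ${VAR_NAME:default} syntax rather than PypeLine's parser; interpolate is a hypothetical helper, and raising when a variable is unset and has no default is an assumption.

```python
import re

# ${NAME} or ${NAME:default}; the default may itself contain colons
_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def interpolate(text, env):
    """Replace ${VAR} and ${VAR:default} in text using the env mapping."""

    def substitute(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        raise KeyError(f"{name} is not set and has no default")

    return _PATTERN.sub(substitute, text)


line = "queue: ${TASK_QUEUE:pipeline-queue}"
with_default = interpolate(line, {})
with_override = interpolate(line, {"TASK_QUEUE": "fast-queue"})
```

In real use the env argument would typically be os.environ.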

3. Set Up Your Flask App

extensions.py — create the Dramatiq extension:

from pypeline.dramatiq import Dramatiq

dramatiq = Dramatiq()

app.py — create the Flask application factory:

from flask import Flask
from pypeline.flask import FlaskPypeline
from extensions import dramatiq


def create_app():
    app = Flask(__name__)
    app.config.from_envvar("APP_CONFIG")  # or app.config.from_object(...)

    # Initialize Dramatiq broker (must happen before FlaskPypeline)
    dramatiq.init_app(app)

    # Initialize PypeLine — pass init_api=True to enable the REST API + Swagger UI
    pypeline = FlaskPypeline()
    pypeline.init_app(app, init_api=True)

    # Optionally register your own API blueprints into the PypeLine API
    # app.extensions["pypeline_core_api"].register_blueprint(my_bp)

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(port=5001)

When init_api=True, PypeLine registers:

  • GET/POST /api/v1/pipelines — list and trigger pipelines
  • GET/POST /api/v1/schedules — list and manage scheduled tasks
  • Swagger UI at /api/v1/docs

4. Set Environment Variables

# Required: tells PypeLine which package contains your pypeline.yaml
PYPELINE_CLIENT_PKG_NAME=my_app

# Redis URL (used for result storage, always required)
REDIS_URL=redis://:password@localhost:6379/0

# Message broker: RABBITMQ (default) or REDIS
MESSAGE_BROKER=RABBITMQ
RABBIT_URL=amqp://admin:password@localhost:5672

# Optional: protect API endpoints with an access key
API_ACCESS_KEY=your-secret-key

# Optional: select which serviceConfig entry this worker loads (defaults to the first entry)
# WORKER_NAME=pipeline-worker

5. Run the Services

Start Infrastructure

# Assumes a docker-compose.yml in the current directory
# that defines the rabbitmq and redis services
docker compose up -d

Run the Flask API

# Development
flask --app app run --port 5001

# Production
gunicorn "app:create_app()" --bind 0.0.0.0:5001 --worker-class gevent

Run Pipeline Workers

Workers consume tasks from the queues defined in pypeline.yaml:

flask --app app pypeline-worker

# Options:
#   -p / --processes   number of worker processes (default: CPU count)
#   -t / --threads     threads per process (default: 8)
#   -Q / --queues      comma-separated list of queues to consume from
flask --app app pypeline-worker -p 2 -t 4 -Q pipeline-queue

Run the Cron Scheduler

The scheduler reads your scheduledTasks config and enqueues tasks on their cron schedules:

flask --app app cron-scheduler
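The schedule block in pypeline.yaml uses named fields rather than a crontab string. As a reading aid, the sketch below renders those fields in the conventional five-field crontab order. to_crontab is a hypothetical helper, and treating a missing field as '*' is an assumption.

```python
def to_crontab(schedule):
    """Render a schedule mapping as a five-field crontab expression."""
    # Conventional crontab order: minute, hour, day of month, month, day of week
    field_order = ["minute", "hour", "dayOfMonth", "monthOfYear", "dayOfWeek"]
    return " ".join(str(schedule.get(field, "*")) for field in field_order)


hourly = to_crontab(
    {
        "minute": "0",
        "hour": "*",
        "dayOfWeek": "*",
        "dayOfMonth": "*",
        "monthOfYear": "*",
    }
)
```

The hourly-report example therefore corresponds to the familiar "top of every hour" expression.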

Run a One-Shot Job (Kubernetes Jobs)

The job-runner CLI starts a single worker that processes one task and exits — useful for Kubernetes Jobs:

# Listen on a specific queue; exits after processing one task
job-runner -q pipeline-queue

# With an idle timeout (exit if no job arrives within 30 seconds)
job-runner -q pipeline-queue --idle-timeout-ms 30000

Configuration Reference

pypeline.yaml Structure

| Field | Required | Description |
|---|---|---|
| serviceConfig | Yes | List of worker definitions. Each entry has a name and registeredTasks. |
| serviceConfig[].registeredTasks | Yes | List of task handlers (handler: module.path.function) this worker loads. |
| pipelines | No | Dict of pipeline definitions keyed by pipeline ID. |
| scheduledTasks | No | Dict of scheduled task definitions keyed by task ID. |

Pipeline config Fields

| Field | Description |
|---|---|
| dagAdjacency | Dict mapping each task to a list of tasks that run after it. |
| taskDefinitions | Dict mapping task names to their handler function paths. |
| metadata.maxRetry | Max number of retries on failure. |
| metadata.retryBackoff | Seconds before first retry. |
| metadata.retryBackoffMax | Max backoff seconds. |
| metadata.retryJitter | Add random jitter to retry delays. |
| metadata.maxTtl | Max task lifetime in seconds before it is discarded. |
| metadata.queue | RabbitMQ/Redis queue name for this pipeline's tasks. |
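To show how the retry fields interact, the sketch below computes a delay schedule. The growth curve and jitter strategy are assumptions: it doubles the delay on each attempt and, with jitter on, picks a random point in the upper half of the delay. The document does not state how PypeLine or Dramatiq apply these values, so treat the numbers as illustrative only; retry_delays is a hypothetical helper.

```python
import random


def retry_delays(max_retry, retry_backoff, retry_backoff_max, retry_jitter=False, rng=None):
    """Return the assumed delay in seconds before each retry attempt."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retry):
        # Assumed: double the base delay each attempt, capped at the maximum
        delay = min(retry_backoff * (2 ** attempt), retry_backoff_max)
        if retry_jitter:
            # Assumed jitter: a random point between half the delay and the full delay
            delay = rng.uniform(delay / 2, delay)
        delays.append(delay)
    return delays


# Values from the schema version 1 example: maxRetry 3, retryBackoff 60, retryBackoffMax 300
schedule = retry_delays(3, 60, 300)
```

Under these assumptions retryBackoffMax only takes effect from the fourth retry onward, so it matters mainly when maxRetry is raised.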

Environment Variables

| Variable | Default | Description |
|---|---|---|
| PYPELINE_CLIENT_PKG_NAME | | Required. Your package name (directory containing pypeline.yaml). |
| REDIS_URL | redis://localhost:6379/0 | Redis connection URL. |
| MESSAGE_BROKER | RABBITMQ | Broker type: RABBITMQ or REDIS. |
| RABBIT_URL | amqp://admin:password@127.0.0.1:5672 | RabbitMQ connection URL. |
| API_ACCESS_KEY | | If set, API endpoints require this key in the accesskey header. |
| WORKER_NAME | | Name of the service config entry this worker uses. Defaults to serviceConfig[0]. |
| PYPELINE_YAML_PATH | pypeline.yaml | Relative path to pypeline.yaml within the package. |
| IDLE_TIMEOUT_MS | 0 (infinite) | job-runner only: exit if no job starts within this many milliseconds. |
| RESTRICT_WORKER_SHUTDOWN_WHILE_JOBS_RUNNING | false | Enable graceful shutdown: prevents workers from stopping mid-task. |
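A small preflight check can catch missing or mistyped variables before a worker starts. The sketch below overlays an environment mapping on the defaults listed in the table. effective_settings is a hypothetical helper, not a PypeLine function, and failing fast on an unknown MESSAGE_BROKER value is an assumption.

```python
# Defaults copied from the Environment Variables table
DEFAULTS = {
    "REDIS_URL": "redis://localhost:6379/0",
    "MESSAGE_BROKER": "RABBITMQ",
    "RABBIT_URL": "amqp://admin:password@127.0.0.1:5672",
    "PYPELINE_YAML_PATH": "pypeline.yaml",
    "IDLE_TIMEOUT_MS": "0",
    "RESTRICT_WORKER_SHUTDOWN_WHILE_JOBS_RUNNING": "false",
}
REQUIRED = ("PYPELINE_CLIENT_PKG_NAME",)


def effective_settings(env):
    """Overlay env on the documented defaults and fail fast on obvious mistakes."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing required variables: " + ", ".join(missing))

    settings = dict(DEFAULTS)
    for name in list(DEFAULTS) + list(REQUIRED):
        if env.get(name):
            settings[name] = env[name]

    if settings["MESSAGE_BROKER"] not in ("RABBITMQ", "REDIS"):
        raise RuntimeError("MESSAGE_BROKER must be RABBITMQ or REDIS")
    return settings


settings = effective_settings(
    {"PYPELINE_CLIENT_PKG_NAME": "my_app", "MESSAGE_BROKER": "REDIS"}
)
```

In a real entry point, os.environ would be passed instead of a literal dict.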

API Authentication

Protect your API by setting API_ACCESS_KEY in the environment. Clients must include the key in requests:

curl -H "accesskey: your-secret-key" http://localhost:5001/api/v1/pipelines
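The same request can be built from Python with only the standard library. The sketch below constructs the request without sending it, so it runs even when no API is listening; build_request is a hypothetical helper, and the host and port are the ones used in the examples here.

```python
import urllib.request


def build_request(base_url, path, access_key=None):
    """Build a GET request for an API path, adding the accesskey header if given."""
    request = urllib.request.Request(base_url.rstrip("/") + path, method="GET")
    if access_key:
        # HTTP header names are case-insensitive; urllib stores this one as "Accesskey"
        request.add_header("accesskey", access_key)
    return request


request = build_request("http://localhost:5001", "/api/v1/pipelines", "your-secret-key")

# To send it against a running API:
#     with urllib.request.urlopen(request) as response:
#         body = response.read()
```

Keeping request construction separate from sending makes the header logic easy to unit test.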

You can also use the require_accesskey decorator on your own endpoints:

from pypeline.flask.decorators import require_accesskey

@bp.route("/my-endpoint")
@require_accesskey
def my_endpoint():
    return {"status": "ok"}

Testing

Install test dependencies:

pip install -e ".[test]"

Run tests:

tox
# or directly with pytest
pytest
