PypeLine - Python pipelines for the Real World
PypeLine is a Python library for building and running distributed data pipelines. It provides:
- DAG Pipelines — define tasks as a directed acyclic graph; PypeLine handles execution order and parallelism
- Scheduled Tasks — run recurring jobs via cron schedule
- Flask API — built-in REST API with OpenAPI/Swagger documentation for triggering and managing pipelines
- Job Runner — one-shot worker for running a single pipeline task (e.g., in a Kubernetes Job)
PypeLine uses Dramatiq for task queuing, RabbitMQ (or Redis) as the message broker, and Redis for result storage.
Table of Contents
- Requirements
- Installation
- Project Structure
- 1. Define Your Tasks
- 2. Configure pypeline.yaml
- 3. Set Up Your Flask App
- 4. Set Environment Variables
- 5. Run the Services
- Configuration Reference
- API Authentication
- Testing
Requirements
- Python 3.8+
- RabbitMQ (default broker) or Redis (as broker)
- Redis (for result storage — always required)
Docker Compose is the easiest way to run RabbitMQ and Redis locally. See Run the Services below.
Installation
Install with the extras you need:
```bash
# Full install (Flask API + web server + pipeline workers)
# Quotes stop shells such as zsh from treating the brackets as a glob
pip install "scalable-pypeline[flask,web,workers]"

# Workers only (no Flask API)
pip install "scalable-pypeline[workers]"
```
Available extras:
| Extra | Installs |
|---|---|
| `flask` | Flask, flask-smorest (OpenAPI) |
| `web` | gunicorn, gevent (production web server) |
| `workers` | Dramatiq, APScheduler, networkx (pipeline execution) |
| `dev` | black |
| `test` | pytest, tox, coverage |
Project Structure
PypeLine expects your application to be a Python package with a pypeline.yaml file inside it:
```
my_app/
├── my_app/
│   ├── __init__.py          # must define __version__
│   ├── pypeline.yaml        # PypeLine configuration
│   ├── pipeline.py          # your pipeline task functions
│   └── scheduled_tasks.py   # your scheduled task functions
├── app.py                   # Flask application factory
├── extensions.py            # Dramatiq extension
└── my_app.env               # environment variables
```
1. Define Your Tasks
Tasks are plain Python functions. Pipeline tasks receive an event argument containing the pipeline's input data. Scheduled tasks take no arguments.
`my_app/pipeline.py`

```python
def step_a(event):
    # event contains the data passed when the pipeline was triggered
    data = event.get("my_input")
    print(f"Step A processing: {data}")


def step_b(event):
    print("Step B running (depends on A)")


def step_c(event):
    print("Step C running (depends on A, runs in parallel with B)")
```

`my_app/scheduled_tasks.py`

```python
def hourly_report():
    print("Running hourly report...")
```
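Because tasks are plain functions, you can exercise them directly, without a broker or worker, by passing a dict as the `event`. A minimal sketch, assuming the event is a dict (as `event.get` in `step_a` suggests); `step_a` is copied from `my_app/pipeline.py` so the snippet runs on its own, and `run_locally` is a hypothetical helper, not part of PypeLine.

```python
import io
from contextlib import redirect_stdout


def step_a(event):
    # Same body as my_app/pipeline.py: read the trigger input from the event
    data = event.get("my_input")
    print(f"Step A processing: {data}")


def run_locally(task, event):
    """Hypothetical helper: call a task in-process and return what it printed."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        task(event)
    return buffer.getvalue()


if __name__ == "__main__":
    print(run_locally(step_a, {"my_input": 42}), end="")  # Step A processing: 42
```

This is handy for unit tests: the task logic can be checked before any queue infrastructure is running.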
2. Configure pypeline.yaml
Place pypeline.yaml inside your Python package directory (next to __init__.py).
There are two pipeline schema versions. Use schemaVersion: 1 for simple pipelines and schemaVersion: 2 when you need multiple handlers per task node or typed pipeline settings.
Schema Version 1
Each task node maps to a single handler function. This is the simplest option and covers most use cases.
```yaml
serviceConfig:
  - name: pipeline-worker
    registeredTasks:
      - handler: my_app.pipeline.step_a
      - handler: my_app.pipeline.step_b
      - handler: my_app.pipeline.step_c
      - handler: my_app.scheduled_tasks.hourly_report

pipelines:
  my-pipeline:
    name: My Pipeline
    description: An example pipeline with parallel steps
    schemaVersion: 1
    config:
      # DAG adjacency: step_a runs first, then step_b and step_c in parallel
      dagAdjacency:
        step_a:
          - step_b
          - step_c
      metadata:
        maxRetry: 3
        retryBackoff: 60        # seconds before first retry
        retryBackoffMax: 300    # max backoff seconds
        retryJitter: true
        maxTtl: 3600            # task time-to-live in seconds
        queue: pipeline-queue
      taskDefinitions:
        step_a:
          handler: my_app.pipeline.step_a
        step_b:
          handler: my_app.pipeline.step_b
          queue: step-b-queue   # optional per-task queue override
        step_c:
          handler: my_app.pipeline.step_c

scheduledTasks:
  hourly-report:
    name: Hourly Report
    enabled: true
    schemaVersion: 1
    config:
      task: my_app.scheduled_tasks.hourly_report
      queue: pipeline-queue
      schedule:
        minute: '0'
        hour: '*'
        dayOfWeek: '*'
        dayOfMonth: '*'
        monthOfYear: '*'
```
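The `dagAdjacency` map is an adjacency list: each key lists the tasks that run after it. A minimal, stdlib-only sketch of how such a map can be grouped into "waves" of tasks that may run in parallel, using Kahn's topological sort. This illustrates the ordering semantics only; it is not PypeLine's implementation (the `workers` extra installs networkx, which PypeLine presumably uses for this), and `execution_waves` is a hypothetical helper.

```python
from collections import defaultdict


def execution_waves(dag_adjacency):
    """Group tasks into waves; each task depends only on tasks in earlier waves."""
    # Count incoming edges, including nodes that appear only as children
    in_degree = defaultdict(int)
    nodes = set(dag_adjacency)
    for children in dag_adjacency.values():
        for child in children:
            in_degree[child] += 1
            nodes.add(child)

    # The first wave is every task with no upstream dependency
    ready = sorted(n for n in nodes if in_degree[n] == 0)
    waves, visited = [], 0
    while ready:
        waves.append(ready)
        visited += len(ready)
        next_ready = []
        for node in ready:
            for child in dag_adjacency.get(node, []):
                in_degree[child] -= 1
                if in_degree[child] == 0:  # all parents have finished
                    next_ready.append(child)
        ready = sorted(next_ready)

    if visited != len(nodes):
        raise ValueError("dagAdjacency contains a cycle")
    return waves


if __name__ == "__main__":
    print(execution_waves({"step_a": ["step_b", "step_c"]}))
    # [['step_a'], ['step_b', 'step_c']]
```

A task with several parents (a "diamond" shape) lands in the wave after its last parent, which is why the DAG must be acyclic.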
Schema Version 2
Version 2 adds two capabilities:
- Multiple handlers per task node (`handlers` list instead of `handler` string). When triggering a pipeline you can specify which handler index to use for each node. This enables scenario-based execution (e.g. run an alternate algorithm for a given task without changing the pipeline structure).
- Typed `settings` schema: define and validate the input parameters your pipeline accepts.
```yaml
pipelines:
  my-pipeline-v2:
    name: My Pipeline V2
    description: Pipeline with swappable task handlers and validated settings
    schemaVersion: 2
    config:
      dagAdjacency:
        step_a:
          - step_b
          - step_c
      metadata:
        maxRetry: 3
        maxTtl: 3600
        queue: pipeline-queue
      taskDefinitions:
        step_a:
          # Index 0: default handler; index 1: alternate implementation
          handlers:
            - my_app.pipeline.step_a_default
            - my_app.pipeline.step_a_alternate
        step_b:
          handlers:
            - my_app.pipeline.step_b
        step_c:
          handlers:
            - my_app.pipeline.step_c
      # Optional: define typed, validated input settings for this pipeline
      settings:
        required:
          - threshold
        properties:
          threshold:
            dataType: float
            inputType: text
            label: Decision Threshold
            minimum: 0.0
            maximum: 1.0
          mode:
            dataType: string
            inputType: dropdown
            label: Processing Mode
            options:
              - label: Fast
                value: fast
              - label: Accurate
                value: accurate
```
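The `settings` block describes the inputs a pipeline accepts. PypeLine performs its own validation; the sketch below only illustrates what `required`, `minimum`/`maximum`, and dropdown `options` mean for an incoming payload. `validate_settings` is hypothetical, and it assumes that `float` settings may be given as ints, that a dropdown value must equal one option's `value`, and that undeclared keys are ignored.

```python
SETTINGS_SCHEMA = {
    "required": ["threshold"],
    "properties": {
        "threshold": {"dataType": "float", "minimum": 0.0, "maximum": 1.0},
        "mode": {
            "dataType": "string",
            "options": [{"label": "Fast", "value": "fast"},
                        {"label": "Accurate", "value": "accurate"}],
        },
    },
}

# Assumed mapping from dataType names to accepted Python types
TYPES = {"float": (int, float), "string": (str,)}


def validate_settings(settings, schema=SETTINGS_SCHEMA):
    """Return a list of error messages; an empty list means the settings are valid."""
    errors = [f"{name}: required" for name in schema.get("required", [])
              if name not in settings]
    for name, value in settings.items():
        spec = schema["properties"].get(name)
        if spec is None:
            continue  # this sketch ignores keys the schema does not declare
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, TYPES[spec["dataType"]]):
            errors.append(f"{name}: expected {spec['dataType']}")
            continue
        if "minimum" in spec and value < spec["minimum"]:
            errors.append(f"{name}: below minimum {spec['minimum']}")
        if "maximum" in spec and value > spec["maximum"]:
            errors.append(f"{name}: above maximum {spec['maximum']}")
        allowed = [opt["value"] for opt in spec.get("options", [])]
        if allowed and value not in allowed:
            errors.append(f"{name}: must be one of {allowed}")
    return errors
```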
All registered handlers must still be listed under serviceConfig.registeredTasks:
```yaml
serviceConfig:
  - name: pipeline-worker
    registeredTasks:
      - handler: my_app.pipeline.step_a_default
      - handler: my_app.pipeline.step_a_alternate
      - handler: my_app.pipeline.step_b
      - handler: my_app.pipeline.step_c
```
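In schema version 2, each node carries a `handlers` list and the trigger request picks an index per node. A minimal sketch of that lookup, assuming index 0 is used for any node the request does not mention (matching the "Index 0: default handler" comment in the example). `resolve_handlers` and the `{node: index}` shape of `handler_indexes` are hypothetical and do not describe PypeLine's request format.

```python
TASK_DEFINITIONS = {
    "step_a": {"handlers": ["my_app.pipeline.step_a_default",
                            "my_app.pipeline.step_a_alternate"]},
    "step_b": {"handlers": ["my_app.pipeline.step_b"]},
    "step_c": {"handlers": ["my_app.pipeline.step_c"]},
}


def resolve_handlers(task_definitions, handler_indexes=None):
    """Map each task node to the handler path selected for this run."""
    handler_indexes = handler_indexes or {}
    resolved = {}
    for node, definition in task_definitions.items():
        index = handler_indexes.get(node, 0)  # assumption: default to index 0
        handlers = definition["handlers"]
        if not 0 <= index < len(handlers):
            raise IndexError(f"{node}: no handler at index {index}")
        resolved[node] = handlers[index]
    return resolved


if __name__ == "__main__":
    # Run the alternate step_a; other nodes keep their default handler
    print(resolve_handlers(TASK_DEFINITIONS, {"step_a": 1}))
```

Since any index may be chosen at trigger time, every entry in every `handlers` list has to be importable by the worker, which is why each one appears under `registeredTasks`.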
Environment variable interpolation: use `${VAR_NAME}` or `${VAR_NAME:default}` anywhere in `pypeline.yaml`:

```yaml
metadata:
  queue: ${TASK_QUEUE:pipeline-queue}
```
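A minimal sketch of how `${VAR_NAME}` and `${VAR_NAME:default}` placeholders can be expanded in raw YAML text before parsing. It is illustrative only: `interpolate` is hypothetical, and raising `KeyError` for an unset variable without a default is an assumption, since PypeLine's behaviour in that case is not described here.

```python
import os
import re

# Matches ${NAME} or ${NAME:default}; the default runs up to the closing brace
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def interpolate(text, environ=None):
    """Replace ${VAR} / ${VAR:default} placeholders with environment values."""
    environ = os.environ if environ is None else environ

    def substitute(match):
        name, default = match.group(1), match.group(2)
        if name in environ:
            return environ[name]
        if default is not None:  # group(2) is None when there is no ':'
            return default
        raise KeyError(f"environment variable {name} is not set and has no default")

    return PLACEHOLDER.sub(substitute, text)


if __name__ == "__main__":
    print(interpolate("queue: ${TASK_QUEUE:pipeline-queue}", {}))  # queue: pipeline-queue
```

Because the default is everything after the first colon, values that themselves contain colons (such as `redis://host:6379`) survive intact.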
3. Set Up Your Flask App
`extensions.py`: create the Dramatiq extension:

```python
from pypeline.dramatiq import Dramatiq

dramatiq = Dramatiq()
```

`app.py`: create the Flask application factory:

```python
from flask import Flask

from pypeline.flask import FlaskPypeline
from extensions import dramatiq


def create_app():
    app = Flask(__name__)
    app.config.from_envvar("APP_CONFIG")  # or app.config.from_object(...)

    # Initialize Dramatiq broker (must happen before FlaskPypeline)
    dramatiq.init_app(app)

    # Initialize PypeLine; pass init_api=True to enable the REST API + Swagger UI
    pypeline = FlaskPypeline()
    pypeline.init_app(app, init_api=True)

    # Optionally register your own API blueprints into the PypeLine API
    # app.extensions["pypeline_core_api"].register_blueprint(my_bp)

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(port=5001)
```
When `init_api=True`, PypeLine registers:
- `GET/POST /api/v1/pipelines`: list and trigger pipelines
- `GET/POST /api/v1/schedules`: list and manage scheduled tasks
- Swagger UI at `/api/v1/docs`
4. Set Environment Variables
```bash
# Required: tells PypeLine which package contains your pypeline.yaml
PYPELINE_CLIENT_PKG_NAME=my_app

# Redis URL (used for result storage, always required)
REDIS_URL=redis://:password@localhost:6379/0

# Message broker: RABBITMQ (default) or REDIS
MESSAGE_BROKER=RABBITMQ
RABBIT_URL=amqp://admin:password@localhost:5672

# Optional: protect API endpoints with an access key
API_ACCESS_KEY=your-secret-key

# Optional: name of the serviceConfig entry this worker loads (defaults to the first entry)
# WORKER_NAME=pipeline-worker
```
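The same variables can be read and checked once at startup so that a missing package name or a misspelled broker fails fast. A sketch, assuming the defaults listed in the Environment Variables table of the Configuration Reference; `load_settings` is a hypothetical helper, not part of PypeLine, and it accepts a plain mapping so it can be tested without touching `os.environ`.

```python
import os

VALID_BROKERS = {"RABBITMQ", "REDIS"}


def load_settings(environ=None):
    """Collect PypeLine-related environment variables, applying documented defaults."""
    environ = os.environ if environ is None else environ
    pkg = environ.get("PYPELINE_CLIENT_PKG_NAME")
    if not pkg:
        raise RuntimeError("PYPELINE_CLIENT_PKG_NAME is required")

    broker = environ.get("MESSAGE_BROKER", "RABBITMQ")
    if broker not in VALID_BROKERS:
        raise RuntimeError(f"MESSAGE_BROKER must be one of {sorted(VALID_BROKERS)}")

    return {
        "client_pkg_name": pkg,
        "redis_url": environ.get("REDIS_URL", "redis://localhost:6379/0"),
        "message_broker": broker,
        "rabbit_url": environ.get("RABBIT_URL", "amqp://admin:password@127.0.0.1:5672"),
        # None means the API is not protected by an access key
        "api_access_key": environ.get("API_ACCESS_KEY"),
        # None means "use the first serviceConfig entry"
        "worker_name": environ.get("WORKER_NAME"),
    }
```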
5. Run the Services
Start Infrastructure
```bash
# Assumes a docker-compose.yml that defines rabbitmq and redis services
docker compose up -d
```
Run the Flask API
```bash
# Development
flask --app app run --port 5001

# Production
gunicorn "app:create_app()" --bind 0.0.0.0:5001 --worker-class gevent
```
Run Pipeline Workers
Workers consume tasks from the queues defined in pypeline.yaml:
```bash
flask --app app pypeline-worker

# Options:
#   -p / --processes   number of worker processes (default: CPU count)
#   -t / --threads     threads per process (default: 8)
#   -Q / --queues      comma-separated list of queues to consume from
flask --app app pypeline-worker -p 2 -t 4 -Q pipeline-queue
```
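A task can override the pipeline queue (as `step_b` does with `step-b-queue` in the schema version 1 example), so a worker started with `-Q pipeline-queue` alone would presumably never receive `step_b`. A sketch that collects every queue one pipeline's `config` routes to, for building the `-Q` list; `pipeline_queues` is hypothetical and assumes `pypeline.yaml` has already been parsed into a dict.

```python
def pipeline_queues(pipeline_config):
    """Return the sorted queue names used by one pipeline's config block."""
    default_queue = pipeline_config["metadata"]["queue"]
    queues = {default_queue}
    for definition in pipeline_config.get("taskDefinitions", {}).values():
        # A per-task "queue" overrides the pipeline-level metadata.queue
        queues.add(definition.get("queue", default_queue))
    return sorted(queues)


if __name__ == "__main__":
    config = {
        "metadata": {"queue": "pipeline-queue"},
        "taskDefinitions": {
            "step_a": {"handler": "my_app.pipeline.step_a"},
            "step_b": {"handler": "my_app.pipeline.step_b", "queue": "step-b-queue"},
            "step_c": {"handler": "my_app.pipeline.step_c"},
        },
    }
    print(",".join(pipeline_queues(config)))  # pipeline-queue,step-b-queue
```

The printed string can be passed straight to the worker, e.g. `flask --app app pypeline-worker -Q pipeline-queue,step-b-queue`.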
Run the Cron Scheduler
The scheduler reads your scheduledTasks config and enqueues tasks on their cron schedules:
```bash
flask --app app cron-scheduler
```
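Each `schedule` entry is a set of cron-style fields. A simplified, stdlib-only sketch of when the `hourly-report` schedule fires, supporting only `'*'` and single integers (no ranges, lists, or steps). The scheduler itself presumably relies on APScheduler (installed by the `workers` extra); `matches` is hypothetical, and the field mapping is an assumption, in particular `dayOfWeek` is taken as Monday = 0, which is APScheduler's convention but not necessarily PypeLine's translation.

```python
from datetime import datetime

# Assumed mapping from schedule keys to datetime values (dayOfWeek: Monday = 0)
FIELD_GETTERS = {
    "minute": lambda dt: dt.minute,
    "hour": lambda dt: dt.hour,
    "dayOfWeek": lambda dt: dt.weekday(),
    "dayOfMonth": lambda dt: dt.day,
    "monthOfYear": lambda dt: dt.month,
}


def matches(schedule, moment):
    """True if `moment` satisfies every field; only '*' and plain integers are supported."""
    for field, getter in FIELD_GETTERS.items():
        value = schedule.get(field, "*")
        if value != "*" and int(value) != getter(moment):
            return False
    return True


HOURLY_REPORT = {"minute": "0", "hour": "*", "dayOfWeek": "*",
                 "dayOfMonth": "*", "monthOfYear": "*"}

if __name__ == "__main__":
    print(matches(HOURLY_REPORT, datetime(2024, 5, 1, 14, 0)))   # True
    print(matches(HOURLY_REPORT, datetime(2024, 5, 1, 14, 30)))  # False
```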
Run a One-Shot Job (Kubernetes Jobs)
The job-runner CLI starts a single worker that processes one task and exits — useful for Kubernetes Jobs:
```bash
# Listen on a specific queue; exits after processing one task
job-runner -q pipeline-queue

# With an idle timeout (exit if no job arrives within 30 seconds)
job-runner -q pipeline-queue --idle-timeout-ms 30000
```
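The job-runner's contract (process exactly one task, or give up after `--idle-timeout-ms` with nothing to do) can be pictured with an in-memory queue. A sketch using `queue.Queue` as a stand-in for the broker; it is not the job-runner's code, and `run_one_job` is hypothetical. As with the `IDLE_TIMEOUT_MS` default, a timeout of 0 means wait forever.

```python
import queue


def run_one_job(jobs, handler, idle_timeout_ms=0):
    """Wait for a single job, process it, and return the handler's result.

    Returns None if no job arrives within idle_timeout_ms (0 = wait forever).
    """
    timeout = None if idle_timeout_ms == 0 else idle_timeout_ms / 1000
    try:
        job = jobs.get(timeout=timeout)
    except queue.Empty:
        return None  # idle timeout hit: the process would exit without work
    try:
        return handler(job)
    finally:
        jobs.task_done()


if __name__ == "__main__":
    jobs = queue.Queue()
    jobs.put({"my_input": 1})
    print(run_one_job(jobs, lambda job: job["my_input"] + 1, idle_timeout_ms=100))  # 2
```

In a Kubernetes Job, exiting after one task lets the Job controller track each task as one completed pod, while the idle timeout keeps pods from lingering when the queue is empty.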
Configuration Reference
pypeline.yaml Structure
| Field | Required | Description |
|---|---|---|
| `serviceConfig` | Yes | List of worker definitions. Each entry has a `name` and `registeredTasks`. |
| `serviceConfig[].registeredTasks` | Yes | List of task handlers (`handler: module.path.function`) this worker loads. |
| `pipelines` | No | Dict of pipeline definitions keyed by pipeline ID. |
| `scheduledTasks` | No | Dict of scheduled task definitions keyed by task ID. |
Pipeline config Fields
| Field | Description |
|---|---|
| `dagAdjacency` | Dict mapping each task to a list of tasks that run after it. |
| `taskDefinitions` | Dict mapping task names to their handler function paths (`handler` in schema version 1, `handlers` list in schema version 2). |
| `metadata.maxRetry` | Max number of retries on failure. |
| `metadata.retryBackoff` | Seconds before first retry. |
| `metadata.retryBackoffMax` | Max backoff seconds. |
| `metadata.retryJitter` | Add random jitter to retry delays. |
| `metadata.maxTtl` | Max task lifetime in seconds before it is discarded. |
| `metadata.queue` | RabbitMQ/Redis queue name for this pipeline's tasks. |
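The retry settings combine into a delay schedule. A sketch of a common exponential-backoff-with-jitter formula: the delay doubles on each attempt starting from `retryBackoff`, is capped at `retryBackoffMax`, and is optionally randomised. This is an assumption about how the fields interact; PypeLine and Dramatiq may compute delays differently, and `retry_delay` is hypothetical.

```python
import random


def retry_delay(attempt, backoff=60, backoff_max=300, jitter=True, rng=random):
    """Seconds to wait before retry number `attempt` (0 = first retry)."""
    delay = min(backoff * 2 ** attempt, backoff_max)
    if jitter:
        # Spread simultaneous retries out: pick a value between half and the full delay
        delay = rng.uniform(delay / 2, delay)
    return delay


if __name__ == "__main__":
    # Example values from pypeline.yaml: maxRetry: 3, retryBackoff: 60, retryBackoffMax: 300
    print([retry_delay(n, jitter=False) for n in range(3)])  # [60, 120, 240]
```

Under this formula the cap only matters from the fourth retry onward with these values, so raising `maxRetry` above 3 would produce repeated 300-second waits.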
Environment Variables
| Variable | Default | Description |
|---|---|---|
| `PYPELINE_CLIENT_PKG_NAME` | none | Required. Your package name (the directory containing `pypeline.yaml`). |
| `REDIS_URL` | `redis://localhost:6379/0` | Redis connection URL. |
| `MESSAGE_BROKER` | `RABBITMQ` | Broker type: `RABBITMQ` or `REDIS`. |
| `RABBIT_URL` | `amqp://admin:password@127.0.0.1:5672` | RabbitMQ connection URL. |
| `API_ACCESS_KEY` | none | If set, API endpoints require this key in the `accesskey` header. |
| `WORKER_NAME` | first `serviceConfig` entry | Name of the `serviceConfig` entry this worker uses. |
| `PYPELINE_YAML_PATH` | `pypeline.yaml` | Path to `pypeline.yaml`, relative to the package. |
| `IDLE_TIMEOUT_MS` | `0` (infinite) | `job-runner` only: exit if no job starts within this many milliseconds. |
| `RESTRICT_WORKER_SHUTDOWN_WHILE_JOBS_RUNNING` | `false` | Enable graceful shutdown, which prevents workers from stopping mid-task. |
API Authentication
Protect your API by setting API_ACCESS_KEY in the environment. Clients must include the key in requests:
```bash
curl -H "accesskey: your-secret-key" http://localhost:5001/api/v1/pipelines
```
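The same request from Python, using only the standard library. The sketch builds the request with the `accesskey` header but does not send it; `build_request` is a hypothetical helper, and the URL and key are the placeholders from the curl example.

```python
import urllib.request

API_URL = "http://localhost:5001/api/v1/pipelines"


def build_request(url=API_URL, access_key="your-secret-key"):
    """Create a GET request carrying the accesskey header."""
    return urllib.request.Request(url, headers={"accesskey": access_key}, method="GET")


if __name__ == "__main__":
    req = build_request()
    # Sending it requires the API to be running:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, resp.read().decode())
    print(req.get_header("Accesskey"))  # your-secret-key
```

`urllib` stores header names capitalised (`Accesskey`), which is harmless because HTTP header names are case-insensitive.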
You can also use the require_accesskey decorator on your own endpoints:
```python
from flask import Blueprint

from pypeline.flask.decorators import require_accesskey

bp = Blueprint("my_api", __name__)


@bp.route("/my-endpoint")
@require_accesskey
def my_endpoint():
    return {"status": "ok"}
```
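Conceptually, a decorator like `require_accesskey` compares the incoming `accesskey` header with `API_ACCESS_KEY`. A framework-agnostic sketch of that check, assuming (per the Configuration Reference) that no key is enforced when `API_ACCESS_KEY` is unset. `access_allowed` is hypothetical and does not reproduce `pypeline.flask.decorators`; it uses `hmac.compare_digest` so the comparison time does not reveal how many characters matched.

```python
import hmac
import os


def access_allowed(headers, expected_key=None):
    """Return True if the request headers satisfy the API access-key check."""
    if expected_key is None:
        expected_key = os.environ.get("API_ACCESS_KEY")
    if not expected_key:
        return True  # assumption: no key configured means the API is open
    # HTTP header names are case-insensitive, so normalise before lookup
    normalised = {name.lower(): value for name, value in headers.items()}
    supplied = normalised.get("accesskey", "")
    return hmac.compare_digest(supplied.encode(), expected_key.encode())
```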
Testing
Install test dependencies:
```bash
pip install -e ".[test]"
```
Run tests:
```bash
tox
# or directly with pytest
pytest
```
File details
Details for the file scalable_pypeline-2.2.5.tar.gz.
File metadata
- Download URL: scalable_pypeline-2.2.5.tar.gz
- Upload date:
- Size: 58.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.9
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 17df0379fff92ea01f1bdb79f0894fc1f13405f91788a366dac942ac350c09fd |
| MD5 | f14fd5368482791e51c88cabbd125954 |
| BLAKE2b-256 | e9233e28827d0b55fbe8bb5e6d1c7cd01ae22527ed2fcf3479b71f2345b7333a |
File details
Details for the file scalable_pypeline-2.2.5-py2.py3-none-any.whl.
File metadata
- Download URL: scalable_pypeline-2.2.5-py2.py3-none-any.whl
- Upload date:
- Size: 63.9 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.9
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | d284ea5b6b2e7a1dea83a65d8b19d30955dd4741409e4fc6d3751445f7e7ca1d |
| MD5 | 22077cdb95164d10a5d0272dfcb44870 |
| BLAKE2b-256 | 459c945ee2d9e8cc6e6139cafe54ef6b19cfe752948811fdee1dc29701c29c44 |