PypeLine - Python pipelines for the Real World
______ __   ________  _____  _     _____  _   _  _____
| ___ \\ \ / /| ___ \|  ___|| |   |_   _|| \ | ||  ___|
| |_/ / \ V / | |_/ /| |__  | |     | |  |  \| || |__
|  __/   \ /  |  __/ |  __| | |     | |  | . ` ||  __|
| |      | |  | |    | |___ | |_____| |_ | |\  || |___
\_|      \_/  \_|    \____/ \_____/\___/ \_| \_/\____/
Overview
PypeLine is an open-source library for managing data workflows and APIs. With PypeLine, you can schedule cron jobs, execute complex Directed Acyclic Graph (DAG) pipelines, and set up a Flask API complete with OpenAPI documentation.
Key Features
- Cron Job Scheduling: Easily schedule recurring tasks with flexible cron job functionality, ensuring that your processes run reliably at specified intervals.
- DAG Pipelines: Define and execute DAGs to manage complex data workflows with dependencies. PypeLine handles the execution order and parallelism, ensuring that each task runs in the correct sequence.
- Flask API with OpenAPI: Quickly configure a RESTful API using Flask, with built-in support for OpenAPI documentation, allowing for clear, standardized documentation of your endpoints.
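To make the DAG bullet concrete, here is a minimal sketch of how an adjacency mapping implies an execution order and which tasks could run in parallel. It uses Python's standard-library graphlib rather than PypeLine itself. The helper name execution_waves and the sample mapping are illustrative assumptions, not part of PypeLine's API.

```python
from graphlib import TopologicalSorter


def execution_waves(dag_adjacency):
    """Group tasks into waves; every task in a wave has all its parents done."""
    # dag_adjacency maps a task to the tasks that run after it (parent -> children).
    # TopologicalSorter expects node -> predecessors, so invert the mapping.
    predecessors = {task: set() for task in dag_adjacency}
    for parent, children in dag_adjacency.items():
        for child in children:
            predecessors.setdefault(child, set()).add(parent)

    sorter = TopologicalSorter(predecessors)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks that could run in parallel
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Task "a" fans out to "b" and "c" (hypothetical task names).
print(execution_waves({"a": ["b", "c"]}))  # [['a'], ['b', 'c']]
```

Each inner list is one wave: "a" runs first, then "b" and "c" have no remaining dependencies and are free to run side by side.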
Requirements
- RabbitMQ
- Redis
- Docker (optional for dev)
Getting Started
Install PypeLine (the quotes keep the shell from treating the brackets and ">=" as glob and redirect syntax):
pip install "scalable-pypeline[flask,web,workers]>=1.2.3"
Configure your Flask project (app.py)
from flask import Flask
from pypeline.flask import FlaskPypeline
from pypeline_demo.api import bp
from pypeline_demo.config import Config
from pypeline_demo.extensions import dramatiq


def create_app():
    app = Flask(__name__)
    dramatiq.init_app(app)

    # Initialize your app with a configuration
    app.config.from_object(Config)

    pypeline = FlaskPypeline()
    pypeline.init_app(app, init_api=True)

    # Register the API blueprints you want on the PypeLine core API
    app.extensions["pypeline_core_api"].register_blueprint(bp)
    # Register application blueprints with the application
    app.register_blueprint(bp)

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(port=5001)
Configure Dramatiq extension (extensions.py)
from pypeline.dramatiq import Dramatiq
dramatiq = Dramatiq()
Set up your YAML configuration for pipelines (pypeline.yaml)
serviceConfig:
  - name: pipeline-worker
    registeredTasks:
      - handler: pypeline_demo.pipeline.a
      - handler: pypeline_demo.pipeline.b
      - handler: pypeline_demo.pipeline.c
      - handler: pypeline_demo.scheduled_tasks.cron_task

pipelines:
  demo_pipeline:
    name: Demo Pipeline
    description: Pipeline to show examples of DAG Adjacency
    schemaVersion: 1
    config:
      dagAdjacency:
        a:
          - b
          - c
      metadata:
        maxRetry: 1
        retryBackoff: 180
        retryBackoffMax: 300
        retryJitter: true
        maxTtl: 10800
        queue: new-queue
      taskDefinitions:
        a:
          handler: pypeline_demo.pipeline.a
        b:
          handler: pypeline_demo.pipeline.b
        c:
          handler: pypeline_demo.pipeline.c

scheduledTasks:
  cron-task:
    name: Example cron task
    enabled: true
    config:
      task: pypeline_demo.scheduled_tasks.cron_task
      queue: new-queue
      schedule:
        minute: '*'
        hour: '*'
        dayOfWeek: '*'
        dayOfMonth: '*'
        monthOfYear: '*'
    schemaVersion: 1
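The schedule block uses named fields rather than a single crontab string. As a rough illustration, the sketch below joins those fields in standard crontab order. The function name to_crontab is hypothetical, and the assumption that each field corresponds to one crontab position is inferred from the field names, not taken from PypeLine's documentation.

```python
# Field names as they appear in pypeline.yaml, listed in standard crontab order.
CRONTAB_ORDER = ("minute", "hour", "dayOfMonth", "monthOfYear", "dayOfWeek")


def to_crontab(schedule):
    """Join schedule fields into a five-field crontab string, defaulting to '*'."""
    unknown = set(schedule) - set(CRONTAB_ORDER)
    if unknown:
        raise ValueError(f"unexpected schedule fields: {sorted(unknown)}")
    return " ".join(str(schedule.get(field, "*")) for field in CRONTAB_ORDER)


# The schedule from the example configuration: every field is a wildcard.
every_minute = {
    "minute": "*",
    "hour": "*",
    "dayOfWeek": "*",
    "dayOfMonth": "*",
    "monthOfYear": "*",
}
print(to_crontab(every_minute))                   # * * * * *
print(to_crontab({"minute": "0", "hour": "6"}))   # 0 6 * * *
```

Read this way, the example cron task would fire once per minute, which is convenient for a demo but probably too frequent for real workloads.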
Set up the modules referenced by the YAML (pipeline.py and scheduled_tasks.py)
# pipeline.py
import time


def a(event):
    print("A")


def b(event):
    print("B")
    time.sleep(10)


def c(event):
    print("C")


# scheduled_tasks.py
def cron_task():
    print("HI")
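The handler values in the YAML are dotted paths to functions like these. PypeLine's own loading code is not shown in this README, so the following is only a sketch of how such a path can be resolved with the standard library. The helper resolve_handler is hypothetical, and a standard-library function stands in for the demo handlers so the snippet runs on its own.

```python
import importlib


def resolve_handler(dotted_path):
    """Split 'package.module.func' into module and attribute, then import it."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.function', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    handler = getattr(module, attr_name)
    if not callable(handler):
        raise TypeError(f"{dotted_path!r} is not callable")
    return handler


# Stand-in target; in the demo project the path would be something like
# "pypeline_demo.pipeline.a".
dumps = resolve_handler("json.dumps")
print(dumps({"task": "a"}))
```

A typo in a handler path would surface here as ModuleNotFoundError or AttributeError, so a check like this can catch configuration mistakes before any worker starts.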
Configure your environment variables (demo.env)
SERMOS_BASE_URL=local
PYPELINE_CLIENT_PKG_NAME=pypeline_demo
REDIS_URL=redis://:password@localhost:6379/0
RABBITMQ_URL=amqp://admin:password@localhost:5672
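Before starting workers it can help to confirm that these variables are set and well formed. The check below is a sketch using only the standard library. The function check_env, the list of required names, and the accepted URL schemes are assumptions drawn from the demo.env example, not a PypeLine utility.

```python
import os
from urllib.parse import urlsplit

# Names taken from the demo.env example; treated here as all required (assumption).
REQUIRED = ("SERMOS_BASE_URL", "PYPELINE_CLIENT_PKG_NAME", "REDIS_URL", "RABBITMQ_URL")
EXPECTED_SCHEMES = {
    "REDIS_URL": {"redis", "rediss"},
    "RABBITMQ_URL": {"amqp", "amqps"},
}


def check_env(env):
    """Return a list of problems; an empty list means the environment looks fine."""
    problems = []
    for name in REQUIRED:
        if not env.get(name):
            problems.append(f"{name} is not set")
    for name, schemes in EXPECTED_SCHEMES.items():
        value = env.get(name)
        if value and urlsplit(value).scheme not in schemes:
            problems.append(f"{name} should use one of {sorted(schemes)}")
    return problems


# Check the current process environment and report anything missing.
for problem in check_env(os.environ):
    print(problem)
```

Passing a plain dict instead of os.environ makes the same function easy to use in tests.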
Start RabbitMQ and Redis to serve as the message broker and the results backend, respectively. We use Docker Compose for this.
DEMO PROJECT COMING SOON!
Testing
If you are developing pypeline and want to test this package, install the test dependencies:
$ pip install -e ".[test]"
Now, run the tests:
$ tox