
PypeLine - Python pipelines for the Real World


______ __   ________  _____  _     _____  _   _  _____ 
| ___ \\ \ / /| ___ \|  ___|| |   |_   _|| \ | ||  ___|
| |_/ / \ V / | |_/ /| |__  | |     | |  |  \| || |__  
|  __/   \ /  |  __/ |  __| | |     | |  | . ` ||  __| 
| |      | |  | |    | |___ | |_____| |_ | |\  || |___ 
\_|      \_/  \_|    \____/ \_____/\___/ \_| \_/\____/                                 

Overview

PypeLine is a versatile open-source library designed to streamline the management of data workflows and APIs. With PypeLine, you can efficiently schedule cron jobs, execute complex Directed Acyclic Graph (DAG) pipelines, and set up a Flask API complete with OpenAPI documentation.

Key Features

  • Cron Job Scheduling: Easily schedule recurring tasks with flexible cron job functionality, ensuring that your processes run reliably at specified intervals.
  • DAG Pipelines: Define and execute DAGs to manage complex data workflows with dependencies. PypeLine handles the execution order and parallelism, ensuring that each task runs in the correct sequence.
  • Flask API with OpenAPI: Quickly configure a RESTful API using Flask, with built-in support for OpenAPI documentation, allowing for clear, standardized documentation of your endpoints.

Requirements

  • RabbitMQ
  • Redis
  • Docker (optional for dev)

Getting Started

Install PypeLine:

pip install "scalable-pypeline[flask,web,workers]>=1.2.3"

Configure your Flask project (app.py)

from flask import Flask
from pypeline.flask import FlaskPypeline
from pypeline_demo.api import bp
from pypeline_demo.config import Config
from pypeline_demo.extensions import dramatiq



def create_app():
    app = Flask(__name__)

    # Load your configuration before initializing extensions
    app.config.from_object(Config)

    dramatiq.init_app(app)

    pypeline = FlaskPypeline()
    pypeline.init_app(app, init_api=True)

    # Register the API blueprints you wish to include in the PypeLine core API
    app.extensions["pypeline_core_api"].register_blueprint(bp)
    # Register application blueprints with the application
    app.register_blueprint(bp)

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(port=5001)

Configure Dramatiq extension (extensions.py)

from pypeline.dramatiq import Dramatiq


dramatiq = Dramatiq()

Set up your YAML configuration for PypeLine (pypeline.yaml)

serviceConfig:
    - name: pipeline-worker
      registeredTasks:
          - handler: pypeline_demo.pipeline.a
          - handler: pypeline_demo.pipeline.b
          - handler: pypeline_demo.pipeline.c
          - handler: pypeline_demo.scheduled_tasks.cron_task

pipelines:
    demo_pipeline:
        name: Demo Pipeline
        description: Pipeline to show examples of DAG Adjacency
        schemaVersion: 1
        config:
            dagAdjacency:
                a:
                    - b
                    - c
            metadata:
                maxRetry: 1
                retryBackoff: 180
                retryBackoffMax: 300
                retryJitter: true
                maxTtl: 10800
                queue: new-queue
            taskDefinitions:
                a:
                    handler: pypeline_demo.pipeline.a
                b:
                    handler: pypeline_demo.pipeline.b
                c:
                    handler: pypeline_demo.pipeline.c
scheduledTasks:
    cron-task:
        name: Example cron task
        enabled: true
        config:
            task: pypeline_demo.scheduled_tasks.cron_task
            queue: new-queue
            schedule:
                minute: '*'
                hour: '*'
                dayOfWeek: '*'
                dayOfMonth: '*'
                monthOfYear: '*'
        schemaVersion: 1
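
The dagAdjacency block above maps each task to the tasks that depend on it, so `a: [b, c]` means a runs first and b and c become eligible once it finishes. The following is a minimal sketch of that ordering using Python's standard-library `graphlib` (Python 3.9+). It is not PypeLine's own scheduler, and `execution_stages` is a hypothetical helper name used only for illustration.

```python
from graphlib import TopologicalSorter

# Mirrors the dagAdjacency mapping from pypeline.yaml: task -> downstream tasks.
dag_adjacency = {"a": ["b", "c"]}


def execution_stages(adjacency):
    """Group tasks into stages; tasks within one stage have no mutual dependencies."""
    # graphlib expects task -> predecessors, so invert the adjacency mapping.
    predecessors = {}
    for parent, children in adjacency.items():
        predecessors.setdefault(parent, set())
        for child in children:
            predecessors.setdefault(child, set()).add(parent)

    sorter = TopologicalSorter(predecessors)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic

    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose predecessors are done
        stages.append(ready)
        sorter.done(*ready)
    return stages


print(execution_stages(dag_adjacency))  # [['a'], ['b', 'c']]
```

Tasks that share a stage are the ones that could run in parallel; in this example that is b and c.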

Set up the modules to be executed by the YAML configuration (pipeline.py and scheduled_tasks.py)

# pipeline.py
import time


def a(event):
    print("A")


def b(event):
    print("B")
    time.sleep(10)


def c(event):
    print("C")


# scheduled_tasks.py
def cron_task():
    print("HI")
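
The cron_task function above is triggered by the scheduledTasks entry in pypeline.yaml, where every schedule field is set to `'*'`. As a rough sketch, and assuming the five schedule keys correspond to the fields of a standard crontab line (an assumption, not something the PypeLine documentation confirms here), the schedule can be read as a crontab expression. `to_cron_expression` and `CRON_FIELD_ORDER` are hypothetical names.

```python
# Assumed mapping of the YAML schedule keys onto standard crontab field order:
# minute, hour, day of month, month, day of week.
CRON_FIELD_ORDER = ("minute", "hour", "dayOfMonth", "monthOfYear", "dayOfWeek")


def to_cron_expression(schedule):
    """Render a schedule mapping as a five-field crontab string."""
    return " ".join(str(schedule[field]) for field in CRON_FIELD_ORDER)


# The schedule from pypeline.yaml: all wildcards, i.e. every minute.
every_minute = {
    "minute": "*",
    "hour": "*",
    "dayOfWeek": "*",
    "dayOfMonth": "*",
    "monthOfYear": "*",
}
print(to_cron_expression(every_minute))  # * * * * *

# A hypothetical variation: once a day at 02:00.
nightly = dict(every_minute, minute="0", hour="2")
print(to_cron_expression(nightly))  # 0 2 * * *
```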

Configure your environment variables (demo.env)

SERMOS_BASE_URL=local
PYPELINE_CLIENT_PKG_NAME=pypeline_demo
REDIS_URL=redis://:password@localhost:6379/0
RABBITMQ_URL=amqp://admin:password@localhost:5672
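
Before starting workers it can help to confirm that the two connection URLs above are well formed. The following is a small pre-flight sketch using only the standard library. `check_service_urls` and `EXPECTED_SCHEMES` are hypothetical names, and the accepted URL schemes are an assumption rather than a PypeLine requirement.

```python
from urllib.parse import urlsplit

# Assumed acceptable URL schemes for each variable (plain and TLS variants).
EXPECTED_SCHEMES = {
    "REDIS_URL": {"redis", "rediss"},
    "RABBITMQ_URL": {"amqp", "amqps"},
}


def check_service_urls(environ):
    """Return a list of human-readable problems; an empty list means all looks fine."""
    problems = []
    for name, schemes in EXPECTED_SCHEMES.items():
        value = environ.get(name)
        if not value:
            problems.append(f"{name} is not set")
            continue
        parts = urlsplit(value)
        if parts.scheme not in schemes:
            problems.append(f"{name} has unexpected scheme {parts.scheme!r}")
        elif not parts.hostname:
            problems.append(f"{name} has no host")
    return problems


# Values copied from demo.env.
demo_env = {
    "REDIS_URL": "redis://:password@localhost:6379/0",
    "RABBITMQ_URL": "amqp://admin:password@localhost:5672",
}
print(check_service_urls(demo_env))  # []
```

In a real project the same function could be called with `os.environ` once demo.env has been loaded.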

Start RabbitMQ and Redis to serve as your message broker and results backend. We use Docker Compose for this.

DEMO PROJECT COMING SOON!

Testing

If you are developing pypeline and want to test this package, install the test dependencies:

$ pip install -e ".[test]"

Now, run the tests:

$ tox

