
PypeLine - Python pipelines for the Real World

PypeLine

Quickstart

  1. Add pypeline as a dependency to your Python application

  2. Install extras depending on what you are building:

     • flask - Convenient interface for Flask applications
     • web - Some standard web server dependencies we like
     • workers - Installs Celery and networkx, which are required if using pipelines.

Overview

PypeLine is a fork of Sermos (https://gitlab.com/sermos/sermos). PypeLine diverges from Sermos as a SaaS platform and is intended as a suite for job management, used in conjunction with or independent of a Flask web app. Common job management workflows include running pipelines, scheduled tasks, and various other types of jobs. PypeLine is designed to make these systems faster and more intuitive for Python developers to create.

Under the hood, PypeLine extends various Celery capabilities, such as its existing complex workflows, to make them suitable for large-scale pipelines that can be run in production. To do this, PypeLine uses a custom Celery configuration and a library known as Celery-Dyrygent to help orchestrate thousands of tasks at once.

Pypeline

  • Celery Configuration
  • Pipelines
  • CronJobs
  • APIs
  • Utilities

Your Application

This is where all of your code lives. It has only a few requirements:

  1. It is a base application written in Python.
  2. Scheduled tasks and pipeline nodes must be Python functions that accept at least one positional argument: event
  3. It includes a sermos.yaml file, which is the configuration file for running scheduled tasks and pipelines.

Celery

PypeLine provides sensible default configurations for the use of Celery. The default deployment uses RabbitMQ, which is recommended. The library can also be used with other workflows (e.g. Kafka) as desired.

There are two core aspects of Celery that PypeLine handles differently from a standard Celery deployment.

ChainedTask

When celery.py is imported, it configures Celery and also runs GenerateCeleryTasks().generate(), which uses the sermos.yaml config to turn customer methods into decorated Celery tasks.

Part of this process includes adding ChainedTask as the base for all of these dynamically generated tasks.

ChainedTask is a Celery Task that injects tools and event into the signature of all dynamically generated tasks.
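
The internals of GenerateCeleryTasks and ChainedTask are not shown here. As a rough, Celery-free sketch of the general idea only, the snippet below resolves a handler from a dotted path and wraps it so that event is passed as the first positional argument. The names load_handler and inject_event are hypothetical, the dotted-path convention is an assumption, and json.dumps merely stands in for a customer method.

```python
import importlib
from functools import wraps


def load_handler(dotted_path):
    """Resolve 'package.module.function' to the callable it names."""
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)


def inject_event(func):
    """Wrap func so the 'event' keyword argument is passed positionally.

    Hypothetical stand-in for what a task base class could do; this is
    not PypeLine's actual ChainedTask.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        event = kwargs.pop("event", {})
        return func(event, *args, **kwargs)
    return wrapper


# json.dumps stands in for a customer method named in a config file.
task = inject_event(load_handler("json.dumps"))
result = task(event={"a": 1})
print(result)
```

In a real deployment the wrapped callable would additionally be registered as a Celery task; that step is left out here to keep the sketch dependency-free.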

SermosScheduler

We allow users to set new scheduled / recurring tasks on the fly. Celery's default beat_scheduler does not support this behavior and would require the Beat process to be killed and restarted upon every change. Instead, we set our custom sermos.celery_beat:SermosScheduler as the beat_scheduler, which watches the database for new or modified entries and reloads dynamically.
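
As a config fragment, pointing Celery at the custom scheduler might look like the sketch below. Only the scheduler path is taken from the text above; the CELERY_CONFIG name and the split_scheduler_path helper are hypothetical and are there to illustrate the module:Class format of the setting.

```python
# Hypothetical settings dict; only the scheduler path comes from the text above.
CELERY_CONFIG = {
    "beat_scheduler": "sermos.celery_beat:SermosScheduler",
}


def split_scheduler_path(path):
    """Split a 'module:ClassName' reference into its two parts."""
    module_name, _, class_name = path.partition(":")
    return module_name, class_name


module_name, class_name = split_scheduler_path(CELERY_CONFIG["beat_scheduler"])
print(module_name, class_name)
```

In an application, a dict like this would typically be applied to an existing Celery app with app.conf.update(CELERY_CONFIG). PypeLine's own celery.py is described as handling this configuration already, so this is only relevant when inspecting or overriding the defaults.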

Workers / Tasks / Pipeline Nodes

PypeLine handles decorating the tasks, generating the correct Celery chains, etc.

Customer code has one requirement: write a Python function that accepts one positional argument: event

e.g.

import logging

logger = logging.getLogger(__name__)


def demo_pipeline_node_a(event):
    logger.info(f"RUNNING demo_pipeline_node_a: {event}")
    return
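
Because a node is just a function of event, it can be exercised locally without a broker or worker. The following is a minimal sketch, assuming the event is a plain dict; the keys in sample_event and the second node are hypothetical and exist only for illustration.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def demo_pipeline_node_a(event):
    """Pipeline node: receives the event as its only positional argument."""
    logger.info(f"RUNNING demo_pipeline_node_a: {event}")
    return


def demo_pipeline_node_b(event):
    """Second hypothetical node, same signature as the first."""
    logger.info(f"RUNNING demo_pipeline_node_b: {event}")
    return


# Hypothetical event payload; the real event structure is an assumption here.
sample_event = {"pipeline_id": "demo", "payload": {"document": "example.txt"}}

# Call each node directly, in order, as a local smoke test.
results = [node(sample_event) for node in (demo_pipeline_node_a, demo_pipeline_node_b)]
```

This only checks that each function accepts the event argument and runs. Ordering, retries, and chaining are handled by PypeLine and Celery when the nodes run as part of a pipeline.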

Generators

TODO: This needs to be updated both in code and documentation. Leaving here because it's valuable to update in the future.

A common task associated with processing batches of documents is generating the list of files to process. pypeline.generators contains two helpful classes to generate lists of files from S3 and from a local file system.

S3KeyGenerator will produce a list of object keys in S3. Example:

from pypeline.generators import S3KeyGenerator

gen = S3KeyGenerator('access-key', 'secret-key')
files = gen.list_files(
    'bucket-name',
    'folder/path/',
    offset=0,
    limit=4,
    return_full_path=False
)

LocalKeyGenerator will produce a list of file names on a local file system. Example:

from pypeline.generators import LocalKeyGenerator

gen = LocalKeyGenerator()
files = gen.list_files('/path/to/list/')

Testing

If you are developing pypeline and want to test this package, install the test dependencies:

$ pip install -e .[test]

Now, run the tests:

$ tox
