A Railway pattern based pypeline package
Project description
Pypelinerr
Railway Oriented Programming implementation for Python.
Motivation
Handling events that trigger a sequence of commands or side-effects can be extremely elegant using the railway pattern. This package can allow you to create a dedicated pipeline for each flow or event, that will preform the tasks in an ordered fashion.
Basic Usage
- Create a class that inherit from
Pipeline
, withphases
method:
from pypelinerr import Pipeline
class SomeEventHandler(Pipeline):
def phases(self):
return [
'phase_one',
'phase_two',
'phase_three'
]
def phase_one(self):
self.options['option_for_second_phase'] = True
def phase_two(self):
if 'option_for_second_phase' in self.options:
return 'an option got passed from first phase, cool!'
def phase_three(self):
self.options['reached_third_phase'] = True
- in the pypeline manager, use
SomeEventHandler(options).run()
, whereoptions
are the input to the pipeline flow.
Example
For example, if you create a pipeline that first connects to a database, then fetches the a document by an id, validates it, and finally send a post request of the entry's user_id, it can look like this:
import requests
from pymongo import MongoClient
from pypelinerr import Pipeline
class RetrieveDataAndPost(Pipeline):
def phases(self):
return [
'connect_to_mongo',
'fetch_document',
'validate_document',
'post_results'
]
def connect_to_mongo(self):
self.options['albums_collection'] = MongoClient('<MongoDB_URL>').albums
def fetch_document(self):
album_id = self.options.get('album_id')
self.options['album'] = self.options['albums_collection'].find_one({ 'id': album_id })
def validate_document(self):
if 'artist_name' not in self.options['album']:
self.fail_operation()
def post_results(self):
selected_artist = self.options.get('album').get('artist_name')
requests.post('<Artists_Service_URL>', data={'selected_artist': selected_artist})
Features
Break operation
Calling the break_operation(message?)
allows you to break from the pipeline's (event's handler) pipeline on an invalid result, without failing the whole pipeline.
An example for such a use-case is when checking in a DataBase if an entity not exists, and if so - not continuing the pipeline's flow.
In the bellow flow, we want to process a CRUD event named OrderCreated
of our online shop,
where if the user details not present in our DB, we would not want to proceed, because only signed-up users can create an order:
from pypelinerr import Pipeline
class OrderCreated(Pipeline):
def phases(self):
return [
'validate_user_exists',
'create_an_order',
'...',
'...'
]
def validate_user_exists(self):
user = db.user_collection.find_one({'id': self.options['user_id']})
if not user:
self.break_operation(message='user not in mongo collection')
def create_an_order(self):
...
Fail operation
By calling fail_operation(message?)
, we can stop and fail the pipeline's operation.
This can be very useful in case of, for example, a momentarily network connection issue with another service or a remote DB,
when you will want to send the pipeline's operation result object to a failed-queue, for later processing of the event from the phase
it failed, and with the current options.
We can also raise
an Exception and it will count as a failed-operation, but in a case where we have a wrapper around the service call /
DB access which already catches the exception, this is very useful:
Other Module that handle mongo connection:
from pymongo import MongoClient
class UserCollection:
def init_connection(self):
try:
return MongoClient('<MongoDB_URL>').db.user_collection
except Exception as e:
return None
...
Our pypeline:
from models.user import UserCollection
from pypelinerr import Pipeline
class OrderCreated(Pipeline):
def phases(self):
return [
'validate_user_exists',
'create_an_order',
'...',
'...'
]
def validate_user_exists(self):
user_collection = UserCollection().init_connection()
if not user_collection:
self.fail_operation(message='mongo connection failure')
def create_an_order(self):
...
Schema
The Schema mechanism allows us to validate the messages (the event payload) been passed to the pipeline, before the pipeline itself starts:
schema = Schema({'user_id': int, 'logged_in': bool})
event_payload = {'user_id': 'not a number', 'logged_in': True}
OrderCreated(options=event_payload, schema=schema).run()
This run will result in a failure, with a fail message of:
schema.SchemaUnexpectedTypeError: 'not a number' should be instance of 'int'
entry_phase
The entry phase allow us to use the pipeline from a specific phase, and skip it's previous phases. The most common use-case for this is re-running the pipeline in a case of failure, or using only a small part of the whole pipeline.
event_payload = {'user_id': 123, 'logged_in': True}
OrderCreated(options=event_payload, entry_phase='create_an_order').run()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for pypelinerr-0.0.6-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 3aa18090a2ca56552a1dd43b511b090ac0fe966ccf3ca78de6f2eeb6d52af360 |
|
MD5 | fbbe6c361e547e0b819bc7e48bb1a861 |
|
BLAKE2b-256 | a70f716ea61ec6f37fb57d324c4d49c2ef84b8e6c72c374b8bfc9f1b6c414d63 |