A Railway pattern based pypeline package
Project description
Pypelinerr
Railway Oriented Programming implementation for Python.
Motivation
Handling events that trigger a sequence of commands or side-effects can be extremely elegant using the railway pattern. This package can allow you to create a dedicated pipeline for each flow or event, that will preform the tasks in an ordered fashion.
Basic Usage
- Create a class that inherit from
Pipeline, withphasesmethod:
from pypelinerr import Pipeline
class SomeEventHandler(Pipeline):
def phases(self):
return [
'phase_one',
'phase_two',
'phase_three'
]
def phase_one(self):
self.options['option_for_second_phase'] = True
def phase_two(self):
if 'option_for_second_phase' in self.options:
return 'an option got passed from first phase, cool!'
def phase_three(self):
self.options['reached_third_phase'] = True
- in the pypeline manager, use
SomeEventHandler(options).run(), whereoptionsare the input to the pipeline flow.
Example
For example, if you create a pipeline that first connects to a database, then fetches the a document by an id, validates it, and finally send a post request of the entry's user_id, it can look like this:
import requests
from pymongo import MongoClient
from pypelinerr import Pipeline
class RetrieveDataAndPost(Pipeline):
def phases(self):
return [
'connect_to_mongo',
'fetch_document',
'validate_document',
'post_results'
]
def connect_to_mongo(self):
self.options['albums_collection'] = MongoClient('<MongoDB_URL>').albums
def fetch_document(self):
album_id = self.options.get('album_id')
self.options['album'] = self.options['albums_collection'].find_one({ 'id': album_id })
def validate_document(self):
if 'artist_name' not in self.options['album']:
self.fail_operation()
def post_results(self):
selected_artist = self.options.get('album').get('artist_name')
requests.post('<Artists_Service_URL>', data={'selected_artist': selected_artist})
Features
Break operation
Calling the break_operation(message?) allows you to break from the pipeline's (event's handler) pipeline on an invalid result, without failing the whole pipeline.
An example for such a use-case is when checking in a DataBase if an entity not exists, and if so - not continuing the pipeline's flow.
In the bellow flow, we want to process a CRUD event named OrderCreated of our online shop,
where if the user details not present in our DB, we would not want to proceed, because only signed-up users can create an order:
from pypelinerr import Pipeline
class OrderCreated(Pipeline):
def phases(self):
return [
'validate_user_exists',
'create_an_order',
'...',
'...'
]
def validate_user_exists(self):
user = db.user_collection.find_one({'id': self.options['user_id']})
if not user:
self.break_operation(message='user not in mongo collection')
def create_an_order(self):
...
Fail operation
By calling fail_operation(message?), we can stop and fail the pipeline's operation.
This can be very useful in case of, for example, a momentarily network connection issue with another service or a remote DB,
when you will want to send the pipeline's operation result object to a failed-queue, for later processing of the event from the phase
it failed, and with the current options.
We can also raise an Exception and it will count as a failed-operation, but in a case where we have a wrapper around the service call /
DB access which already catches the exception, this is very useful:
Other Module that handle mongo connection:
from pymongo import MongoClient
class UserCollection:
def init_connection(self):
try:
return MongoClient('<MongoDB_URL>').db.user_collection
except Exception as e:
return None
...
Our pypeline:
from models.user import UserCollection
from pypelinerr import Pipeline
class OrderCreated(Pipeline):
def phases(self):
return [
'validate_user_exists',
'create_an_order',
'...',
'...'
]
def validate_user_exists(self):
user_collection = UserCollection().init_connection()
if not user_collection:
self.fail_operation(message='mongo connection failure')
def create_an_order(self):
...
Schema
The Schema mechanism allows us to validate the messages (the event payload) been passed to the pipeline, before the pipeline itself starts:
schema = Schema({'user_id': int, 'logged_in': bool})
event_payload = {'user_id': 'not a number', 'logged_in': True}
OrderCreated(options=event_payload, schema=schema).run()
This run will result in a failure, with a fail message of:
schema.SchemaUnexpectedTypeError: 'not a number' should be instance of 'int'
entry_phase
The entry phase allow us to use the pipeline from a specific phase, and skip it's previous phases. The most common use-case for this is re-running the pipeline in a case of failure, or using only a small part of the whole pipeline.
event_payload = {'user_id': 123, 'logged_in': True}
OrderCreated(options=event_payload, entry_phase='create_an_order').run()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pypelinerr-0.0.6.tar.gz.
File metadata
- Download URL: pypelinerr-0.0.6.tar.gz
- Upload date:
- Size: 5.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.8.2 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.10.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e03fbb35862ba776f636f811d8bc696b325e5988da3fe10bba12ba57f53c77fd
|
|
| MD5 |
6140a55934d115147b75881bb23b1be5
|
|
| BLAKE2b-256 |
45454a95446f6cf1e98a9230d9641a7c7c0fb9147c37c2f598a107267c9b101a
|
File details
Details for the file pypelinerr-0.0.6-py3-none-any.whl.
File metadata
- Download URL: pypelinerr-0.0.6-py3-none-any.whl
- Upload date:
- Size: 6.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.7.1 importlib_metadata/4.8.2 pkginfo/1.8.2 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.10.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3aa18090a2ca56552a1dd43b511b090ac0fe966ccf3ca78de6f2eeb6d52af360
|
|
| MD5 |
fbbe6c361e547e0b819bc7e48bb1a861
|
|
| BLAKE2b-256 |
a70f716ea61ec6f37fb57d324c4d49c2ef84b8e6c72c374b8bfc9f1b6c414d63
|