Skip to main content

A Railway pattern based operation package

Project description

Operation

python-versions pypi-badge license

Railway Oriented Programming implementation for Python.

Motivation

Handling events that trigger a sequence of commands or side-effects can be extremely elegant using the railway pattern. This package can allow you to create a dedicated pipeline for each flow or event, that will preform the tasks in an ordered fashion.

Basic Usage

  1. Create a class that inherit from Operation, with phases method:
from operation import Operation


class SomeOperation(Operation):
    def phases(self):
        return [
            'phase_one',
            'phase_two',
            'phase_three'
        ]

    def phase_one(self):
        self.options['option_for_second_phase'] = True

    def phase_two(self):
        if 'option_for_second_phase' in self.options:
            return 'an option got passed from first phase, cool!'

    def phase_three(self):
        self.options['reached_third_phase'] = True
  1. in the operation manager, use SomeOperation(options).run(), where options are the input to the operation flow.

Example

For example, if you create an operation that first connects to a database, then fetches the a document by an id, validates it, and finally send a post request of the entry's user_id, it can look like this:

import requests
from pymongo import MongoClient

from operation import Operation


class RetrieveDataAndPost(Operation):
    def phases(self):
        return [
            'connect_to_mongo',
            'fetch_document',
            'validate_document',
            'post_results'
        ]

    def connect_to_mongo(self):
        self.options['albums_collection'] = MongoClient('<MongoDB_URL>').albums

    def fetch_document(self):
        album_id = self.options.get('album_id')
        self.options['album'] = self.options['albums_collection'].find_one({ 'id': album_id })

    def validate_document(self):
        if 'artist_name' not in self.options['album']:
            self.fail_operation()

    def post_results(self):
        selected_artist =  self.options.get('album').get('artist_name')
        requests.post('<Artists_Service_URL>', data={'selected_artist': selected_artist})

Features

Break operation

Calling the break_operation(message?) allows you to break from the operation's (event's handler) pipeline on an invalid result, without failing the whole operation. An example for such a use-case is when checking in a DataBase if an entity not exists, and if so - not continuing the operation.

In the bellow flow, we want to process a CRUD event named OrderCreated of our online shop, where if the user details not present in our DB, we would not want to proceed, because only signed-up users can create an order:

from operation import Operation


class OrderCreated(Operation):
    def phases(self):
        return [
            'validate_user_exists',
            'create_an_order',
            '...',
            '...'
        ]

    def validate_user_exists(self):
        user = db.user_collection.find_one({'id': self.options['user_id']})
        if not user:
            self.break_operation(message='user not in mongo collection')

    def create_an_order(self):
        ...

Fail operation

By calling fail_operation(message?), we can stop and fail the operation. This can be very useful in case of, for example, a momentarily network connection issue with another service or a remote DB, when you will want to send the operation result object to a failed-queue, for later processing of the event from the phase it failed, and with the current options.

We can also raise an Exception and it will count as a failed-operation, but in a case where we have a wrapper around the service call / DB access which already catches the exception, this is very useful:

Other Module that handle mongo connection:

from pymongo import MongoClient


class UserCollection:
    def init_connection(self):
        try:
            return MongoClient('<MongoDB_URL>').db.user_collection
        except Exception as e:
            return None
    ...

Our operation:

from models.user import UserCollection
from operation import Operation


class OrderCreated(Operation):
    def phases(self):
        return [
            'validate_user_exists',
            'create_an_order',
            '...',
            '...'
        ]

    def validate_user_exists(self):
        user_collection =  UserCollection().init_connection()
        if not user_collection:
            self.fail_operation(message='mongo connection failure')

    def create_an_order(self):
        ...

Schema

The Schema mechanism allows us to validate the messages (the event payload) been passed to the operation, before the operation itself starts:

schema = Schema({'user_id': int, 'logged_in': bool})
event_payload = {'user_id': 'not a number', 'logged_in': True}
OrderCreated(options=event_payload, schema=schema).run()

This run will result in a failure, with a fail message of: schema.SchemaUnexpectedTypeError: 'not a number' should be instance of 'int'

entry_phase

The entry phase allow us to use the operation from a specific phase, and skip it's previous phases. The most common use-case for this is re-running the operation in a case of failure, or using only a small part of the whole operation.

event_payload = {'user_id': 123, 'logged_in': True}
OrderCreated(options=event_payload, entry_phase='create_an_order').run()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

operation-0.0.4.tar.gz (5.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

operation-0.0.4-py3-none-any.whl (6.1 kB view details)

Uploaded Python 3

File details

Details for the file operation-0.0.4.tar.gz.

File metadata

  • Download URL: operation-0.0.4.tar.gz
  • Upload date:
  • Size: 5.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/47.1.0 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5

File hashes

Hashes for operation-0.0.4.tar.gz
Algorithm Hash digest
SHA256 33bcc8b6c978f0e15eb6d279ccfb96c8cd03c492dca6334191842a1dc6ef486d
MD5 c0be7dc2d4915461183d01debd7615ec
BLAKE2b-256 5029ebe5e921e23057cf0288e94a7f8582157b4d7bd102107308998a4f3ab63e

See more details on using hashes here.

File details

Details for the file operation-0.0.4-py3-none-any.whl.

File metadata

  • Download URL: operation-0.0.4-py3-none-any.whl
  • Upload date:
  • Size: 6.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/47.1.0 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.8.5

File hashes

Hashes for operation-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 abf1b03c8a6fd5c573253e8efec3540cb2d3a1e5f18d03a38a3d3cb5662758b8
MD5 befe586354c173bb6e46eaa776884c60
BLAKE2b-256 321c469a12efb8ccb23df6879bd867d9032506c9d0ec6b80e99242892fe46c79

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page