A pipeline framework for stream processing

Pipeline provides a unified interface for setting up data stream processing systems with Kafka, Pulsar, RabbitMQ, Redis and many more. The idea is to shield developers from changes in deployment technology, so that a Docker image released for a certain task can be used with Kafka or Redis simply by changing environment variables.

Features

  • a unified interface from Kafka to Pulsar, from Redis to MongoDB

  • component connections controlled via the command line or environment variables

  • file and in-memory backends for testing

Requirements

  • Python 3.7, 3.8

Installation

$ pip install tanbih-pipeline

You can install the required backend dependencies with:

$ pip install tanbih-pipeline[redis]
$ pip install tanbih-pipeline[kafka]
$ pip install tanbih-pipeline[pulsar]
$ pip install tanbih-pipeline[rabbitmq]
$ pip install tanbih-pipeline[elastic]
$ pip install tanbih-pipeline[mongodb]

To install support for all backends:

$ pip install tanbih-pipeline[full]

Producer

Producer is used to develop a data source for a pipeline. A source produces output without consuming any input; a crawler, for example, can be seen as a producer.

>>> from typing import Generator
>>> from pydantic import BaseModel
>>> from pipeline import Producer as Worker, ProducerSettings as Settings
>>>
>>> class Output(BaseModel):
...     key: int
>>>
>>> class MyProducer(Worker):
...     def generate(self) -> Generator[Output, None, None]:
...         for i in range(10):
...             yield Output(key=i)
>>>
>>> settings = Settings(name='producer', version='0.0.0', description='')
>>> producer = MyProducer(settings, output_class=Output)
>>> producer.parse_args("--out-kind MEM --out-topic test".split())
>>> producer.start()
>>> [r.get('key') for r in producer.destination.results]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Processor

Processor is used to process input messages. Modifications are made in place. A processor produces either one output for each input or no output.

>>> from pipeline import Processor as Worker, ProcessorSettings as Settings
>>>
>>> class Input(BaseModel):
...     key: int
>>>
>>> class Output(BaseModel):
...     key: int
...     processed: bool
>>>
>>> class MyProcessor(Worker):
...     def process(self, input):
...         return Output(key=input.key, processed=True)
>>>
>>> settings = Settings(name='processor', version='0.1.0', description='')
>>> processor = MyProcessor(settings, input_class=Input, output_class=Output)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> processor.parse_args(args)
>>> processor.start()
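
The "one output for each input, or no output" contract can be pictured with a small library-independent sketch. The names `run` and `process_even` are hypothetical, and treating a `None` return value as "no output" is an assumption about how a dropped message is signalled, not something this README confirms.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional


@dataclass
class Input:
    key: int


@dataclass
class Output:
    key: int
    processed: bool


def process_even(msg: Input) -> Optional[Output]:
    """Emit an Output for even keys; return None to drop the message."""
    if msg.key % 2 != 0:
        return None
    return Output(key=msg.key, processed=True)


def run(process: Callable[[Input], Optional[Output]],
        source: Iterable[Input]) -> List[Output]:
    """Feed each input to `process` and collect at most one output per input."""
    results = []
    for msg in source:
        out = process(msg)
        if out is not None:  # assumption: None means "no output"
            results.append(out)
    return results


results = run(process_even, (Input(key=i) for i in range(5)))
print([r.key for r in results])
```

Here every input yields either exactly one output or nothing, which is the shape a `process` method is expected to have.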

Splitter

Splitter is used to write to multiple outputs. It takes a function that generates the output topic from the message being processed, and uses that topic when writing the output.

>>> from pipeline import Splitter as Worker, SplitterSettings as Settings
>>>
>>> class MySplitter(Worker):
...     def get_topic(self, msg):
...         return '{}-{}'.format(self.destination.topic, msg.get('id'))
>>>
>>> settings = Settings(name='splitter', version='0.1.0', description='')
>>> splitter = MySplitter(settings)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> splitter.parse_args(args)
>>> splitter.start()
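
The routing idea behind a splitter can be sketched without the library. The helper `route` and the sample messages are hypothetical; only the '<base topic>-<id>' naming rule is taken from the example above, and the real Splitter writes to the configured backend rather than to a dictionary.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def get_topic(base_topic: str, msg: dict) -> str:
    """Mirror the naming rule from the example: '<base topic>-<id>'."""
    return '{}-{}'.format(base_topic, msg.get('id'))


def route(base_topic: str, messages: Iterable[dict]) -> Dict[str, List[dict]]:
    """Group messages by the topic name computed for each one."""
    topics = defaultdict(list)
    for msg in messages:
        topics[get_topic(base_topic, msg)].append(msg)
    return dict(topics)


routed = route('test', [{'id': 1}, {'id': 2}, {'id': 1}])
print(sorted(routed))
```

Each distinct `id` ends up in its own topic, so downstream readers can subscribe to only the topics they need.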

Usage

Choosing backend technology:

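kind     | description   | multi-reader | shared reader | data expire
LREDIS   | Redis List    | X            | X             | read
XREDIS   | Redis Stream  | X            | X             | limit
KAFKA    | Kafka         | X            | X             | read
PULSAR   | Pulsar        | X            | X             | ttl
RABBITMQ | RabbitMQ      | X†           |               | read
ELASTIC  | ElasticSearch |              |               |
MONGODB  | MongoDB       |              |               |
FILE*    | json, csv     |              |               |
MEM*     | memory        |              |               |

† For RABBITMQ only one of the two reader columns is marked; this copy of the table does not preserve which one.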

  • FILE accepts JSON Lines (jsonl) input on stdin or from a named file; it also accepts CSV files. Both formats can be gzipped.

  • MEM reads from and writes to memory; it is designed for unit tests.
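
As a minimal sketch of the kind of input the FILE backend is described as accepting, the following writes a gzipped JSON Lines file with the standard library and reads it back. The file name `sample.jsonl.gz` is hypothetical, and how the backend detects compression or file type is not specified here.

```python
import gzip
import json
import os
import tempfile

records = [{"key": i} for i in range(3)]

# Hypothetical file name; one JSON object per line is the JSON Lines layout.
path = os.path.join(tempfile.mkdtemp(), "sample.jsonl.gz")
with gzip.open(path, "wt", encoding="utf-8") as fh:
    for record in records:
        fh.write(json.dumps(record) + "\n")

# Read the file back to confirm each line parses on its own.
with gzip.open(path, "rt", encoding="utf-8") as fh:
    loaded = [json.loads(line) for line in fh if line.strip()]

print(loaded)
```

A file prepared this way can serve as a fixture when testing a worker locally.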

Environment Variables

The application reads its options from environment variables as well as command-line arguments. Add the prefix IN_ (environment variable) or --in- (command-line option) for input settings, and OUT_ or --out- for output settings. Please refer to the backend documentation for the available arguments and environment variables.
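
The prefix rule can be illustrated with a small helper. `option_names` is hypothetical and not part of the library, and the upper-case, underscore-separated spelling of the environment variable is an assumption inferred from the IN_ and OUT_ prefixes.

```python
def option_names(name: str, direction: str) -> dict:
    """Return the command-line flag and environment variable for an option.

    `direction` is 'in' or 'out'. The upper-case, underscore-separated
    environment spelling is an assumption based on the IN_/OUT_ prefixes.
    """
    if direction not in ("in", "out"):
        raise ValueError("direction must be 'in' or 'out'")
    flag = "--{}-{}".format(direction, name.replace("_", "-").lower())
    env = "{}_{}".format(direction, name.replace("-", "_")).upper()
    return {"flag": flag, "env": env}


names = option_names("kind", "out")
print(names)
```

Under these assumptions, the `--out-kind` option used in the examples would correspond to an `OUT_KIND` environment variable.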

Customize Settings

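from pydantic import BaseModel, Field
from pipeline import Processor, ProcessorSettings as Settings

class CustomSettings(Settings):
    # extra option, added on top of the built-in settings
    new_argument: str = Field("", title="a new argument for custom settings")

class CustomProcessor(Processor):
    def __init__(self):
        # settings fields are passed by keyword, as in the examples above
        settings = CustomSettings(name="worker", version="v0.1.0", description="custom processor")
        super().__init__(settings, input_class=BaseModel, output_class=BaseModel)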

Errors

PipelineError is raised when an error occurs.

Contribute

Use pre-commit to run black and flake8

Credits

Yifan Zhang (yzhang at hbku.edu.qa)
