
A pipeline framework for stream processing


A flexible stream processing framework supporting RabbitMQ, Pulsar, Kafka and Redis.

Features

  • at-least-once delivery, guaranteed by acknowledging every message

  • horizontally scalable through consumer groups

  • flow is controlled at deployment: develop a worker once, use it everywhere

  • testability provided with FILE and MEMORY input/output
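
The at-least-once guarantee listed above can be illustrated with a minimal, standard-library-only sketch. It does not use this package's API: AckQueue and consume are hypothetical names invented for the illustration. The point it shows is that a message is removed only after it has been acknowledged, so a handler that fails sees the same message again.

```python
from collections import deque


class AckQueue:
    """Toy in-memory queue: a message is removed only after it is acknowledged."""

    def __init__(self, messages):
        self._pending = deque(messages)

    def receive(self):
        # Peek at the next message without removing it.
        return self._pending[0] if self._pending else None

    def ack(self):
        # Acknowledge (and drop) the message returned by the last receive().
        self._pending.popleft()


def consume(queue, handler):
    """Process every message, redelivering a message until the handler succeeds."""
    while (msg := queue.receive()) is not None:
        try:
            handler(msg)
        except Exception:
            continue  # no ack: the same message is delivered again
        queue.ack()


seen = []
failed_once = set()


def flaky_handler(msg):
    seen.append(msg)
    # Fail the first time message 2 is seen, to simulate a crash.
    if msg == 2 and msg not in failed_once:
        failed_once.add(msg)
        raise RuntimeError("simulated failure")


consume(AckQueue([1, 2, 3]), flaky_handler)
print(seen)  # [1, 2, 2, 3]
```

Message 2 is handled twice, which is why workers in an at-least-once pipeline are usually written so that processing the same message again is harmless.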

Requirements

  • Python 3.8

Installation

$ pip install tanbih-pipeline

You can install the required backend dependencies with:

$ pip install tanbih-pipeline[redis]
$ pip install tanbih-pipeline[kafka]
$ pip install tanbih-pipeline[pulsar]
$ pip install tanbih-pipeline[rabbitmq]
$ pip install tanbih-pipeline[azure]

To install support for all backends:

$ pip install tanbih-pipeline[full]

Producer

Use Producer to develop a data source for the pipeline. A source produces output without consuming any input; a crawler, for example, is a producer.

>>> from typing import Generator
>>> from pydantic import BaseModel
>>> from pipeline import Producer as Worker, ProducerSettings as Settings
>>>
>>> class Output(BaseModel):
...     key: int
>>>
>>> class MyProducer(Worker):
...     def generate(self) -> Generator[Output, None, None]:
...         for i in range(10):
...             yield Output(key=i)
>>>
>>> settings = Settings(name='producer', version='0.0.0', description='')
>>> producer = MyProducer(settings, output_class=Output)
>>> producer.parse_args("--out-kind MEM --out-topic test".split())
>>> producer.start()
>>> [r.get('key') for r in producer.destination.results]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Processor

Use Processor to process input. Modifications are made in place. For each input, a processor produces either one output or no output.

>>> from pipeline import Processor as Worker, ProcessorSettings as Settings
>>>
>>> class Input(BaseModel):
...     key: int
>>>
>>> class Output(BaseModel):
...     key: int
...     processed: bool
>>>
>>> class MyProcessor(Worker):
...     def process(self, input):
...         return Output(key=input.key, processed=True)
>>>
>>> settings = Settings(name='processor', version='0.1.0', description='')
>>> processor = MyProcessor(settings, input_class=Input, output_class=Output)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> processor.parse_args(args)
>>> processor.start()

Splitter

Use Splitter to write to multiple outputs. It takes a function that derives the output topic from the message being processed, and uses that topic when writing the output.

>>> from pipeline import Splitter as Worker, SplitterSettings as Settings
>>>
>>> class MySplitter(Worker):
...     def get_topic(self, msg):
...         return '{}-{}'.format(self.destination.topic, msg.get('id'))
>>>
>>> settings = Settings(name='splitter', version='0.1.0', description='')
>>> splitter = MySplitter(settings)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> splitter.parse_args(args)
>>> splitter.start()
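
The routing idea behind get_topic can be sketched without any backend. The snippet below is an illustration only: split and the sample messages are hypothetical, and a plain dict stands in for the output topics. Only the topic pattern is taken from the MySplitter example above.

```python
from collections import defaultdict


def get_topic(base_topic, msg):
    # Same pattern as MySplitter.get_topic: "<base topic>-<message id>".
    return "{}-{}".format(base_topic, msg.get("id"))


def split(messages, base_topic):
    """Group messages by the topic that get_topic derives for each one."""
    topics = defaultdict(list)
    for msg in messages:
        topics[get_topic(base_topic, msg)].append(msg)
    return dict(topics)


messages = [{"id": "a", "n": 1}, {"id": "b", "n": 2}, {"id": "a", "n": 3}]
routed = split(messages, "test")
print(sorted(routed))  # ['test-a', 'test-b']
```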

Usage

Writing a Worker

Subclass Producer, Processor or Splitter, whichever fits the worker.

Environment Variables

The application accepts the following environment variables. Add the prefix IN_ or OUT_ to an environment variable, or --in- or --out- to a command line argument, to indicate whether the option applies to input or output:

environment variable    command line argument    options
KIND                    --kind                   KAFKA, PULSAR, FILE
PULSAR                  --pulsar                 pulsar url
TENANT                  --tenant                 pulsar tenant
NAMESPACE               --namespace              pulsar namespace
SUBSCRIPTION            --subscription           pulsar subscription
KAFKA                   --kafka                  kafka url
GROUPID                 --group-id               kafka group id
TOPIC                   --topic                  topic to read
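
One way to picture the IN_/OUT_ prefix convention is the argparse sketch below. It is not this package's implementation: build_parser is a hypothetical helper, only the kind and topic options are covered, and the assumption that a command line argument overrides the matching environment variable is mine rather than something the documentation states.

```python
import argparse


def build_parser(environ):
    """Build a parser whose defaults come from IN_/OUT_ prefixed variables."""
    parser = argparse.ArgumentParser()
    for prefix in ("in", "out"):
        for name in ("kind", "topic"):
            parser.add_argument(
                "--{}-{}".format(prefix, name),
                # e.g. --in-kind falls back to the IN_KIND variable
                default=environ.get("{}_{}".format(prefix, name).upper()),
            )
    return parser


# A plain dict stands in for os.environ so the example is reproducible.
env = {"IN_KIND": "FILE", "IN_TOPIC": "raw", "OUT_KIND": "KAFKA"}
options = build_parser(env).parse_args(
    ["--out-topic", "clean", "--in-kind", "PULSAR"]
)
print(options.in_kind, options.in_topic, options.out_kind, options.out_topic)
# PULSAR raw KAFKA clean
```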

Custom Code

Define add_arguments to add new arguments to the worker.

Define setup to run initialization code before the worker starts processing messages. setup is called after the command line arguments have been parsed, so logic that depends on options (the parsed arguments) belongs there.
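
The ordering described above (register arguments, parse them, then run setup) can be sketched with a stand-in class. SketchWorker is hypothetical and is not the package's base class: the method signatures, the options attribute and the --language flag are all assumptions made for the illustration, so check the package source for the real ones.

```python
import argparse


class SketchWorker:
    """Hypothetical stand-in for a worker base class, not the library's own."""

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.options = None

    def add_arguments(self, parser):
        pass  # subclasses register extra arguments here

    def setup(self):
        pass  # subclasses run initialization here

    def parse_args(self, args):
        self.add_arguments(self.parser)
        self.options = self.parser.parse_args(args)
        # setup runs after parsing, so it can read self.options
        self.setup()


class LanguageWorker(SketchWorker):
    def add_arguments(self, parser):
        # Hypothetical custom flag added by the subclass.
        parser.add_argument("--language", default="en")

    def setup(self):
        # Initialization that depends on a parsed option.
        self.model_name = "model-{}".format(self.options.language)


worker = LanguageWorker()
worker.parse_args(["--language", "ar"])
print(worker.model_name)  # model-ar
```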

Options

Errors

The value None above is the error you should return if dct or dcts is empty. The error will be sent to the topic errors, together with worker information.

Contribute

Use pre-commit to run black and flake8.

Credits

Yifan Zhang (yzhang at hbku.edu.qa)


