A pipeline framework for stream processing
Project description
A flexible stream processing framework supporting RabbitMQ, Pulsar, Kafka, and Redis.
Features
At-least-once delivery guaranteed through acknowledgement of every message
Horizontally scalable through consumer groups
Flow is controlled in deployment: develop it once, use it everywhere
Testability provided through FILE and MEMORY input/output
Requirements
Python 3.8
Installation
$ pip install tanbih-pipeline
You can install the required backend dependencies with:
$ pip install tanbih-pipeline[redis]
$ pip install tanbih-pipeline[kafka]
$ pip install tanbih-pipeline[pulsar]
$ pip install tanbih-pipeline[rabbitmq]
$ pip install tanbih-pipeline[azure]
If you want to support all backends, you can:
$ pip install tanbih-pipeline[full]
Producer
A Producer is used when developing a data source for the pipeline. A source produces output without consuming input; a crawler, for example, can be implemented as a producer.
>>> from typing import Generator
>>> from pydantic import BaseModel
>>> from pipeline import Producer as Worker, ProducerSettings as Settings
>>>
>>> class Output(BaseModel):
...     key: int
>>>
>>> class MyProducer(Worker):
...     def generate(self) -> Generator[Output, None, None]:
...         for i in range(10):
...             yield Output(key=i)
>>>
>>> settings = Settings(name='producer', version='0.0.0', description='')
>>> producer = MyProducer(settings, output_class=Output)
>>> producer.parse_args("--out-kind MEM --out-topic test".split())
>>> producer.start()
>>> [r.get('key') for r in producer.destination.results]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Processor
A Processor is used to process input; modification is done in place. A processor can produce one output for each input, or no output at all.
>>> from pipeline import Processor as Worker, ProcessorSettings as Settings
>>>
>>> class Input(BaseModel):
...     key: int
>>>
>>> class Output(BaseModel):
...     key: int
...     processed: bool
>>>
>>> class MyProcessor(Worker):
...     def process(self, input):
...         return Output(key=input.key, processed=True)
>>>
>>> settings = Settings(name='processor', version='0.1.0', description='')
>>> processor = MyProcessor(settings, input_class=Input, output_class=Output)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> processor.parse_args(args)
>>> processor.start()
Splitter
A Splitter is used when writing to multiple outputs. It takes a function that generates the output topic based on the message being processed and uses it when writing output.
>>> from pipeline import Splitter as Worker, SplitterSettings as Settings
>>>
>>> class MySplitter(Worker):
...     def get_topic(self, msg):
...         return '{}-{}'.format(self.destination.topic, msg.get('id'))
>>>
>>> settings = Settings(name='splitter', version='0.1.0', description='')
>>> splitter = MySplitter(settings)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> splitter.parse_args(args)
>>> splitter.start()
Usage
Writing a Worker
Choose Producer, Processor or Splitter to subclass from.
Environment Variables
The application accepts the following environment variables. Note that you need to add the prefix IN_ (command line: --in-) or OUT_ (command line: --out-) to each variable to indicate whether the option applies to input or output. An example follows the table below.
environment variable | command line argument | options
---|---|---
KIND | --kind | KAFKA, PULSAR, FILE
PULSAR | --pulsar | pulsar url
TENANT | --tenant | pulsar tenant
NAMESPACE | --namespace | pulsar namespace
SUBSCRIPTION | --subscription | pulsar subscription
KAFKA | --kafka | kafka url
GROUPID | --group-id | kafka group id
TOPIC | --topic | topic to read
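For example, a worker reading from Kafka and writing to Pulsar could be configured with prefixed environment variables such as the following (broker addresses and topic names here are placeholders; the same options can equally be passed as --in-*/--out-* command line arguments):

$ export IN_KIND=KAFKA
$ export IN_KAFKA=kafka-broker:9092
$ export IN_GROUPID=my-group
$ export IN_TOPIC=articles
$ export OUT_KIND=PULSAR
$ export OUT_PULSAR=pulsar://pulsar-broker:6650
$ export OUT_TOPIC=processed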
Custom Code
Define add_arguments to add new command line arguments to the worker.
Define setup to run initialization code before the worker starts processing messages. setup is called after command line arguments have been parsed; logic based on the parsed options goes here.
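A minimal sketch of both hooks, reusing the MyProcessor example above, is shown below. The argparse-compatible parser argument, the self.options attribute, and the --batch-size option are assumptions for illustration rather than documented API:

>>> class MyCustomProcessor(MyProcessor):
...     def add_arguments(self, parser):
...         # hypothetical extra option; parser is assumed to be argparse-compatible
...         parser.add_argument('--batch-size', type=int, default=10)
...
...     def setup(self):
...         # called after command line arguments have been parsed;
...         # initialization based on the parsed options goes here
...         self.batch_size = self.options.batch_size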
Options
Errors
The value None mentioned above is the error value you should return when dct or dcts is empty. The error will be sent to the errors topic together with worker information.
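As a rough sketch, reusing the MyProcessor example above (the emptiness check is purely illustrative):

>>> class SafeProcessor(MyProcessor):
...     def process(self, input):
...         if input is None:
...             # nothing to process: signal an error; it is routed to the errors topic
...             return None
...         return Output(key=input.key, processed=True)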
Contribute
Use pre-commit to run black and flake8
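For example, after cloning the repository:

$ pip install pre-commit
$ pre-commit install
$ pre-commit run --all-files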
Credits
Yifan Zhang (yzhang at hbku.edu.qa)