A pipeline framework for stream processing
Pipeline is a data streaming framework that supports Pulsar and Kafka.
Generator
Use Generator to develop a data source for the pipeline. A source produces output without consuming any input; a crawler, for example, can be implemented as a generator.
>>> from pipeline import Generator, Message
>>>
>>> class MyGenerator(Generator):
...     def generate(self):
...         for i in range(10):
...             yield {'id': i}
>>>
>>> generator = MyGenerator('generator', '0.1.0', description='simple generator')
>>> generator.parse_args("--kind MEM --out-topic test".split())
>>> generator.start()
>>> [r.get('id') for r in generator.destination.results]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Processor
Use Processor to process input messages. Messages are modified in place. For each input, a processor produces either one output or no output.
>>> from pipeline import Processor, Message
>>>
>>> class MyProcessor(Processor):
...     def process(self, msg):
...         msg.update({'processed': True})
...         return None
>>>
>>> processor = MyProcessor('processor', '0.1.0', description='simple processor')
>>> config = {'data': [{'id': 1}]}
>>> processor.parse_args("--kind MEM --in-topic test --out-topic test".split(), config=config)
>>> processor.start()
>>> [r.get('id') for r in processor.destination.results]
[1]
Splitter
Use Splitter to write to multiple outputs. It takes a function that derives the output topic from the message being processed and uses that topic when writing the output.
>>> from pipeline import Splitter, Message
>>>
>>> class MySplitter(Splitter):
...     def get_topic(self, msg):
...         return '{}-{}'.format(self.destination.topic, msg.get('id'))
...
...     def process(self, msg):
...         msg.update({
...             'processed': True,
...         })
...         return None
>>>
>>> splitter = MySplitter('splitter', '0.1.0', description='simple splitter')
>>> config = {'data': [{'id': 1}]}
>>> splitter.parse_args("--kind MEM --in-topic test --out-topic test".split(), config=config)
>>> splitter.start()
>>> [r.get('id') for r in splitter.destinations['test-1'].results]
[1]
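The routing idea behind Splitter can also be sketched without the framework. The snippet below is a minimal illustration in plain Python, not the library's implementation: the helper names get_topic and split, and the use of a dictionary of lists as a stand-in for real destinations, are assumptions made for this example.

```python
from collections import defaultdict


def get_topic(base_topic, msg):
    """Derive the output topic from the message, mirroring MySplitter.get_topic."""
    return '{}-{}'.format(base_topic, msg.get('id'))


def split(messages, base_topic):
    """Route each message to a per-topic list (a stand-in for real destinations)."""
    destinations = defaultdict(list)
    for msg in messages:
        # Modify the message in place, as process() does in the example above.
        msg.update({'processed': True})
        destinations[get_topic(base_topic, msg)].append(msg)
    return dict(destinations)


# Hypothetical input: two messages share id 1, so they land on the same topic.
routed = split([{'id': 1}, {'id': 2}, {'id': 1}], 'test')
```

Each distinct id produces its own topic name, so the number of outputs follows from the data rather than from the configuration.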
Usage
## Writing a Worker
Subclass Generator, Processor, or Splitter, depending on the role of the worker.
## Environment Variables
The application accepts the following environment variables:
environment variable | command line argument | options
---|---|---
PIPELINE | --kind | KAFKA, PULSAR, FILE
PULSAR | --pulsar | pulsar url
TENANT | --tenant | pulsar tenant
NAMESPACE | --namespace | pulsar namespace
SUBSCRIPTION | --subscription | pulsar subscription
KAFKA | --kafka | kafka url
GROUPID | --group-id | kafka group id
INTOPIC | --in-topic | topic to read
OUTTOPIC | --out-topic | topic to write to
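One common way to pair environment variables with command line arguments is to use the environment value as the argparse default. The sketch below shows that pattern for a few of the settings listed above. It is an illustration only: the function build_parser is hypothetical, and the precedence shown (a command line argument overrides the environment variable) is an assumption, not confirmed behavior of the library.

```python
import argparse
import os


def build_parser(env=None):
    """Build a parser whose defaults come from environment variables.

    `env` is any mapping; it defaults to os.environ. Passing a plain dict
    makes the sketch easy to try without touching the real environment.
    """
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser(description='hypothetical worker options')
    parser.add_argument('--kind', default=env.get('PIPELINE'), help='KAFKA, PULSAR, FILE')
    parser.add_argument('--pulsar', default=env.get('PULSAR'), help='pulsar url')
    parser.add_argument('--kafka', default=env.get('KAFKA'), help='kafka url')
    parser.add_argument('--in-topic', default=env.get('INTOPIC'), help='topic to read')
    parser.add_argument('--out-topic', default=env.get('OUTTOPIC'), help='topic to write to')
    return parser


# Environment supplies kind and in-topic; the command line supplies out-topic.
options = build_parser({'PIPELINE': 'KAFKA', 'INTOPIC': 'raw'}).parse_args(
    ['--out-topic', 'clean']
)
```

With this arrangement a deployment can set the connection details once in the environment and override individual values per run.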
## Custom Code
Define add_arguments to add new command line arguments to the worker.
Define setup to run initialization code before the worker starts processing messages. setup is called after the command line arguments have been parsed, so any logic that depends on the options (the parsed arguments) belongs there.
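The order of the two hooks can be illustrated with a small stand-in class. This is a sketch under stated assumptions, not the library's code: SketchWorker and LanguageFilter are hypothetical, the add_arguments(parser) signature is assumed, and calling setup from parse_args is only one way to guarantee that it runs after parsing.

```python
import argparse


class SketchWorker:
    """Hypothetical stand-in for a worker base class; not the library's code."""

    def __init__(self, name):
        self.name = name
        self.parser = argparse.ArgumentParser(prog=name)
        self.options = None

    def add_arguments(self, parser):
        """Hook: subclasses register extra command line arguments here."""

    def setup(self):
        """Hook: subclasses run initialization that depends on self.options."""

    def parse_args(self, args):
        self.add_arguments(self.parser)
        self.options = self.parser.parse_args(args)
        # setup runs only after the arguments have been parsed.
        self.setup()


class LanguageFilter(SketchWorker):
    def add_arguments(self, parser):
        # Hypothetical custom option for this example.
        parser.add_argument('--language', default='en')

    def setup(self):
        # Initialization based on the parsed options.
        self.accepted = {self.options.language}


worker = LanguageFilter('language-filter')
worker.parse_args(['--language', 'ar'])
```

Keeping option-dependent initialization in setup rather than in the constructor means the worker can be instantiated before its configuration is known.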
## Options
## Errors
The value None above is the error value you should return if dct or dcts is empty. The error is sent to the topic errors together with worker information.
Credits
Yifan Zhang (yzhang at hbku.edu.qa)