
A pipeline framework for stream processing



Pipeline is a data streaming framework supporting Pulsar and Kafka.

Generator

Use Generator when developing a data source for the pipeline. A source produces output without consuming input; a crawler is a typical example of a generator.

>>> from pipeline import Generator, Message
>>>
>>> class MyGenerator(Generator):
...     def generate(self):
...         for i in range(10):
...             yield {'id': i}
>>>
>>> generator = MyGenerator('generator', '0.1.0', description='simple generator')
>>> generator.parse_args("--kind MEM --out-topic test".split())
>>> generator.start()
>>> [r.dct['id'] for r in generator.destination.results]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Processor

Use Processor to process input. Modification is done in place. A processor produces one output for each input, or no output.

>>> from pipeline import Processor, Message
>>>
>>> class MyProcessor(Processor):
...     def process(self, dct_or_dcts):
...         if isinstance(dct_or_dcts, list):
...             print('SHOULD NOT BE HERE')
...         else:
...             dct_or_dcts['processed'] = True
...         return None
>>>
>>> processor = MyProcessor('processor', '0.1.0', description='simple processor')
>>> config = {'data': [{'id': 1}]}
>>> processor.parse_args("--kind MEM --in-topic test --out-topic test".split(), config=config)
>>> processor.start()
>>> [r.dct['id'] for r in processor.destination.results]
[1]

Splitter

Use Splitter when writing to multiple outputs. It takes a function that generates an output topic from the message being processed, and uses that topic when writing output.

>>> from pipeline import Splitter, Message
>>>
>>> class MySplitter(Splitter):
...     def get_topic(self, dct):
...         return '{}-{}'.format(self.destination.topic, dct['id'])
...
...     def process(self, dct_or_dcts):
...         if isinstance(dct_or_dcts, list):
...             print('SHOULD NOT BE HERE')
...         else:
...             dct_or_dcts['processed'] = True
...         return None
>>>
>>> splitter = MySplitter('splitter', '0.1.0', description='simple splitter')
>>> config = {'data': [{'id': 1}]}
>>> splitter.parse_args("--kind MEM --in-topic test --out-topic test".split(), config=config)
>>> splitter.start()
>>> [r.dct['id'] for r in splitter.destinations['test-1'].results]
[1]

Usage

Writing a Worker

Choose Generator, Processor, or Splitter to subclass, depending on whether the worker produces data from scratch, transforms each input message, or routes output to multiple topics.
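
As a sketch (not taken from the package's documentation), a complete worker script might look like the following; calling parse_args with no arguments is assumed to fall back to sys.argv, since the doctests above always pass an explicit list:

from pipeline import Processor

class MyWorker(Processor):
    def process(self, dct_or_dcts):
        # modify the message in place, as described above
        dct_or_dcts['seen'] = True
        return None

if __name__ == '__main__':
    worker = MyWorker('my-worker', '0.1.0', description='example worker')
    worker.parse_args()  # assumed to read sys.argv when no list is given
    worker.start()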

Environment Variables

The application accepts the following environment variables, each with a corresponding command line argument:

environment variable   command line argument   options
PIPELINE               --kind                  KAFKA, PULSAR, FILE
PULSAR                 --pulsar                pulsar url
TENANT                 --tenant                pulsar tenant
NAMESPACE              --namespace             pulsar namespace
SUBSCRIPTION           --subscription          pulsar subscription
KAFKA                  --kafka                 kafka url
GROUPID                --group-id              kafka group id
INTOPIC                --in-topic              topic to read
OUTTOPIC               --out-topic             topic to write to
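
As a sketch, assuming each variable is read when the matching command line flag is omitted (the broker address and topic names below are placeholders):

import os

# placeholder deployment values; set these in the service environment
os.environ['PIPELINE'] = 'KAFKA'
os.environ['KAFKA'] = 'kafka-broker:9092'
os.environ['GROUPID'] = 'my-group'
os.environ['INTOPIC'] = 'articles'
os.environ['OUTTOPIC'] = 'processed'

worker = MyWorker('my-worker', '0.1.0')
worker.parse_args([])  # no flags given; configuration comes from the environment
worker.start()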

Custom Code

Define add_arguments to add new command line arguments to the worker.

Define setup to run initialization code before the worker starts processing messages. setup is called after command line arguments have been parsed, so logic based on options (the parsed arguments) belongs there.
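
A minimal sketch of both hooks, assuming add_arguments receives an argparse parser and that the parsed arguments are available on the worker as self.options (both are assumptions; the option name is illustrative):

class MyProcessor(Processor):
    def add_arguments(self, parser):
        # parser is assumed to be an argparse.ArgumentParser
        parser.add_argument('--tag', type=str, default='processed',
                            help='illustrative custom option')

    def setup(self):
        # runs after parse_args; self.options is assumed to hold
        # the parsed arguments
        self.tag = self.options.tag

    def process(self, dct_or_dcts):
        dct_or_dcts[self.tag] = True
        return None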

Options

The available command line options, and the environment variables they correspond to, are listed in the table above.

Errors

The None returned from process in the examples above is the error value: return None when processing succeeds, and return an error when it fails (for example, when dct or dcts is empty). The error will be sent to the errors topic together with worker information.
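
A sketch of this convention, assuming any non-None return value from process is reported as an error:

class SafeProcessor(Processor):
    def process(self, dct_or_dcts):
        if not dct_or_dcts:
            # a non-None return value is treated as an error and is sent
            # to the errors topic together with worker information
            return 'empty input'
        dct_or_dcts['processed'] = True
        return None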

Credits

Yifan Zhang (yzhang at hbku.edu.qa)
