A simple way to build declarative, distributed data pipelines with Python. It supports RabbitMQ or Kafka as a broker.
unipipeline
A simple way to build declarative, distributed data pipelines.
Why you should use it
- Declarative config
- Fully typed
- Multi-broker support
  - Kafka
  - RabbitMQ
  - In-memory pub/sub
How to Install
$ pip3 install unipipeline
Example
# dag.yml
brokers:
  default_broker:
    import_template: "some.module.broker:MyBroker"

messages:
  first_message:
    import_template: "some.module.first_message:FirstMessage"
  second_message:
    import_template: "some.module.second_message:SecondMessage"

workers:
  __default__:
    broker: default_broker
  first_worker:
    input_message: first_message
    import_template: "some.module.first_worker:FirstWorker"
  second_worker:
    input_message: second_message
    import_template: "some.module.second_worker:SecondWorker"
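
The `import_template` values above appear to follow the common `module.path:ClassName` convention. How unipipeline resolves them internally is not shown here. The sketch below only illustrates the convention using the standard library. The helper name `resolve_import_template` is hypothetical and is not part of unipipeline.

```python
import importlib
from typing import Any


def resolve_import_template(template: str) -> Any:
    """Resolve a "package.module:AttributeName" string to the named object.

    Hypothetical helper for illustration; not unipipeline's own loader.
    """
    module_path, sep, attr_name = template.partition(":")
    if not sep or not module_path or not attr_name:
        raise ValueError(f"expected 'module:Name', got {template!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


if __name__ == "__main__":
    # A standard-library target is used so the sketch runs without the example project.
    print(resolve_import_template("collections:OrderedDict"))
```

With the example project on the import path, `resolve_import_template("some.module.second_message:SecondMessage")` would return the `SecondMessage` class in the same way.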
# ./some/module/second_message.py
from unipipeline import UniMessage


class SecondMessage(UniMessage):
    some_prop: bool
    some_other_prop: str
# ./some/module/second_worker.py
from unipipeline import UniWorker

from some.module.second_message import SecondMessage


# The class name and module path match the `second_worker` entry in dag.yml.
class SecondWorker(UniWorker[SecondMessage]):
    def handle_message(self, message: SecondMessage) -> None:
        print("hello", message.some_other_prop)
# main.py
from unipipeline import Uni

u = Uni("dag.yml")
u.check_load_all(create=True)

# Use a worker name declared under `workers:` in dag.yml.
w = u.get_worker("second_worker")
w.send(
    some_prop=True,
    some_other_prop="World!",
)
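
Because `dag.yml` cross-references names (each worker names a broker and an input message), a typo in one of them is easy to miss. The sketch below is a standalone sanity check over the parsed config, written under two assumptions: the config has already been loaded into a plain dict, and the `__default__` entry supplies values that each worker inherits unless it overrides them. The function `find_dangling_references` is hypothetical; it does not describe what `check_load_all` does.

```python
from typing import Any, Dict, List


def find_dangling_references(config: Dict[str, Any]) -> List[str]:
    """Return one message per worker field that names an undeclared broker or message."""
    brokers = set(config.get("brokers", {}))
    messages = set(config.get("messages", {}))
    workers = config.get("workers", {})
    # Assumption: "__default__" holds defaults shared by all workers.
    defaults = workers.get("__default__", {})

    problems: List[str] = []
    for name, spec in workers.items():
        if name == "__default__":
            continue
        merged = {**defaults, **spec}  # worker-level keys override defaults
        if merged.get("broker") not in brokers:
            problems.append(f"{name}: unknown broker {merged.get('broker')!r}")
        if merged.get("input_message") not in messages:
            problems.append(
                f"{name}: unknown input_message {merged.get('input_message')!r}"
            )
    return problems


# The same structure as dag.yml, written as a dict so no YAML parser is needed.
EXAMPLE_CONFIG: Dict[str, Any] = {
    "brokers": {"default_broker": {}},
    "messages": {"first_message": {}, "second_message": {}},
    "workers": {
        "__default__": {"broker": "default_broker"},
        "first_worker": {"input_message": "first_message"},
        "second_worker": {"input_message": "second_message"},
    },
}

if __name__ == "__main__":
    print(find_dangling_references(EXAMPLE_CONFIG))
```

Running a check like this before starting workers surfaces misspelled names early, which is one practical benefit of keeping the pipeline definition declarative.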