A simple way to build declarative, distributed data pipelines with Python. It supports RabbitMQ or Kafka as a broker.
unipipeline
A simple way to build declarative, distributed data pipelines.
Why you should use it
- Declarative config
- Fully typed
- Multi-broker support
  - Kafka
  - RabbitMQ
  - in-memory pub/sub
How to Install
$ pip3 install unipipeline
Example
# dag.yml
brokers:
  default_broker:
    import_template: "some.module.broker:MyBroker"

messages:
  first_message:
    import_template: "some.module.first_message:FirstMessage"
  second_message:
    import_template: "some.module.second_message:SecondMessage"

workers:
  __default__:
    broker: default_broker
  first_worker:
    input_message: first_message
    import_template: "some.module.first_worker:FirstWorker"
  second_worker:
    input_message: second_message
    import_template: "some.module.second_worker:SecondWorker"
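
Each `import_template` value in the config above has the form `module.path:ClassName`. As a rough illustration of how such a string can be turned into a class, here is a minimal sketch that uses only the standard library. The `resolve_import_string` helper is hypothetical and is not part of unipipeline; the library's own loader may do more than this.

```python
import importlib


def resolve_import_string(spec: str) -> type:
    """Resolve a 'package.module:ClassName' string to the class it names.

    Hypothetical helper for illustration only; not part of unipipeline.
    """
    module_path, sep, attr_name = spec.partition(":")
    if not sep or not module_path or not attr_name:
        raise ValueError(f"expected 'module:Name', got {spec!r}")
    module = importlib.import_module(module_path)
    obj = getattr(module, attr_name)
    if not isinstance(obj, type):
        raise TypeError(f"{spec!r} does not name a class")
    return obj


# Demonstrated with a standard-library class so the snippet runs anywhere.
ordered_dict_cls = resolve_import_string("collections:OrderedDict")
print(ordered_dict_cls.__name__)
```

Failing early with a `ValueError` on a malformed string keeps configuration mistakes, such as a missing colon, easy to spot at load time.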
# ./some/module/second_message.py
from unipipeline import UniMessage


class SecondMessage(UniMessage):
    some_prop: bool
    some_other_prop: str
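
The `SecondMessage` class above declares its fields with type annotations. To show why typed fields are useful when messages travel through a broker as bytes, here is a minimal sketch built on a plain dataclass. `SecondMessageSketch`, `to_bytes`, and `from_bytes` are hypothetical names chosen for this illustration; they are a stand-in and do not reflect how `UniMessage` is implemented.

```python
import json
from dataclasses import asdict, dataclass
from typing import get_type_hints


@dataclass(frozen=True)
class SecondMessageSketch:
    """Stand-in for SecondMessage: a plain dataclass, not a UniMessage."""

    some_prop: bool
    some_other_prop: str

    def to_bytes(self) -> bytes:
        # Serialize the fields as JSON so they can cross a broker.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "SecondMessageSketch":
        data = json.loads(raw.decode("utf-8"))
        hints = get_type_hints(cls)
        if not isinstance(data, dict) or set(data) != set(hints):
            raise ValueError("payload fields do not match the message definition")
        # Check each field against its declared type before building the object.
        for name, expected in hints.items():
            if not isinstance(data[name], expected):
                raise TypeError(f"field {name!r} must be {expected.__name__}")
        return cls(**data)


original = SecondMessageSketch(some_prop=True, some_other_prop="World!")
restored = SecondMessageSketch.from_bytes(original.to_bytes())
print(restored == original)
```

Rejecting a malformed payload at the boundary means a worker's handler only ever sees messages that match the declared shape.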
# ./some/module/second_worker.py
from unipipeline import UniWorker

from some.module.second_message import SecondMessage


# The class name and module match the second_worker entry in dag.yml,
# whose input_message is second_message.
class SecondWorker(UniWorker[SecondMessage]):
    def handle_message(self, message: SecondMessage) -> None:
        print("hello ", message.some_other_prop)
# main.py
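from unipipeline import Uni

# Load the pipeline definition from the config file.
u = Uni("dag.yml")
u.check_load_all(create=True)

# "name_of_worker" is a placeholder for a worker key from dag.yml.
w = u.get_worker("name_of_worker")
w.send(
    some_prop=True,
    some_other_prop="World!",
)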
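
To make the flow in the example more concrete (a payload is published for a worker, queued by a broker, then passed to the worker's handler), here is a minimal in-memory pub/sub sketch using only the standard library. `InMemoryBroker`, `subscribe`, `publish`, and `consume_all` are hypothetical names for this illustration and are not unipipeline's API; the library's own in-memory broker may behave differently.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, List

Handler = Callable[[dict], None]


class InMemoryBroker:
    """Toy pub/sub broker with one FIFO queue per topic (hypothetical)."""

    def __init__(self) -> None:
        self._queues: Dict[str, Deque[dict]] = defaultdict(deque)
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    def publish(self, topic: str, payload: dict) -> None:
        self._queues[topic].append(payload)

    def consume_all(self, topic: str) -> int:
        """Pass queued payloads to every subscriber; return how many were dequeued."""
        delivered = 0
        queue = self._queues[topic]
        while queue:
            payload = queue.popleft()
            for handler in self._handlers[topic]:
                handler(payload)
            delivered += 1
        return delivered


received: List[str] = []


def second_worker(payload: dict) -> None:
    # Plays the role of handle_message in the worker example.
    received.append("hello " + payload["some_other_prop"])


broker = InMemoryBroker()
broker.subscribe("second_worker", second_worker)
broker.publish("second_worker", {"some_prop": True, "some_other_prop": "World!"})
count = broker.consume_all("second_worker")
print(count, received)
```

Keeping publishing separate from consumption mirrors how a real broker decouples the sender from the worker, which is what lets the two run in different processes.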