Create simple event-driven, distributed data pipelines in Python
Qanat
Qanat is a lightweight tool for building event-driven, distributed data pipelines. It lets you link components into a type-checked, decoupled pipeline. Inter-process communication is handled automatically over MQTT (RabbitMQ support is a possible future addition), and messages are serialized as JSON.
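To make the JSON serialization step concrete, here is a minimal sketch of a round trip for a dataclass message using only the standard library. It is not Qanat's actual implementation, which this page does not show. `Reading`, `to_json`, and `from_json` are hypothetical names, and base64-encoding the `bytes` field is an assumption made because JSON has no native binary type.

```python
import base64
import json
from dataclasses import asdict, dataclass


@dataclass
class Reading:  # hypothetical message type, for illustration only
    sensor: str
    value: float
    payload: bytes


def to_json(reading: Reading) -> str:
    """Serialize a Reading to JSON, base64-encoding the bytes field."""
    data = asdict(reading)
    # Assumption: binary data travels as base64 text, since JSON cannot hold raw bytes.
    data["payload"] = base64.b64encode(reading.payload).decode("ascii")
    return json.dumps(data)


def from_json(text: str) -> Reading:
    """Rebuild a Reading from a string produced by to_json."""
    data = json.loads(text)
    data["payload"] = base64.b64decode(data["payload"])
    return Reading(**data)


message = to_json(Reading(sensor="cam-1", value=0.5, payload=b"\x00\x01"))
restored = from_json(message)
```

A message type with only strings, numbers, and lists of other dataclasses would not need the base64 step; `asdict` followed by `json.dumps` would be enough.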
Usage Example
from qanat import QanatPipeline
from dataclasses import dataclass
from typing import List


@dataclass
class Frame:
    image: bytes
    timestamp: str


@dataclass
class Detection:
    type: str
    confidence: float


@dataclass
class DetectionResults:
    objects: List[Detection]
    frame_timestamp: str


# Initialize the pipeline with the MQTT broker connection
pipeline = QanatPipeline(broker="mqtt://broker.hivemq.com:1883")


@pipeline.component(output="qanat-demo/frames/raw")
def frame_producer() -> Frame:
    """
    Simulates producing a frame and its metadata.
    """
    image_data = b'some_image_data'  # Replace with actual image data
    return Frame(image=image_data, timestamp="2021-07-01T00:00:00Z")


@pipeline.component(
    input="qanat-demo/frames/raw",
    output="qanat-demo/detections/results"
)
def detector(frame: Frame) -> DetectionResults:
    """
    Processes a frame and detects objects, outputting detection results.
    """
    detected_objects = [Detection(type="smoke", confidence=0.98)]
    return DetectionResults(objects=detected_objects, frame_timestamp=frame.timestamp)


@pipeline.component(input="qanat-demo/detections/results")
def result_publisher(detection_results: DetectionResults):
    """
    Publishes the detection results.
    """
    print(f"Publishing results: {detection_results}")


# Start the event loop for testing and demonstration
if __name__ == "__main__":
    # For demo or testing purposes, this runs all components in the pipeline
    pipeline.start_event_loop()  # Blocks until the event loop terminates

    # In real-world distributed usage, you would start three separate processes,
    # one for each component, e.g.:
    # frame_producer.start()  # Blocks until this specific component terminates
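
The components in the usage example are linked by topic name, and the description says the resulting pipeline is type-checked. How Qanat performs that check is not shown on this page. As a rough sketch under that caveat, one way to compare the annotations of two linked components with the standard library is shown below. `check_link` is a hypothetical helper, and the dataclasses are trimmed-down stand-ins for the ones in the example.

```python
import inspect
from dataclasses import dataclass
from typing import Callable, get_type_hints


@dataclass
class Frame:  # trimmed stand-in for the Frame type in the usage example
    timestamp: str


@dataclass
class DetectionResults:  # trimmed stand-in for DetectionResults
    frame_timestamp: str


def detector(frame: Frame) -> DetectionResults:
    return DetectionResults(frame_timestamp=frame.timestamp)


def result_publisher(detection_results: DetectionResults) -> None:
    print(f"Publishing results: {detection_results}")


def check_link(upstream: Callable, downstream: Callable) -> bool:
    """Hypothetical check: does upstream's return annotation match
    the annotation of downstream's first parameter?"""
    upstream_hints = get_type_hints(upstream)
    downstream_hints = get_type_hints(downstream)
    params = list(inspect.signature(downstream).parameters)
    if not params or "return" not in upstream_hints:
        return False
    return upstream_hints["return"] == downstream_hints.get(params[0])


forward_ok = check_link(detector, result_publisher)   # annotations line up
reverse_ok = check_link(result_publisher, detector)   # None vs Frame, so no match
```

A check like this can run when components are registered, so that a mismatched topic wiring fails at startup instead of when the first message arrives.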