Skip to main content

Create simple event-driven, distributed data pipelines in Python

Project description

Qanat

Qanat: Create simple event-driven, distributed data pipelines in Python.

Qanat is a lightweight tool for setting up event-driven, distributed data pipelines with ease. It lets you link together components into a type-checked, decoupled pipeline, with inter-process communication handled automatically through MQTT (in the future RabbitMQ?) and serialization handled through JSON.

Usage Example

from qanat import QanatPipeline
from dataclasses import dataclass
from typing import List

@dataclass
class Frame:
    image: bytes
    timestamp: str

@dataclass
class Detection:
    type: str
    confidence: float

@dataclass
class DetectionResults:
    objects: List[Detection]
    frame_timestamp: str

# Initialize the pipeline with the MQTT broker connection
pipeline = QanatPipeline(broker="mqtt://broker.hivemq.com:1883")

@pipeline.component(output="qanat-demo/frames/raw")
def frame_producer() -> Frame:
    """
    Simulates producing a frame and its metadata.
    """
    image_data = b'some_image_data'  # Replace with actual image data
    return Frame(image=image_data, timestamp="2021-07-01T00:00:00Z")

@pipeline.component(
    input="qanat-demo/frames/raw", 
    output="qanat-demo/detections/results"
)
def detector(frame: Frame) -> DetectionResults:
    """
    Processes a frame and detects objects, outputting detection results.
    """
    detected_objects = [Detection(type="smoke", confidence=0.98)]
    return DetectionResults(objects=detected_objects, frame_timestamp=frame.timestamp)

@pipeline.component(input="qanat-demo/detections/results")
def result_publisher(detection_results: DetectionResults):
    """
    Publishes the detection results.
    """
    print(f"Publishing results: {detection_results}")

# Start the event loop for testing and demonstration
if __name__ == "__main__":
    # For demo or testing purposes, this runs all components in the pipeline
    pipeline.start_event_loop() # Blocks until the event loop terminates

    # In real-world distributed usage, you would start three separate processes, 
    # one for each component, e.g.:
    # frame_producer.start() # Blocks until this specific component terminates

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyqanat-0.0.0.tar.gz (4.2 kB view hashes)

Uploaded Source

Built Distribution

pyqanat-0.0.0-py3-none-any.whl (4.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page