
Courier


Courier is a plugin-based, event-driven orchestration framework for building data processing pipelines. It watches for incoming data, groups it into jobs, and dispatches those jobs to processing workflows. It scales from a single laptop to a distributed cluster without changing your pipeline code.

While general-purpose, Courier ships with extra tooling for satellite data — satellite instrument configs, metadata extraction, and integration with GeoIPS.

Design Philosophy

Plugin-based. Data monitors, job builders, and dispatchers are all plugins that conform to a simple protocol. Swap a filesystem watcher for a RabbitMQ consumer, or a serial dispatcher for a SLURM submitter, without touching the rest of your pipeline.
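The plugin protocol itself is not shown in this excerpt. As an illustrative sketch of the idea (the `Monitor` protocol and both implementations below are hypothetical, not Courier's actual API), structural typing lets the pipeline depend only on the protocol, so implementations can be swapped freely:

```python
from typing import Iterator, Protocol


class Monitor(Protocol):
    """Hypothetical stand-in for Courier's data-monitor protocol."""

    def find_file(self) -> Iterator[str]: ...


class FilesystemMonitor:
    """Pretends to watch a directory."""

    def __init__(self, paths: list[str]) -> None:
        self.paths = paths

    def find_file(self) -> Iterator[str]:
        yield from self.paths


class QueueMonitor:
    """Pretends to consume a message queue."""

    def __init__(self, messages: list[str]) -> None:
        self.messages = messages

    def find_file(self) -> Iterator[str]:
        yield from self.messages


def drain(monitor: Monitor) -> list[str]:
    # The rest of the pipeline sees only the protocol, so either
    # monitor can be swapped in without any other changes.
    return list(monitor.find_file())
```

Because `Monitor` is a `typing.Protocol`, neither class needs to inherit from it; conforming to the method signature is enough.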

Event-driven. Plugins communicate through message queues and do not share state[^1]. When a monitor detects a file, it emits an event. A job builder consumes that event, groups files, and emits a job. A dispatcher picks up the job and runs it. Each stage is decoupled and can be scaled or replicated independently.

Distributable. The broker backend (AMQP, Redis, in-memory, and others) determines your deployment topology. Run everything in one process for development, or spread plugins across machines, or even across networks, for production.

Observable by default. Every plugin exposes health checks and Prometheus metrics. Structured logs carry correlation IDs from file arrival to final product. Optional Loki and Grafana integrations provide centralized monitoring.

How It Works

Courier runs a central Service that coordinates three stages of plugins through a message broker:

[Data Monitor] → detects new files, emits events
       ↓ (broker queue)
[Job Builder]  → groups files into complete jobs
       ↓ (broker queue)
[Dispatcher]   → executes the processing workflow
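The flow above can be sketched in miniature with plain `queue.Queue` objects standing in for the broker (all names and the job shape here are hypothetical, for illustration only):

```python
import queue

events: queue.Queue = queue.Queue()  # monitor -> job builder
jobs: queue.Queue = queue.Queue()    # job builder -> dispatcher

# [Data Monitor] detects new files and emits one event per file.
for path in ["scan_band01.nc", "scan_band02.nc"]:
    events.put(path)

# [Job Builder] consumes events and groups files into a complete job.
batch = []
while not events.empty():
    batch.append(events.get())
jobs.put({"files": batch})

# [Dispatcher] picks up the job and executes the workflow.
job = jobs.get()
result = f"processed {len(job['files'])} files"
```

In a real deployment each stage would run in its own thread or process, with the broker carrying messages between them.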

Each plugin runs in its own thread with independent health monitoring and automatic restart on failure. Configuration is validated at startup with Pydantic — not halfway through a run.
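A sketch of what fail-fast Pydantic validation looks like in general (this `MonitorConfig` model is hypothetical, not Courier's actual schema):

```python
from pydantic import BaseModel, ValidationError


class MonitorConfig(BaseModel):
    """Hypothetical config model; Courier's real schema may differ."""

    path: str
    poll_interval_s: float = 5.0


def load_config(raw: dict) -> MonitorConfig:
    # Raises ValidationError immediately if the config is malformed,
    # so bad configs fail at startup rather than mid-run.
    return MonitorConfig(**raw)


cfg = load_config({"path": "/data/incoming"})

try:
    load_config({"poll_interval_s": "not a number"})  # missing path, bad float
except ValidationError:
    startup_failed = True
```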

Quick Start

pip install runcourier

Running the service

Point Courier at a YAML config and start it:

courier run --config my_config.yaml

Validate your config before running:

courier validate --config my_config.yaml
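The config schema is not shown in this excerpt. As a purely hypothetical illustration of the shape such a file might take (every key below is made up, not Courier's documented schema), a minimal config could look like:

```yaml
# Hypothetical layout — consult Courier's documentation for the real schema.
plugins:
  data_monitors:
    - name: my_monitor
      config:
        path: /data/incoming
```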

Writing a data monitor

Subclass DataMonitorBasePlugin and implement find_file as a generator that yields File objects. The base class handles threading, metadata enrichment, and queue emission for you.

from collections.abc import Generator
from pathlib import Path

from courier.interfaces.module_based.data_monitors import DataMonitorBasePlugin
from courier.service import Service
from courier.types.file import File

interface = "data_monitors"
family = "standard"
name = "my_monitor"


class MyMonitor(DataMonitorBasePlugin):
    name = "my-monitor"
    version = "1.0.0"

    def __init__(self, service: Service, config: dict) -> None:
        super().__init__(service, config)
        self.watch_dir = Path(config["path"])

    def find_file(self) -> Generator[File, None, None]:
        # Yield File objects as they appear — the base class
        # handles metadata, emission, and metrics automatically.
        for path in self.watch_dir.iterdir():
            if path.is_file():
                yield File(file=path, hostname="localhost")

Satellite Data Support

Courier ships with YAML configs for common satellite instruments including GOES-16/18/19 ABI, Himawari-9 AHI, GK-2A AMI, and Meteosat SEVIRI. These configs define file-matching patterns, expected file counts per scan, and metadata extraction rules — so Courier knows when a complete observation has arrived and how to label it.
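As an illustration of the kind of information such a config carries (the keys below are invented for this sketch, not the shipped schema), an instrument entry might resemble:

```yaml
# Hypothetical instrument config — keys are illustrative only.
instrument: abi
platform: goes-18
file_pattern: "OR_ABI-L1b-RadF-M6C*_G18_*.nc"
files_per_scan: 16      # one file per ABI band in a full-disk scan
metadata:
  start_time: from_filename
```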

Development

pip install -e .[doc,lint,test]
pre-commit install
pre-commit run --all-files

Python 3.11–3.14. Strict mypy. Conventional Commits.

License

See LICENSE for details.

[^1]: Except for rare edge cases for high availability deployments on clusters.
