Courier

Courier is a plugin-based, event-driven orchestration framework for building data processing pipelines. It watches for incoming data, groups it into jobs, and dispatches those jobs to processing workflows. It scales from a single laptop to a distributed cluster without changing your pipeline code.

While general-purpose, Courier ships with extra tooling for geolocation data — satellite instrument configs, metadata extraction, and integration with GeoIPS.

Design Philosophy

Plugin-based. Data monitors, job builders, and dispatchers are all plugins that conform to a simple protocol. Swap a filesystem watcher for a RabbitMQ consumer, or a serial dispatcher for a SLURM submitter, without touching the rest of your pipeline.
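The swap described above can be illustrated with a structural protocol. This is a minimal sketch, not Courier's actual plugin protocol: the `Dispatcher` protocol, both dispatcher classes, and `run_pipeline` are hypothetical names invented for illustration.

```python
from typing import Protocol


class Dispatcher(Protocol):
    """Hypothetical protocol: any object with a dispatch(job) method qualifies."""

    def dispatch(self, job: dict) -> str: ...


class SerialDispatcher:
    """Stand-in for a dispatcher that runs the job in the current process."""

    def dispatch(self, job: dict) -> str:
        return f"ran {job['id']} locally"


class BatchDispatcher:
    """Stand-in for a dispatcher that hands the job to a batch scheduler."""

    def dispatch(self, job: dict) -> str:
        return f"submitted {job['id']} to batch queue"


def run_pipeline(dispatcher: Dispatcher, jobs: list[dict]) -> list[str]:
    # The pipeline depends only on the protocol, never on a concrete class,
    # so either dispatcher can be passed in without further changes.
    return [dispatcher.dispatch(job) for job in jobs]


jobs = [{"id": "job-1"}, {"id": "job-2"}]
serial_results = run_pipeline(SerialDispatcher(), jobs)
batch_results = run_pipeline(BatchDispatcher(), jobs)
```

Neither dispatcher inherits from `Dispatcher`; conforming to the method signature is enough, which is what keeps the rest of the pipeline untouched when one is exchanged for the other.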

Event-driven. Plugins communicate through message queues and do not share state[^1]. When a monitor detects a file, it emits an event. A job builder consumes that event, groups files, and emits a job. A dispatcher picks up the job and runs it. Each stage is decoupled from the others and can be scaled or replicated independently.
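The event flow above can be sketched with the standard library alone. This is an illustration of the pattern, not Courier's implementation: the function names, the event dictionaries, and the `SENTINEL` shutdown convention are all assumptions, and `queue.Queue` stands in for a real broker.

```python
import queue
import threading

SENTINEL = None  # hypothetical "no more messages" marker for the next stage


def monitor(files, events):
    # Emit one event per detected file, then signal completion.
    for name in files:
        events.put({"type": "file_detected", "file": name})
    events.put(SENTINEL)


def job_builder(events, jobs, files_per_job):
    # Group incoming files into fixed-size jobs. Files left over when the
    # stream ends are dropped here; a real builder would need a policy.
    pending = []
    while (event := events.get()) is not SENTINEL:
        pending.append(event["file"])
        if len(pending) == files_per_job:
            jobs.put({"files": pending})
            pending = []
    jobs.put(SENTINEL)


def dispatcher(jobs, results):
    # Consume jobs and record a result for each one.
    while (job := jobs.get()) is not SENTINEL:
        results.append(f"processed {len(job['files'])} files")


events, jobs, results = queue.Queue(), queue.Queue(), []
stages = [
    threading.Thread(target=monitor, args=(["a.nc", "b.nc", "c.nc", "d.nc"], events)),
    threading.Thread(target=job_builder, args=(events, jobs, 2)),
    threading.Thread(target=dispatcher, args=(jobs, results)),
]
for stage in stages:
    stage.start()
for stage in stages:
    stage.join()
```

The stages share nothing except the two queues, so any one of them could be replaced or run in several copies without the others noticing.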

Distributable. The broker backend (AMQP, Redis, in-memory, and others) determines your deployment topology. Run everything in one process for development, or spread plugins across machines, or even across networks, for production.

Observable by default. Every plugin exposes health checks and Prometheus metrics. Structured logs carry correlation IDs from file arrival to final product. Loki and Grafana integration is available as an option for centralized monitoring.
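To make the correlation-ID idea concrete, here is a small sketch using only the standard `logging` and `json` modules. The field names, logger names, and `JsonFormatter` class are assumptions for illustration; Courier's actual log schema is not shown in this README.

```python
import io
import json
import logging
import uuid


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter: renders each record as one JSON object per line."""

    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "stage": record.name,
            "message": record.getMessage(),
            # Attached through the `extra` argument of the logging call.
            "correlation_id": getattr(record, "correlation_id", None),
        })


stream = io.StringIO()  # captured in memory so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())


def get_logger(stage):
    logger = logging.getLogger(f"sketch.{stage}")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    if not logger.handlers:
        logger.addHandler(handler)
    return logger


# One ID is created when the file arrives and reused by every later stage.
correlation_id = str(uuid.uuid4())
extra = {"correlation_id": correlation_id}
get_logger("monitor").info("file detected", extra=extra)
get_logger("builder").info("job complete", extra=extra)
get_logger("dispatcher").info("product written", extra=extra)

records = [json.loads(line) for line in stream.getvalue().splitlines()]
```

Filtering the parsed records on a single `correlation_id` then reconstructs the full path of one file through the pipeline.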

How It Works

Courier runs a central Service that coordinates three stages of plugins through a message broker:

[Data Monitor] → detects new files, emits events
       ↓ (broker queue)
[Job Builder]  → groups files into complete jobs
       ↓ (broker queue)
[Dispatcher]   → executes the processing workflow

Each plugin runs in its own thread with independent health monitoring and automatic restart on failure. Configuration is validated at startup with Pydantic — not halfway through a run.
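The thread-per-plugin model with restart on failure might look roughly like the sketch below. `SupervisedWorker`, its `state` values, and the restart limit are hypothetical; how Courier actually supervises plugins is not described here.

```python
import threading


class SupervisedWorker:
    """Hypothetical helper: runs `target` in a thread, restarting it on errors."""

    def __init__(self, target, max_restarts=3):
        self.target = target
        self.max_restarts = max_restarts
        self.restarts = 0
        self.state = "created"  # a health check could report this value
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while True:
            self.state = "running"
            try:
                self.target()
            except Exception:
                if self.restarts >= self.max_restarts:
                    self.state = "failed"  # give up after too many crashes
                    return
                self.restarts += 1  # loop around and run the target again
            else:
                self.state = "stopped"  # clean exit, nothing to restart
                return

    def start(self):
        self._thread.start()

    def join(self, timeout=None):
        self._thread.join(timeout)


attempts = []


def flaky_plugin():
    # Simulated plugin body: crashes on the first two runs, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("simulated crash")


worker = SupervisedWorker(flaky_plugin)
worker.start()
worker.join(timeout=5)
```

A production supervisor would normally add a backoff delay between restarts and log each failure; both are omitted to keep the sketch short.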

Quick Start

pip install runcourier

Running the service

Point Courier at a YAML config and start it:

courier run --config my_config.yaml

Validate your config before running:

courier validate --config my_config.yaml

Writing a data monitor

Subclass DataMonitorBasePlugin and implement find_file as a generator that yields File objects. The base class handles threading, metadata enrichment, and queue emission for you.

from collections.abc import Generator
from pathlib import Path

from courier.interfaces.module_based.data_monitors import DataMonitorBasePlugin
from courier.service import Service
from courier.types.file import File

interface = "data_monitors"
family = "standard"
name = "my_monitor"


class MyMonitor(DataMonitorBasePlugin):
    name = "my-monitor"
    version = "1.0.0"

    def __init__(self, service: Service, config: dict) -> None:
        super().__init__(service, config)
        self.watch_dir = Path(config["path"])

    def find_file(self) -> Generator[File, None, None]:
        # Yield File objects as they appear — the base class
        # handles metadata, emission, and metrics automatically.
        for path in self.watch_dir.iterdir():
            if path.is_file():
                yield File(file=path, hostname="localhost")
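The file-discovery logic can be tried out without Courier installed. The sketch below uses a stand-in `FakeFile` dataclass instead of `courier.types.file.File` and a free function instead of a plugin method. It also tracks paths it has already yielded, which is an assumption: whether the base class de-duplicates files is not stated here.

```python
import tempfile
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class FakeFile:
    """Stand-in for courier.types.file.File; mirrors only the two fields used above."""

    file: Path
    hostname: str


def find_new_files(watch_dir: Path, seen: set[Path]) -> Iterator[FakeFile]:
    # Yield each regular file once; directories and already-seen paths are skipped.
    for path in sorted(watch_dir.iterdir()):
        if path.is_file() and path not in seen:
            seen.add(path)
            yield FakeFile(file=path, hostname="localhost")


with tempfile.TemporaryDirectory() as tmp:
    watch_dir = Path(tmp)
    (watch_dir / "a.dat").write_text("x")
    (watch_dir / "sub").mkdir()  # a directory, so it should be ignored
    seen: set[Path] = set()

    first_pass = [f.file.name for f in find_new_files(watch_dir, seen)]

    (watch_dir / "b.dat").write_text("y")
    second_pass = [f.file.name for f in find_new_files(watch_dir, seen)]
```

The second pass reports only the file that arrived after the first pass, which is the behavior a polling monitor usually wants.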

Satellite Data Support

Courier ships with YAML configs for common satellite instruments including GOES-16/18/19 ABI, Himawari-9 AHI, GK-2A AMI, and Meteosat SEVIRI. These configs define file-matching patterns, expected file counts per scan, and metadata extraction rules — so Courier knows when a complete observation has arrived and how to label it.
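How a file-matching pattern and an expected file count combine to detect a complete observation can be sketched as follows. The regular expression is an illustrative pattern for GOES-R ABI L1b full-disk filenames, not one taken from Courier's shipped configs, and the sketch assumes that all 16 ABI channels of a scan share the same start timestamp.

```python
import re
from collections import defaultdict

# Illustrative pattern (an assumption, not Courier's config): captures the
# channel, satellite number, and 14-digit scan start time from the filename.
ABI_PATTERN = re.compile(
    r"OR_ABI-L1b-RadF-M\dC(?P<channel>\d{2})_G(?P<sat>\d{2})_s(?P<start>\d{14})_"
)
EXPECTED_CHANNELS = 16  # ABI has 16 spectral channels


def group_scans(filenames):
    # Map (satellite, scan start) to the set of channels seen so far.
    scans = defaultdict(set)
    for name in filenames:
        match = ABI_PATTERN.match(name)
        if match:  # files that do not match the pattern are ignored
            scans[(match["sat"], match["start"])].add(match["channel"])
    return scans


def complete_scans(scans):
    # A scan is complete once every expected channel has arrived.
    return [key for key, channels in scans.items() if len(channels) == EXPECTED_CHANNELS]


full = [
    f"OR_ABI-L1b-RadF-M6C{ch:02d}_G16_s20230011200205_e20230011209513_c20230011209561.nc"
    for ch in range(1, 17)
]
partial = [
    f"OR_ABI-L1b-RadF-M6C{ch:02d}_G16_s20230011210205_e20230011219513_c20230011219561.nc"
    for ch in range(1, 9)
]
scans = group_scans(full + partial + ["README.txt"])
ready = complete_scans(scans)
```

Only the first scan is reported as ready; the second stays pending until its remaining eight channels arrive.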

Development

pip install -e ".[doc,lint,test]"
pre-commit install
pre-commit run --all-files

Courier supports Python 3.11–3.14. The codebase is checked with strict mypy, and commit messages follow Conventional Commits.

License

See LICENSE for details.

[^1]: Except in rare edge cases for high-availability deployments on clusters.
