A plugin-based, event-driven orchestration framework for building data processing pipelines
Courier
Courier is a plugin-based, event-driven orchestration framework for building data processing pipelines. It watches for incoming data, groups it into jobs, and dispatches those jobs to processing workflows. It scales from a single laptop to a distributed cluster without changing your pipeline code.
While general-purpose, Courier ships with extra tooling for geolocation data — satellite instrument configs, metadata extraction, and integration with GeoIPS.
Design Philosophy
Plugin-based. Data monitors, job builders, and dispatchers are all plugins that conform to a simple protocol. Swap a filesystem watcher for a RabbitMQ consumer, or a serial dispatcher for a SLURM submitter, without touching the rest of your pipeline.
Event-driven. Plugins communicate through message queues and do not share state[^1]. When a monitor detects a file, it emits an event. A job builder consumes that event, groups files, and emits a job. A dispatcher picks up the job and runs it. Each stage is decoupled, so it can be scaled or replicated independently.
Distributable. The broker backend (AMQP, Redis, in-memory, and others) determines your deployment topology. Run everything in one process for development, or spread plugins across machines, or even across networks, for production.
Observable by default. Every plugin exposes health checks and Prometheus metrics. Structured logs carry correlation IDs from file arrival to final product. Loki and Grafana integration is optional and provides centralized monitoring.
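To make the plugin-swapping idea concrete, here is a minimal sketch using Python's `typing.Protocol`. It does not use Courier's actual interfaces: the `Dispatcher` protocol, both dispatcher classes, and `run_pipeline` are hypothetical names invented for illustration.

```python
from typing import Iterable, Protocol


class Dispatcher(Protocol):
    """Hypothetical protocol: any class with a matching dispatch() qualifies."""

    def dispatch(self, job: list[str]) -> str: ...


class SerialDispatcher:
    """Stand-in for a dispatcher that runs jobs in the current process."""

    def dispatch(self, job: list[str]) -> str:
        return f"ran {len(job)} file(s) locally"


class BatchDispatcher:
    """Stand-in for a dispatcher that would hand jobs to a batch scheduler."""

    def dispatch(self, job: list[str]) -> str:
        return f"submitted {len(job)} file(s) to the scheduler"


def run_pipeline(jobs: Iterable[list[str]], dispatcher: Dispatcher) -> list[str]:
    # The pipeline depends only on the protocol, never on a concrete class.
    return [dispatcher.dispatch(job) for job in jobs]


jobs = [["a.nc", "b.nc"], ["c.nc"]]
local_results = run_pipeline(jobs, SerialDispatcher())
batch_results = run_pipeline(jobs, BatchDispatcher())
```

Swapping one dispatcher for the other changes only the argument passed to `run_pipeline`; the calling code stays the same.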
How It Works
Courier runs a central Service that coordinates three stages of plugins through a message broker:
[Data Monitor] → detects new files, emits events
↓ (broker queue)
[Job Builder] → groups files into complete jobs
↓ (broker queue)
[Dispatcher] → executes the processing workflow
Each plugin runs in its own thread with independent health monitoring and automatic restart on failure. Configuration is validated at startup with Pydantic — not halfway through a run.
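The three-stage flow above can be approximated with the standard library alone. The following is a sketch of the pattern, not Courier's implementation: `queue.Queue` stands in for the broker, and the function names, the `SENTINEL` shutdown signal, and the fixed `files_per_job` grouping rule are all assumptions made for the example.

```python
import queue
import threading

SENTINEL = None  # signals "no more messages" to the next stage


def monitor(files, events):
    # Stage 1: emit one event per detected file.
    for name in files:
        events.put(name)
    events.put(SENTINEL)


def job_builder(events, jobs, files_per_job):
    # Stage 2: group incoming files into fixed-size jobs.
    # An incomplete trailing batch is dropped in this simplified version.
    batch = []
    while (name := events.get()) is not SENTINEL:
        batch.append(name)
        if len(batch) == files_per_job:
            jobs.put(batch)
            batch = []
    jobs.put(SENTINEL)


def dispatcher(jobs, results):
    # Stage 3: "process" each complete job.
    while (job := jobs.get()) is not SENTINEL:
        results.append(f"processed {'+'.join(job)}")


events, jobs, results = queue.Queue(), queue.Queue(), []
stages = [
    threading.Thread(target=monitor, args=(["f1", "f2", "f3", "f4"], events)),
    threading.Thread(target=job_builder, args=(events, jobs, 2)),
    threading.Thread(target=dispatcher, args=(jobs, results)),
]
for thread in stages:
    thread.start()
for thread in stages:
    thread.join()
```

Because each stage only reads from one queue and writes to the next, any stage could be replaced or moved to another process without the others noticing, provided the queue is swapped for a networked broker.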
Quick Start
pip install runcourier
Running the service
Point Courier at a YAML config and start it:
courier run --config my_config.yaml
Validate your config before running:
courier validate --config my_config.yaml
Writing a data monitor
Subclass DataMonitorBasePlugin and implement find_file as a generator that yields File objects. The base class handles threading, metadata enrichment, and queue emission for you.
from collections.abc import Generator
from pathlib import Path

from courier.interfaces.module_based.data_monitors import DataMonitorBasePlugin
from courier.service import Service
from courier.types.file import File

interface = "data_monitors"
family = "standard"
name = "my_monitor"


class MyMonitor(DataMonitorBasePlugin):
    name = "my-monitor"
    version = "1.0.0"

    def __init__(self, service: Service, config: dict) -> None:
        super().__init__(service, config)
        self.watch_dir = Path(config["path"])

    def find_file(self) -> Generator[File, None, None]:
        # Yield File objects as they appear. The base class
        # handles metadata, emission, and metrics automatically.
        for path in self.watch_dir.iterdir():
            if path.is_file():
                yield File(file=path, hostname="localhost")
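The example above yields every file in the directory on each pass. Whether the base class filters out files it has already seen is not stated here, so a monitor may want to track that itself. The sketch below shows one way to do so with plain `pathlib`, independent of Courier; `find_new_files` and the `seen` set are hypothetical names.

```python
import tempfile
from collections.abc import Iterator
from pathlib import Path


def find_new_files(watch_dir: Path, seen: set[Path]) -> Iterator[Path]:
    """Yield files in watch_dir that have not been yielded before."""
    for path in sorted(watch_dir.iterdir()):
        if path.is_file() and path not in seen:
            seen.add(path)
            yield path


with tempfile.TemporaryDirectory() as tmp:
    watch_dir = Path(tmp)
    seen: set[Path] = set()

    (watch_dir / "a.dat").touch()
    first_scan = [p.name for p in find_new_files(watch_dir, seen)]

    (watch_dir / "b.dat").touch()
    second_scan = [p.name for p in find_new_files(watch_dir, seen)]
```

The second scan reports only `b.dat`, because `a.dat` was recorded in `seen` during the first scan. The same bookkeeping could live inside a `find_file` method, with `seen` stored on the plugin instance.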
Satellite Data Support
Courier ships with YAML configs for common satellite instruments including GOES-16/18/19 ABI, Himawari-9 AHI, GK-2A AMI, and Meteosat SEVIRI. These configs define file-matching patterns, expected file counts per scan, and metadata extraction rules — so Courier knows when a complete observation has arrived and how to label it.
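As an illustration of how a file-matching pattern and an expected file count can signal a complete observation, here is a small sketch. It does not reproduce Courier's YAML schema or job builder: `ABI_CONFIG`, `complete_scans`, and `abi_name` are hypothetical, and the regular expression is only modeled on the GOES ABI L1b full-disk naming style.

```python
import re
from collections import defaultdict

# Hypothetical instrument config; Courier's real YAML schema is not shown here.
ABI_CONFIG = {
    "pattern": re.compile(
        r"OR_ABI-L1b-RadF-M\dC(?P<channel>\d{2})_G(?P<sat>\d{2})_s(?P<start>\d{14})_"
    ),
    "expected_files": 16,  # one file per ABI band
}


def complete_scans(filenames, config):
    """Group filenames by (satellite, scan start); return only complete groups."""
    groups = defaultdict(set)
    for name in filenames:
        match = config["pattern"].match(name)
        if match:  # files that do not match the pattern are ignored
            groups[(match["sat"], match["start"])].add(name)
    return {
        key: sorted(files)
        for key, files in groups.items()
        if len(files) == config["expected_files"]
    }


def abi_name(channel, start):
    # Builds a name in the style of GOES-16 ABI L1b full-disk files.
    return f"OR_ABI-L1b-RadF-M6C{channel:02d}_G16_s{start}_e{start}_c{start}.nc"


full_scan = [abi_name(ch, "20231001200205") for ch in range(1, 17)]
partial_scan = [abi_name(ch, "20231001210205") for ch in range(1, 10)]
ready = complete_scans(full_scan + partial_scan + ["README.txt"], ABI_CONFIG)
```

Only the scan with all 16 bands appears in `ready`; the partial scan stays out until its remaining files arrive, and the unrelated file is ignored.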
Development
pip install -e ".[doc,lint,test]"
pre-commit install
pre-commit run --all-files
The project targets Python 3.11–3.14, enforces strict mypy type checking, and follows Conventional Commits for commit messages.
License
See LICENSE for details.
[^1]: Except in rare edge cases for high-availability deployments on clusters.