File delivery monitoring and archival service
Project description
Carbonation
File delivery monitoring and archival service. Watches directories for incoming files, groups them by stem, validates integrity, extracts metadata via plugins, applies selection and routing rules, and archives to configurable destinations.
Installation
Requires Python 3.11+.
pip install carbonation
For MariaDB or PostgreSQL support:
pip install "carbonation[mariadb]" # mysqlclient
pip install "carbonation[postgresql]" # psycopg2
Quick Start
# 1. Scaffold a new project
mkdir /opt/carbonation && cd /opt/carbonation
carbonation init
# 2. Edit the generated files
$EDITOR config.toml # set watch paths, archive destination, database
$EDITOR rules.toml # configure integrity, completeness, selection rules
$EDITOR plugins/file_model.py # define your File model columns and extract()
# 3. Validate, initialize, and run
carbonation check-config # validate configuration
carbonation db init # create database tables
carbonation run # start the service
carbonation init generates a complete starter project:
config.toml # service, logging, database, watch, and archive config
rules.toml # integrity, completeness, metadata, selection, routing
plugins/file_model.py # File model + extract() function template
Edit plugins/file_model.py to define the domain-specific columns for your use case and the extract() function that reads metadata from your files. See Plugin System for details.
CLI Reference
carbonation check-config Validate config.toml and rules.toml
carbonation db init Create schema (migrations + plugin columns)
carbonation db upgrade Apply pending schema migrations only
carbonation db status Component counts and metadata check
carbonation run [--dry-run] [--once] Start the service
carbonation status Heartbeat, status table, watch health
carbonation retry [--watch-name X] Re-enqueue failed components
carbonation check-rules FILE Dry-run rules changes against DB
carbonation reconcile delivery Sync delivery directories with DB
carbonation reconcile archive Sync archive directories with DB
carbonation query files [filters] Query with dynamic plugin filters
Query Examples
# Filter by plugin columns (dynamically generated from your model)
carbonation query files --category alpha --limit 50
# Date range overlap
carbonation query files --daterange 2026-01-01/2026-02-01
# Recent files by age
carbonation query files --age 7d
# Output formats
carbonation query files --category alpha --format json
carbonation query files --format csv > export.csv
# Count only
carbonation query files --category alpha --count
# Specific columns, sorted
carbonation query files --columns group_key,category,start --order-by -start
Programmatic API
Query the database from Python scripts without going through the CLI:
from carbonation import connect
with connect("config.toml") as db:
# All files from a watch
rows = db.query_files({"watch_name": ["incoming"]}, limit=50)
for row in rows:
print(row["group_key"], row["created_at"])
# Count
n = db.query_files({"complete": True}, count_only=True)
# Date filtering + plugin columns
from datetime import datetime
rows = db.query_files({
"category": ["alpha"],
"created_at_after": datetime(2026, 1, 1),
}, order_by="-created_at", columns=["group_key", "category", "start"])
For one-off scripts, use the convenience function that handles setup and teardown in one call:
from carbonation import query_files
rows = query_files("config.toml", {"category": ["alpha"]}, limit=50)
Watermark-based polling
For external consumers (e.g. an orchestrator) that need to poll for newly completed files without gaps:
from carbonation import connect
cursor = None # persist this between invocations
with connect("config.toml") as db:
while True:
rows, cursor = db.get_new_files(cursor=cursor)
if not rows:
break
for row in rows:
print(row["group_key"], row["completed_at"])
# Check if the service is alive
status = db.get_service_status()
if status:
print(f"Last heartbeat: {status['last_heartbeat']}")
ORM access
For direct ORM access to query, modify, and commit changes:
from carbonation import session
from carbonation.db.models import FileComponent, FileStatus
with session("config.toml") as s:
comps = s.query(FileComponent).filter_by(status=FileStatus.ARCHIVED).all()
for c in comps:
c.status = FileStatus.CLEARED
s.commit()
Filter operators
| Suffix | Operator | Example |
|---|---|---|
_after / _before |
>= / <= (datetime) |
{"created_at_after": datetime(2026, 1, 1)} |
_min / _max |
>= / <= (numeric) |
{"id_min": 100} |
_daterange |
start/stop overlap | {"_daterange": (start, end)} |
_age |
recency | {"_age": datetime(2026, 3, 20)} |
| (none) | exact / IN / LIKE | {"category": ["alpha", "beta"]} |
Architecture
Processing Pipeline
flowchart TD
A["File arrives in<br>watch directory"] --> B["Watchdog detects<br>creation / modification"]
B --> C["Settle timer<br>debounces"]
C --> D["Event enqueued"]
D --> E["Worker thread<br>picks up event"]
E --> S1["1 — Record component"]
S1 --> IC{{"2 — Integrity checks<br>(min_size, readability)"}}
IC -- fail --> IF["INTEGRITY_FAILED"]
IC -- pass --> S3{{"3 — Completeness<br>grouping"}}
S3 -- "incomplete<br>(waiting for extensions)" --> W["WAITING"]
S3 -- "timeout<br>(on_timeout=skip)" --> TO["TIMED_OUT"]
S3 -- "complete / standalone" --> S4["4 — Metadata extraction<br>(plugin extract)"]
S4 --> S5{{"5 — Selection rules"}}
S5 -- rejected --> NS["NOT_SELECTED"]
S5 -- accepted --> S6["6 — Routing<br>(choose archive)"]
S6 --> S7{{"7 — Archive<br>(copy / move / hardlink / symlink)"}}
S7 -- success --> AR["ARCHIVED"]
S7 -- "failure<br>(retries left)" --> RP["RETRY_PENDING"]
S7 -- "failure<br>(retries exhausted)" --> FA["FAILED"]
RP -. "retry_delay<br>elapsed" .-> E
IF -. "reconciliation<br>(re-assessed)" .-> E
NS -. "rules hot-reload<br>(re-evaluated)" .-> E
style IF fill:#d32f2f,color:#fff
style FA fill:#d32f2f,color:#fff
style TO fill:#757575,color:#fff
style NS fill:#757575,color:#fff
style W fill:#f9a825
style RP fill:#f57c00,color:#fff
style AR fill:#388e3c,color:#fff
Reliability
- Retry with backoff: Archive failures are retried up to
max_retriestimes withretry_delaybetween attempts. Only permanently failed after all retries exhausted. - Post-archive verification: File size verified after copy/move. Mismatched files are deleted and retried.
- Reconciliation: Periodic sync between filesystem and database catches missed files, clears removed files, re-enqueues pending work.
- Rules hot-reload: The service watches
rules.tomlfor changes and validates before applying. Invalid changes are rejected without disruption.
Plugin System
Plugins are .py files in the configured plugins directory. A plugin must provide:
- A
FileBasesubclass with__tablename__ = "files"defining domain-specific columns - An
extract(file_paths: list[Path]) -> dictfunction that returns metadata for the domain columns
Domain columns must be nullable since they are populated after the group row is created.
Configuration
- config.toml: Service settings, logging, database, plugins, watch directories, archive destinations
- rules.toml: Integrity checks, completeness grouping, metadata module, selection rules, routing rules
Paths are resolved relative to the config file's directory. archive.destination is resolved relative to archive.base_path.
Rules can be hot-reloaded by editing rules.toml while the service is running.
Deployment
Setting up a production install
Create a dedicated virtualenv so the service has an isolated, reproducible Python environment:
# Create the project directory and venv
sudo mkdir -p /opt/carbonation
cd /opt/carbonation
python3 -m venv .venv
# Install carbonation into the venv
.venv/bin/pip install carbonation
# Add database drivers if needed:
# .venv/bin/pip install "carbonation[mariadb]"
# .venv/bin/pip install "carbonation[postgresql]"
# Scaffold config, rules, and plugin template
.venv/bin/carbonation init
# Edit to match your environment
$EDITOR config.toml rules.toml plugins/file_model.py
# Validate and initialize
.venv/bin/carbonation check-config
.venv/bin/carbonation db init
systemd
Generate a systemd unit file during scaffolding:
.venv/bin/carbonation init --systemd --user carbonation
Then install and enable it:
sudo cp carbonation.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now carbonation
sudo systemctl status carbonation
The generated unit file uses Type=notify with WatchdogSec=300 — systemd waits for carbonation to signal readiness before marking it as started, and automatically restarts it if the watchdog heartbeat stops (e.g. hung process). Edit ReadWritePaths= in the unit file to include your delivery and archive directories.
WatchdogSec should be at least 2x your heartbeat_interval in config.toml (default: 60s).
Cron
For environments where a persistent service is not needed, run carbonation as a cron job:
*/5 * * * * /opt/carbonation/.venv/bin/carbonation -c /opt/carbonation/config.toml run --once
--once reconciles delivery directories, processes all pending events, and exits. It can also supplement a running service as defense-in-depth.
Upgrading
cd /opt/carbonation
.venv/bin/pip install --upgrade carbonation
.venv/bin/carbonation -c config.toml db upgrade # apply schema migrations
.venv/bin/carbonation -c config.toml db init # sync plugin columns
sudo systemctl restart carbonation
db upgrade applies pending Alembic migrations (carbonation's base tables). db init then adds any new plugin-defined columns. Both are idempotent.
Monitoring
.venv/bin/carbonation -c /opt/carbonation/config.toml status
The status command shows:
- Service heartbeat age (warns if stale)
- Component status breakdown by time window (1h/4h/1d/7d/30d)
- Delivery and archive totals
- Retry queue depth
- Oldest incomplete group
- Watch directory health
For log aggregation, set format = "json" in the logging config to produce structured JSON lines.
Development
uv sync # install dependencies
uv run pytest tests/ # all tests (~50s)
uv run pytest tests/ -m "not integration" # unit tests only (~2s)
uv run ruff check src/ tests/ # lint
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file carbonation-0.0.5.tar.gz.
File metadata
- Download URL: carbonation-0.0.5.tar.gz
- Upload date:
- Size: 173.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2d05a4a90dc7c8991d1907a30e7a6a865d5417c084717bf371df279a11329207
|
|
| MD5 |
374b4bc1c20ee46aff88b7ae4f8df392
|
|
| BLAKE2b-256 |
01576dca2c0b3f964befdb0f1c0b5e4e48e4e471992b46cd4174420e77ff59be
|
File details
Details for the file carbonation-0.0.5-py3-none-any.whl.
File metadata
- Download URL: carbonation-0.0.5-py3-none-any.whl
- Upload date:
- Size: 65.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.7 {"installer":{"name":"uv","version":"0.11.7","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cd8634d4344fd0a1e8dbcf5e6d587309003ad51d952c408d4fffb52a915c8c4b
|
|
| MD5 |
83337b1df39deed931b54c49217523b7
|
|
| BLAKE2b-256 |
6ae079ae2a5590439f1e7b40359aeb463a6f4477ebad87e5b7d009d72c069c04
|