
File delivery monitoring and archival service


Carbonation

File delivery monitoring and archival service. Watches directories for incoming files, groups them by stem, validates integrity, extracts metadata via plugins, applies selection and routing rules, and archives to configurable destinations.

Installation

Requires Python 3.11+.

pip install carbonation

For MariaDB or PostgreSQL support:

pip install "carbonation[mariadb]"     # mysqlclient
pip install "carbonation[postgresql]"  # psycopg2

Quick Start

# 1. Scaffold a new project (config + rules + plugin package skeleton)
mkdir /opt/carbonation && cd /opt/carbonation
carbonation init

# 2. Edit the generated files
$EDITOR config.toml                        # watch paths, archive destination, database
$EDITOR rules.toml                         # integrity, completeness, selection, routing
$EDITOR my_plugin/my_plugin/model.py       # File columns
$EDITOR my_plugin/my_plugin/extractors.py  # extract() implementations

# 3. Install the plugin, validate, initialize, and run
pip install -e ./my_plugin           # or: uv pip install -e ./my_plugin
carbonation check config             # validate configuration
carbonation db init                  # run core + plugin migrations
carbonation run                      # start the service

carbonation init generates a complete starter project:

config.toml                       # service, logging, database, watch, archive
rules.toml                        # integrity, completeness, metadata, selection, routing
my_plugin/
├── pyproject.toml                # declares the carbonation.plugin entry point
└── my_plugin/
    ├── __init__.py               # builds `plugin = Plugin(...)`
    ├── model.py                  # File model (FileBase subclass)
    ├── extractors.py             # extract_sample() etc.
    ├── rules.py                  # optional grouping/selection/routing fns
    └── migrations/versions/      # alembic revisions on the plugin's own branch

See Plugin System for details on the Plugin contract.

CLI Reference

carbonation check config                        Validate config.toml and rules.toml
carbonation check rules FILE                    Dry-run rules changes against DB
carbonation db init                             Run core + plugin migrations
carbonation db upgrade                          Apply pending schema migrations
carbonation db revision -m "..."                Generate a new plugin alembic revision
                           [--autogenerate]
carbonation db status                           Component counts and metadata check
carbonation run [--dry-run] [--once]            Start the service
carbonation status                              Heartbeat, status table, watch health
carbonation retry [--watch-name X]              Re-enqueue failed components
carbonation reconcile delivery                  Sync delivery directories with DB
carbonation reconcile archive [--watch NAME]    Sync archive directories with DB
carbonation query files [filters]               Query with dynamic plugin filters

Query Examples

# Filter by plugin columns (dynamically generated from your model)
carbonation query files --category alpha --limit 50

# Date range overlap
carbonation query files --daterange 2026-01-01/2026-02-01

# Recent files by age
carbonation query files --age 7d

# Output formats
carbonation query files --category alpha --format json
carbonation query files --format csv > export.csv

# Count only
carbonation query files --category alpha --count

# Specific columns, sorted
carbonation query files --columns group_key,category,start --order-by -start
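The --age and --daterange flags take compact strings. The following is a minimal sketch of how such values could be turned into datetime bounds. It assumes ISO 8601 dates for --daterange and the unit suffixes s/m/h/d/w for --age (only d appears above). parse_age and parse_daterange are hypothetical helpers, not part of carbonation.

```python
import re
from datetime import datetime, timedelta

# Assumed unit suffixes; the README only shows "d" (days).
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}


def parse_age(text: str, now: datetime) -> datetime:
    """Turn an age such as "7d" into the earliest timestamp it admits."""
    match = re.fullmatch(r"(\d+)([smhdw])", text.strip())
    if match is None:
        raise ValueError(f"bad age {text!r}; expected e.g. '7d' or '12h'")
    amount, unit = int(match.group(1)), match.group(2)
    return now - timedelta(**{_UNITS[unit]: amount})


def parse_daterange(text: str) -> tuple[datetime, datetime]:
    """Split "START/END" (ISO 8601 dates or datetimes) into a pair."""
    start_text, sep, end_text = text.partition("/")
    if not sep:
        raise ValueError(f"bad daterange {text!r}; expected START/END")
    start = datetime.fromisoformat(start_text)
    end = datetime.fromisoformat(end_text)
    if end < start:
        raise ValueError("daterange end precedes start")
    return start, end
```

Passing now explicitly keeps the helper deterministic and easy to test. A real CLI would supply datetime.now().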

Programmatic API

Query the database from Python scripts without going through the CLI:

from datetime import datetime

from carbonation import connect

with connect("config.toml") as db:
    # All files from a watch
    rows = db.query_files({"watch_name": ["incoming"]}, limit=50)
    for row in rows:
        print(row["group_key"], row["created_at"])

    # Count
    n = db.query_files({"complete": True}, count_only=True)

    # Date filtering + plugin columns
    rows = db.query_files({
        "category": ["alpha"],
        "created_at_after": datetime(2026, 1, 1),
    }, order_by="-created_at", columns=["group_key", "category", "start"])

For one-off scripts, use the convenience function that handles setup and teardown in one call:

from carbonation import query_files

rows = query_files("config.toml", {"category": ["alpha"]}, limit=50)

Watermark-based polling

For external consumers (e.g. an orchestrator) that need to poll for newly completed files without gaps:

from carbonation import connect

cursor = None  # persist this between invocations
with connect("config.toml") as db:
    while True:
        rows, cursor = db.get_new_files(cursor=cursor)
        if not rows:
            break
        for row in rows:
            print(row["group_key"], row["completed_at"])

    # Check if the service is alive
    status = db.get_service_status()
    if status:
        print(f"Last heartbeat: {status['last_heartbeat']}")
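The loop above needs the cursor to survive between invocations. The following is a minimal sketch of persisting it to a local file with an atomic write, so a crash mid-save never corrupts the watermark. It assumes the cursor returned by get_new_files is JSON-serializable (its type is not documented here). CURSOR_FILE, load_cursor and save_cursor are hypothetical names.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any

# Hypothetical location for the watermark; use any path the consumer owns.
CURSOR_FILE = Path("carbonation_cursor.json")


def load_cursor(path: Path = CURSOR_FILE) -> Any:
    """Return the saved cursor, or None on the very first run."""
    try:
        return json.loads(path.read_text())
    except FileNotFoundError:
        return None


def save_cursor(cursor: Any, path: Path = CURSOR_FILE) -> None:
    """Write the cursor via a temp file + rename so readers never see a partial file."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump(cursor, fh)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp, path)  # atomic on POSIX when on the same filesystem
    except BaseException:
        os.unlink(tmp)
        raise
```

Call load_cursor() before the polling loop and save_cursor(cursor) only after a batch has been fully handled. A crash then replays at most one batch instead of skipping it.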

ORM access

For direct ORM access to query, modify, and commit changes:

from carbonation import session
from carbonation.db.models import FileComponent, FileStatus

with session("config.toml") as s:
    comps = s.query(FileComponent).filter_by(status=FileStatus.ARCHIVED).all()
    for c in comps:
        c.status = FileStatus.CLEARED
    s.commit()

Filter operators

Suffix or key       Operator               Example
_after / _before    >= / <= (datetime)     {"created_at_after": datetime(2026, 1, 1)}
_min / _max         >= / <= (numeric)      {"id_min": 100}
_daterange          start/stop overlap     {"_daterange": (start, end)}
_age                recency                {"_age": datetime(2026, 3, 20)}
(none)              exact / IN / LIKE      {"category": ["alpha", "beta"]}
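To make the operator semantics concrete, here is a minimal in-memory sketch that evaluates the same filter dictionaries against a single row. Carbonation itself translates filters into SQL. This sketch assumes inclusive bounds and that _daterange compares against columns named start and stop. It leaves out LIKE and _age, whose exact rules are not spelled out above. matches is a hypothetical helper.

```python
from typing import Any


def matches(row: dict[str, Any], filters: dict[str, Any]) -> bool:
    """Evaluate suffix-style filters against one row held in memory.

    Illustrative only: LIKE and _age are omitted.
    """
    for key, value in filters.items():
        if key == "_daterange":
            start, end = value
            # Overlap: the row's [start, stop] interval intersects [start, end].
            if not (row["start"] <= end and row["stop"] >= start):
                return False
        elif key.endswith(("_after", "_min")):
            column = key.rsplit("_", 1)[0]  # "created_at_after" -> "created_at"
            if not row[column] >= value:
                return False
        elif key.endswith(("_before", "_max")):
            column = key.rsplit("_", 1)[0]
            if not row[column] <= value:
                return False
        elif isinstance(value, (list, tuple, set)):
            if row[key] not in value:  # IN
                return False
        elif row[key] != value:  # exact match
            return False
    return True
```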

Architecture

Processing Pipeline

flowchart TD
    A["File arrives in<br>watch directory"] --> B["Watchdog detects<br>creation / modification"]
    B --> C["Settle timer<br>debounces"]
    C --> D["Event enqueued"]
    D --> E["Worker thread<br>picks up event"]

    E --> S1["1 — Record component"]
    S1 --> IC{{"2 — Integrity checks<br>(min_size, readability)"}}

    IC -- fail --> IF["INTEGRITY_FAILED"]
    IC -- pass --> S3{{"3 — Completeness<br>grouping"}}

    S3 -- "incomplete<br>(waiting for extensions)" --> W["WAITING"]
    S3 -- "timeout<br>(on_timeout=skip)" --> TO["TIMED_OUT"]
    S3 -- "complete / standalone" --> S4["4 — Metadata extraction<br>(plugin extract)"]

    S4 --> S5{{"5 — Selection rules"}}
    S5 -- rejected --> NS["NOT_SELECTED"]
    S5 -- accepted --> S6["6 — Routing<br>(choose archive)"]

    S6 --> S7{{"7 — Archive<br>(copy / move / hardlink / symlink)"}}
    S7 -- success --> AR["ARCHIVED"]
    S7 -- "failure<br>(retries left)" --> RP["RETRY_PENDING"]
    S7 -- "failure<br>(retries exhausted)" --> FA["FAILED"]

    RP -. "retry_delay<br>elapsed" .-> E
    IF -. "reconciliation<br>(re-assessed)" .-> E
    NS -. "rules hot-reload<br>(re-evaluated)" .-> E

    style IF fill:#d32f2f,color:#fff
    style FA fill:#d32f2f,color:#fff
    style TO fill:#757575,color:#fff
    style NS fill:#757575,color:#fff
    style W fill:#f9a825
    style RP fill:#f57c00,color:#fff
    style AR fill:#388e3c,color:#fff
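Step 3 of the pipeline groups components by stem and holds a group in WAITING until the expected extensions have arrived. The following is a minimal sketch of that decision. The required extensions and timeout are passed in directly (in carbonation they come from rules.toml, whose exact keys are not shown here), and the group key is just Path.stem, ignoring the directory. group_by_stem and completeness are hypothetical names.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path


def group_by_stem(paths: list[str]) -> dict[str, set[str]]:
    """Group file paths by stem: run1.csv and run1.json both land in "run1"."""
    groups: defaultdict[str, set[str]] = defaultdict(set)
    for p in paths:
        path = Path(p)
        groups[path.stem].add(path.suffix)
    return dict(groups)


def completeness(present: set[str], required: set[str], first_seen: datetime,
                 now: datetime, timeout: timedelta) -> str:
    """Classify a group as "complete", "waiting", or "timed_out"."""
    if required <= present:  # every required extension has arrived
        return "complete"
    if now - first_seen >= timeout:
        return "timed_out"
    return "waiting"
```

An empty required set makes every group complete immediately, which corresponds to the "standalone" branch in the chart. Multi-suffix names such as a.tar.gz would group under "a.tar" with this naive stem rule.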

Reliability

  • Retry with backoff: Archive failures are retried up to max_retries times, with retry_delay between attempts. A component is marked FAILED only after all retries are exhausted.
  • Post-archive verification: After each copy or move, the archived file's size is checked. On a mismatch, the file is deleted and the archive step is retried.
  • Reconciliation: Periodic sync between filesystem and database catches missed files, clears removed files, re-enqueues pending work.
  • Rules hot-reload: The service watches rules.toml for changes and validates before applying. Invalid changes are rejected without disruption.
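The retry and verification rules above can be sketched for the copy case as follows. This is an illustration under stated assumptions, not carbonation's archiver. It uses a fixed retry_delay (a backoff would grow the delay per attempt) and treats max_retries as retries after the first attempt. archive_with_retry is a hypothetical name.

```python
import contextlib
import shutil
import time
from pathlib import Path


def archive_with_retry(src: Path, dest: Path, max_retries: int = 3,
                       retry_delay: float = 5.0) -> bool:
    """Copy src to dest, verify the size, and retry on failure.

    Returns True once a verified copy exists, False after the first attempt
    plus max_retries retries have all failed.
    """
    expected = src.stat().st_size
    for attempt in range(max_retries + 1):
        try:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dest)
            if dest.stat().st_size == expected:
                return True
            dest.unlink()  # size mismatch: delete the bad copy, then retry
        except OSError:
            with contextlib.suppress(OSError):
                dest.unlink()  # drop any partial copy before retrying
        if attempt < max_retries:
            time.sleep(retry_delay)  # fixed delay in this sketch
    return False
```

A False return corresponds to the FAILED state. Each intermediate sleep corresponds to RETRY_PENDING.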

Plugin System

A plugin is an installable Python package that declares a carbonation.plugin entry point. The package's __init__.py builds a Plugin spec object that carbonation imports at startup:

from pathlib import Path
from carbonation import Plugin
from .model import File
from .extractors import extract_sample
from .rules import should_archive

plugin = Plugin(
    file_model=File,                                          # FileBase subclass
    extractors={"sample": extract_sample},                    # named extractors
    selection_functions={"should_archive": should_archive},   # optional rule fns
    # group_functions={...}, routing_functions={...} also supported
    migrations_path=Path(__file__).parent / "migrations" / "versions",
    query_defaults={"order_by": "-start"},
)

Discovery: set [plugins] package = "my_plugin" in config.toml to pin a package, or omit the field and carbonation will use the single registered entry point. rules.toml references plugin callables by short key — e.g. metadata.extract = "sample", selection.function = "should_archive" — and missing keys fail at config-load time with a ConfigError listing the available options.

Schema: the plugin owns its own alembic migrations. Carbonation's core migrations live on a "core" branch; the plugin ships revisions on its own branch (e.g. "my_plugin") with depends_on = ("core",). carbonation db init / db upgrade compose both version locations into a single alembic upgrade heads so the two branches advance together. Use carbonation db revision -m "..." [--autogenerate] to generate a new plugin revision — the wrapper handles first-revision bootstrapping automatically.

File model contract: the FileBase subclass must set __tablename__ = "files" and its domain columns must be nullable (they're populated after the group row is created).

For a full walkthrough of the Plugin contract, callable signatures, alembic branch mechanics, and testing patterns, see docs/plugin-development.md.

Configuration

  • config.toml: Service settings, logging, database, plugin package, watch directories, archive destinations
  • rules.toml: Integrity checks, completeness grouping (stem or plugin), metadata extractor, selection rules, routing rules

Paths are resolved relative to the config file's directory. archive.destination is resolved relative to archive.base_path.

Rules can be hot-reloaded by editing rules.toml while the service is running.

Deployment

Setting up a production install

Create a dedicated virtualenv so the service has an isolated, reproducible Python environment:

# Create the project directory and venv
sudo mkdir -p /opt/carbonation
cd /opt/carbonation
python3 -m venv .venv

# Install carbonation into the venv
.venv/bin/pip install carbonation
# Add database drivers if needed:
# .venv/bin/pip install "carbonation[mariadb]"
# .venv/bin/pip install "carbonation[postgresql]"

# Scaffold config, rules, and plugin package skeleton
.venv/bin/carbonation init

# Edit to match your environment
$EDITOR config.toml rules.toml my_plugin/my_plugin/model.py my_plugin/my_plugin/extractors.py

# Install the plugin package, then validate and initialize
.venv/bin/pip install -e ./my_plugin
.venv/bin/carbonation check config
.venv/bin/carbonation db init

systemd

Generate a systemd unit file during scaffolding:

.venv/bin/carbonation init --systemd --user carbonation

Then install and enable it:

sudo cp carbonation.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now carbonation
sudo systemctl status carbonation

The generated unit file uses Type=notify with WatchdogSec=300 — systemd waits for carbonation to signal readiness before marking it as started, and automatically restarts it if the watchdog heartbeat stops (e.g. hung process). Edit ReadWritePaths= in the unit file to include your delivery and archive directories.

WatchdogSec should be at least 2x your heartbeat_interval in config.toml (default: 60s).

Cron

For environments where a persistent service is not needed, run carbonation as a cron job:

*/5 * * * * /opt/carbonation/.venv/bin/carbonation -c /opt/carbonation/config.toml run --once

--once reconciles delivery directories, processes all pending events, and exits. It can also supplement a running service as defense-in-depth.

Upgrading

cd /opt/carbonation
.venv/bin/pip install --upgrade carbonation            # core
.venv/bin/pip install --upgrade -e ./my_plugin         # or --upgrade <published-plugin>
.venv/bin/carbonation -c config.toml db upgrade        # run pending migrations
sudo systemctl restart carbonation

db upgrade applies pending Alembic migrations on both the core and plugin branches in a single upgrade heads. It is idempotent — running it twice is a no-op.

Monitoring

.venv/bin/carbonation -c /opt/carbonation/config.toml status

The status command shows:

  • Service heartbeat age (warns if stale)
  • Component status breakdown by time window (1h/4h/1d/7d/30d)
  • Delivery and archive totals
  • Retry queue depth
  • Oldest incomplete group
  • Watch directory health
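External monitors can apply a similar staleness check to the last_heartbeat value returned by db.get_service_status(). The following is a minimal sketch. It assumes last_heartbeat is a datetime with the same timezone awareness as now, and it uses a 2x threshold mirroring the WatchdogSec guidance. The threshold carbonation status itself uses is not documented here. heartbeat_is_stale is a hypothetical helper.

```python
from datetime import datetime, timedelta


def heartbeat_is_stale(last_heartbeat: datetime, now: datetime,
                       heartbeat_interval: float = 60.0,
                       factor: float = 2.0) -> bool:
    """True when no heartbeat arrived within factor x heartbeat_interval seconds."""
    return now - last_heartbeat > timedelta(seconds=heartbeat_interval * factor)
```

For example, with the default 60 s interval, a heartbeat 121 s old counts as stale and one 90 s old does not.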

For log aggregation, set format = "json" in the logging config to produce structured JSON lines.

Development

uv sync                                    # install dependencies
uv run pytest tests/                       # all tests (~50s)
uv run pytest tests/ -m "not integration"  # unit tests only (~2s)
uv run ruff check src/ tests/              # lint

License

MIT



Download files


Source Distribution

carbonation-0.0.6.tar.gz (209.9 kB)

Uploaded Source

Built Distribution


carbonation-0.0.6-py3-none-any.whl (82.6 kB)

Uploaded Python 3

File details

Details for the file carbonation-0.0.6.tar.gz.

File metadata

  • Download URL: carbonation-0.0.6.tar.gz
  • Size: 209.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.7 (uv publish, CI, Ubuntu 24.04)

File hashes

Hashes for carbonation-0.0.6.tar.gz
Algorithm Hash digest
SHA256 8ab607449659105c87e2bd6a1c42e7a351ea197f4a4333a441a14e86309259e9
MD5 48e5e4047014f5ceb012dc3203f40bed
BLAKE2b-256 df27f736a7570bd10e1af5b74570df0fef07d04f11a52153b7b839802e9b99a1


File details

Details for the file carbonation-0.0.6-py3-none-any.whl.

File metadata

  • Download URL: carbonation-0.0.6-py3-none-any.whl
  • Size: 82.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.7 (uv publish, CI, Ubuntu 24.04)

File hashes

Hashes for carbonation-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 b7afcd8491f3172a84b9897bee584b1d55e31396d2a50d99f55b63b61321efb6
MD5 5075d1f881ce9a231641840738bab6b6
BLAKE2b-256 40f763b93c32d8e3992157f17dcbcc8f7ca85abfa4416eba871dd5752eb675bc

