File delivery monitoring and archival service

Carbonation

File delivery monitoring and archival service. Watches directories for incoming files, groups them by stem, validates integrity, extracts metadata via plugins, applies selection and routing rules, and archives to configurable destinations.

Installation

Requires Python 3.11+.

pip install carbonation

For MariaDB or PostgreSQL support:

pip install "carbonation[mariadb]"     # mysqlclient
pip install "carbonation[postgresql]"  # psycopg2

Quick Start

# 1. Scaffold a new project
mkdir /opt/carbonation && cd /opt/carbonation
carbonation init

# 2. Edit the generated files
$EDITOR config.toml       # set watch paths, archive destination, database
$EDITOR rules.toml        # configure integrity, completeness, selection rules
$EDITOR plugins/file_model.py  # define your File model columns and extract()

# 3. Validate, initialize, and run
carbonation check-config   # validate configuration
carbonation db init        # create database tables
carbonation run            # start the service

carbonation init generates a complete starter project:

config.toml             # service, logging, database, watch, and archive config
rules.toml              # integrity, completeness, metadata, selection, routing
plugins/file_model.py   # File model + extract() function template

Edit plugins/file_model.py to define the domain-specific columns for your use case and the extract() function that reads metadata from your files. See Plugin System for details.

CLI Reference

carbonation check-config              Validate config.toml and rules.toml
carbonation db init                    Create schema (migrations + plugin columns)
carbonation db upgrade                 Apply pending schema migrations only
carbonation db status                  Component counts and metadata check
carbonation run [--dry-run] [--once]   Start the service
carbonation status                     Heartbeat, status table, watch health
carbonation retry [--watch-name X]     Re-enqueue failed components
carbonation check-rules FILE           Dry-run rule changes against the DB
carbonation reconcile delivery         Sync delivery directories with DB
carbonation reconcile archive          Sync archive directories with DB
carbonation query files [filters]      Query with dynamic plugin filters

Query Examples

# Filter by plugin columns (dynamically generated from your model)
carbonation query files --category alpha --limit 50

# Date range overlap
carbonation query files --daterange 2026-01-01/2026-02-01

# Recent files by age
carbonation query files --age 7d

# Output formats
carbonation query files --category alpha --format json
carbonation query files --format csv > export.csv

# Count only
carbonation query files --category alpha --count

# Specific columns, sorted
carbonation query files --columns group_key,category,start --order-by -start

Programmatic API

Query the database from Python scripts without going through the CLI:

from datetime import datetime

from carbonation import connect

with connect("config.toml") as db:
    # All files from a watch
    rows = db.query_files({"watch_name": ["incoming"]}, limit=50)
    for row in rows:
        print(row["group_key"], row["created_at"])

    # Count only
    n = db.query_files({"complete": True}, count_only=True)

    # Date filtering + plugin columns
    rows = db.query_files({
        "category": ["alpha"],
        "created_at_after": datetime(2026, 1, 1),
    }, order_by="-created_at", columns=["group_key", "category", "start"])

For one-off scripts, use the convenience function that handles setup and teardown in one call:

from carbonation import query_files

rows = query_files("config.toml", {"category": ["alpha"]}, limit=50)

Watermark-based polling

For external consumers (e.g. an orchestrator) that need to poll for newly completed files without gaps:

from carbonation import connect

cursor = None  # persist this between invocations
with connect("config.toml") as db:
    while True:
        rows, cursor = db.get_new_files(cursor=cursor)
        if not rows:
            break
        for row in rows:
            print(row["group_key"], row["completed_at"])

    # Check if the service is alive
    status = db.get_service_status()
    if status:
        print(f"Last heartbeat: {status['last_heartbeat']}")

ORM access

For direct ORM access to query, modify, and commit changes:

from carbonation import session
from carbonation.db.models import FileComponent, FileStatus

with session("config.toml") as s:
    comps = s.query(FileComponent).filter_by(status=FileStatus.ARCHIVED).all()
    for c in comps:
        c.status = FileStatus.CLEARED
    s.commit()

Filter operators

Suffix              Operator                  Example
_after / _before    >= / <= (datetime)        {"created_at_after": datetime(2026, 1, 1)}
_min / _max         >= / <= (numeric)         {"id_min": 100}
_daterange          start/stop overlap        {"_daterange": (start, end)}
_age                recency                   {"_age": datetime(2026, 3, 20)}
(none)              exact / IN / LIKE         {"category": ["alpha", "beta"]}
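To make the suffix conventions concrete, here is a small evaluator that applies the same rules to plain dicts instead of database rows. It is an illustration, not carbonation's query builder. The start/stop column names used for _daterange are assumed from the table, _age and LIKE matching are omitted, and matches() is a hypothetical name.

```python
from datetime import datetime
from typing import Any


def matches(row: dict[str, Any], filters: dict[str, Any]) -> bool:
    """Return True if the row satisfies every filter, using the suffix rules."""
    for key, value in filters.items():
        if key == "_daterange":
            # Overlap test: the row's [start, stop] interval intersects [lo, hi].
            lo, hi = value
            if not (row["start"] <= hi and row["stop"] >= lo):
                return False
        elif key.endswith(("_after", "_min")):
            column = key.rsplit("_", 1)[0]  # "created_at_after" -> "created_at"
            if row[column] < value:  # inclusive lower bound (>=)
                return False
        elif key.endswith(("_before", "_max")):
            column = key.rsplit("_", 1)[0]
            if row[column] > value:  # inclusive upper bound (<=)
                return False
        elif isinstance(value, (list, tuple, set)):
            if row[key] not in value:  # a list means IN
                return False
        elif row[key] != value:  # a scalar means exact match
            return False
    return True
```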

Architecture

Processing Pipeline

flowchart TD
    A["File arrives in<br>watch directory"] --> B["Watchdog detects<br>creation / modification"]
    B --> C["Settle timer<br>debounces"]
    C --> D["Event enqueued"]
    D --> E["Worker thread<br>picks up event"]

    E --> S1["1 — Record component"]
    S1 --> IC{{"2 — Integrity checks<br>(min_size, readability)"}}

    IC -- fail --> IF["INTEGRITY_FAILED"]
    IC -- pass --> S3{{"3 — Completeness<br>grouping"}}

    S3 -- "incomplete<br>(waiting for extensions)" --> W["WAITING"]
    S3 -- "timeout<br>(on_timeout=skip)" --> TO["TIMED_OUT"]
    S3 -- "complete / standalone" --> S4["4 — Metadata extraction<br>(plugin extract)"]

    S4 --> S5{{"5 — Selection rules"}}
    S5 -- rejected --> NS["NOT_SELECTED"]
    S5 -- accepted --> S6["6 — Routing<br>(choose archive)"]

    S6 --> S7{{"7 — Archive<br>(copy / move / hardlink / symlink)"}}
    S7 -- success --> AR["ARCHIVED"]
    S7 -- "failure<br>(retries left)" --> RP["RETRY_PENDING"]
    S7 -- "failure<br>(retries exhausted)" --> FA["FAILED"]

    RP -. "retry_delay<br>elapsed" .-> E
    IF -. "reconciliation<br>(re-assessed)" .-> E
    NS -. "rules hot-reload<br>(re-evaluated)" .-> E

    style IF fill:#d32f2f,color:#fff
    style FA fill:#d32f2f,color:#fff
    style TO fill:#757575,color:#fff
    style NS fill:#757575,color:#fff
    style W fill:#f9a825
    style RP fill:#f57c00,color:#fff
    style AR fill:#388e3c,color:#fff
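Step 3 groups components by file stem and holds a group in WAITING until every expected extension has arrived, or marks it TIMED_OUT when on_timeout=skip and the timeout passes. A minimal sketch of that decision follows. It assumes the completeness rule boils down to a set of required extensions and that "stem" means pathlib's Path.stem. The actual rules.toml keys are not shown here, and group_by_stem, missing_extensions and classify are hypothetical names.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path


def group_by_stem(paths: list[Path]) -> dict[str, list[Path]]:
    """Group arriving files by stem, e.g. obs1.dat and obs1.hdr -> 'obs1'."""
    groups: dict[str, list[Path]] = defaultdict(list)
    for path in paths:
        groups[path.stem].append(path)
    return dict(groups)


def missing_extensions(group: list[Path], required: set[str]) -> set[str]:
    """Extensions still outstanding; an empty set means the group is complete."""
    return required - {path.suffix for path in group}


def classify(group: list[Path], required: set[str], first_seen: datetime,
             now: datetime, timeout: timedelta) -> str:
    """Mirror the step-3 branches of the diagram (on_timeout=skip assumed)."""
    if not missing_extensions(group, required):
        return "complete"
    if now - first_seen >= timeout:
        return "TIMED_OUT"
    return "WAITING"
```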

Reliability

  • Retry with backoff: Archive failures are retried up to max_retries times, waiting retry_delay between attempts. A component is marked FAILED only after all retries are exhausted.
  • Post-archive verification: File size is verified after each copy or move. On a mismatch, the mismatched file is deleted and the archive step is retried.
  • Reconciliation: A periodic sync between the filesystem and the database catches files the watcher missed, marks files that have been removed as cleared, and re-enqueues pending work.
  • Rules hot-reload: The service watches rules.toml for changes and validates them before applying. Invalid changes are rejected without disrupting the running service.
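The retry and verification behaviour described above can be sketched as a single loop. This is a simplified, blocking illustration that assumes copy mode and a fixed delay. Whether carbonation grows the delay between attempts is not stated here, and the real service re-enqueues the component (RETRY_PENDING) instead of sleeping in place. archive_with_retry is a hypothetical name.

```python
import shutil
import time
from pathlib import Path


def archive_with_retry(src: Path, dst: Path, max_retries: int = 3,
                       retry_delay: float = 5.0) -> bool:
    """Copy src to dst, verify the size, and retry on failure.

    Makes one initial attempt plus up to max_retries retries. Returns True
    once a verified copy exists (ARCHIVED), False when retries run out (FAILED).
    """
    for attempt in range(max_retries + 1):
        try:
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dst)
            if dst.stat().st_size == src.stat().st_size:
                return True
            dst.unlink()  # size mismatch: discard the bad copy before retrying
        except OSError:
            pass  # treat I/O errors as a failed attempt
        if attempt < max_retries:
            time.sleep(retry_delay)  # wait retry_delay before the next attempt
    return False
```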

Plugin System

Plugins are .py files in the configured plugins directory. A plugin must provide:

  • A FileBase subclass with __tablename__ = "files" defining domain-specific columns
  • An extract(file_paths: list[Path]) -> dict function that returns metadata for the domain columns

Domain columns must be nullable since they are populated after the group row is created.
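As a sketch of the second half of a plugin, here is an extract() function with the documented signature. The FileBase subclass is omitted because its import path is not shown here. The column names category and start are borrowed from the query examples. Deriving them from the parent directory name and the earliest modification time is purely hypothetical; a real plugin would read whatever metadata its files carry.

```python
from datetime import datetime, timezone
from pathlib import Path


def extract(file_paths: list[Path]) -> dict:
    """Return values for the plugin's domain columns.

    Hypothetical columns: 'category' (taken from the parent directory name)
    and 'start' (the earliest modification time among the group's files).
    """
    earliest = min(path.stat().st_mtime for path in file_paths)
    return {
        "category": file_paths[0].parent.name,
        "start": datetime.fromtimestamp(earliest, tz=timezone.utc),
    }
```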

Configuration

  • config.toml: Service settings, logging, database, plugins, watch directories, archive destinations
  • rules.toml: Integrity checks, completeness grouping, metadata module, selection rules, routing rules

Paths are resolved relative to the config file's directory. archive.destination is resolved relative to archive.base_path.

Rules can be hot-reloaded by editing rules.toml while the service is running.
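The two-level path rule reads as: archive.base_path is taken relative to the directory containing config.toml, and archive.destination is taken relative to the resulting base path. A minimal pathlib sketch of that rule follows. The function name is hypothetical, and the assumption that an absolute path overrides its parent simply follows from how pathlib's "/" operator behaves, not from carbonation's documentation.

```python
from pathlib import Path


def resolve_archive_destination(config_file: Path, base_path: str, destination: str) -> Path:
    """Resolve archive.destination as described above.

    base_path is relative to the config file's directory, and destination is
    relative to base_path. An absolute component replaces everything before it.
    """
    config_dir = config_file.resolve().parent
    base = config_dir / base_path
    return base / destination
```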

Deployment

Setting up a production install

Create a dedicated virtualenv so the service has an isolated, reproducible Python environment:

# Create the project directory and venv
sudo mkdir -p /opt/carbonation
sudo chown "$USER" /opt/carbonation   # so the venv can be created without sudo
cd /opt/carbonation
python3 -m venv .venv

# Install carbonation into the venv
.venv/bin/pip install carbonation
# Add database drivers if needed:
# .venv/bin/pip install "carbonation[mariadb]"
# .venv/bin/pip install "carbonation[postgresql]"

# Scaffold config, rules, and plugin template
.venv/bin/carbonation init

# Edit to match your environment
$EDITOR config.toml rules.toml plugins/file_model.py

# Validate and initialize
.venv/bin/carbonation check-config
.venv/bin/carbonation db init

systemd

Generate a systemd unit file during scaffolding:

.venv/bin/carbonation init --systemd --user carbonation

Then install and enable it:

sudo cp carbonation.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now carbonation
sudo systemctl status carbonation

The generated unit file uses Type=notify with WatchdogSec=300. systemd waits for carbonation to signal readiness before marking the unit as started, and automatically restarts it if the watchdog heartbeat stops (e.g. a hung process). Edit ReadWritePaths= in the unit file to include your delivery and archive directories.

WatchdogSec should be at least 2x your heartbeat_interval in config.toml (default: 60s).

Cron

For environments where a persistent service is not needed, run carbonation as a cron job:

*/5 * * * * /opt/carbonation/.venv/bin/carbonation -c /opt/carbonation/config.toml run --once

--once reconciles delivery directories, processes all pending events, and exits. It can also supplement a running service as defense-in-depth.

Upgrading

cd /opt/carbonation
.venv/bin/pip install --upgrade carbonation
.venv/bin/carbonation -c config.toml db upgrade   # apply schema migrations
.venv/bin/carbonation -c config.toml db init       # sync plugin columns
sudo systemctl restart carbonation

db upgrade applies pending Alembic migrations (carbonation's base tables). db init then adds any new plugin-defined columns. Both are idempotent.

Monitoring

.venv/bin/carbonation -c /opt/carbonation/config.toml status

The status command shows:

  • Service heartbeat age (warns if stale)
  • Component status breakdown by time window (1h/4h/1d/7d/30d)
  • Delivery and archive totals
  • Retry queue depth
  • Oldest incomplete group
  • Watch directory health

For log aggregation, set format = "json" in the logging config to produce structured JSON lines.

Development

uv sync                                    # install dependencies
uv run pytest tests/                       # all tests (~50s)
uv run pytest tests/ -m "not integration"  # unit tests only (~2s)
uv run ruff check src/ tests/              # lint

License

MIT

Download files

Source Distribution

carbonation-0.0.1.tar.gz (125.3 kB)

Built Distribution

carbonation-0.0.1-py3-none-any.whl (57.7 kB)

File details

Details for the file carbonation-0.0.1.tar.gz.

File metadata

  • Download URL: carbonation-0.0.1.tar.gz
  • Size: 125.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.2 (CI, Ubuntu 24.04)

File hashes

Hashes for carbonation-0.0.1.tar.gz
Algorithm Hash digest
SHA256 7a3f1cf645076d846e72af136711de4d9b13ffb42b3274fd1f4343e38fc40400
MD5 9810a777fcaf9f21e0c32f562e1a4978
BLAKE2b-256 9c1e29037d987dae41a82d33b67acfd6c3621033e5f8b64b7c3ad9618e6e49e9

File details

Details for the file carbonation-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: carbonation-0.0.1-py3-none-any.whl
  • Size: 57.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.2 (CI, Ubuntu 24.04)

File hashes

Hashes for carbonation-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c251e94ac520a39554d72087beb00bc36c996379958f55e20218f8b83bca19a4
MD5 0441c9d68357990815750ed3cbefb908
BLAKE2b-256 c693dc443f96c13997115348c849b3fbefb3318f0498835c923159c98257a99b
